development

대기열을 사용하는 생산자 / 소비자 스레드

big-blog 2020. 12. 29. 08:19
반응형

대기열을 사용하는 생산자 / 소비자 스레드


일종의 Producer/Consumer스레딩 앱 을 만들고 싶습니다 . 그러나 둘 사이에 대기열을 구현하는 가장 좋은 방법이 무엇인지 모르겠습니다.

그래서 나는 두 가지 아이디어를 가지고 있습니다 (둘 다 완전히 틀릴 수 있습니다). 어느 쪽이 더 좋을지, 둘 다 짜증나는 경우 대기열을 구현하는 가장 좋은 방법이 무엇인지 알고 싶습니다. 내가 우려하는 것은 주로 이러한 예제에서 대기열 구현입니다. 사내 클래스이고 스레드로부터 안전한 큐 클래스를 확장하고 있습니다. 아래는 각각 4 개의 클래스가있는 두 가지 예입니다.

메인 클래스

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

소비자 클래스

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

프로듀서 클래스

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

큐 클래스

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

또는

메인 클래스

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

소비자 클래스

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

프로듀서 클래스

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

큐 클래스

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

그리고가!


Java 5+에는 이러한 종류의 작업에 필요한 모든 도구가 있습니다. 다음을 원할 것입니다.

  1. 모든 프로듀서를 하나로 모으십시오 ExecutorService.
  2. 모든 소비자를 다른 곳에 두십시오 ExecutorService.
  3. 필요한 경우 BlockingQueue.

내 경험상 불필요한 단계이기 때문에 (3)에 대해 "필요한 경우"라고 말합니다. 소비자 실행 서비스에 새 작업을 제출하기 만하면됩니다. 그래서:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

그래서이 producers직접 제출 consumers.


좋아요, 다른 사람들이 언급했듯이 가장 좋은 방법은 java.util.concurrent패키지 를 사용하는 것입니다 . "실습에서 Java 동시성"을 적극 권장합니다. 당신이 알아야 할 거의 모든 것을 다루는 훌륭한 책입니다.

특정 구현에 관해서는 주석에서 언급했듯이 생성자에서 스레드를 시작하지 마십시오. 안전하지 않을 수 있습니다.

제쳐두고 두 번째 구현이 더 좋아 보입니다. 정적 필드에 큐를 넣고 싶지 않습니다. 당신은 아마 아무것도 아닌 유연성을 잃고있을 것입니다.

자신의 구현을 계속하려면 (학습 목적으로 생각합니까?) start()적어도 방법을 제공하십시오 . 객체를 생성 한 다음 ( Thread객체를 인스턴스화 할 수 있음 ) start()스레드를 시작하기 위해 호출 해야합니다.

편집 : ExecutorService자체 대기열이 있으므로 혼동 될 수 있습니다. 시작하는 데 도움이되는 내용이 있습니다.

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

추가 편집 : 생산자의 경우 대신 while(true)다음과 같은 작업을 수행 할 수 있습니다.

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

이렇게하면을 호출하여 실행기를 종료 할 수 있습니다 .shutdownNow(). 을 사용하면 while(true)종료되지 않습니다.

또한에 Producer여전히 취약합니다 RuntimeExceptions(즉 RuntimeException, 처리가 중지됨).


당신은 바퀴를 재발 명하고 있습니다.

지속성과 기타 엔터프라이즈 기능이 필요한 경우 JMS를 사용하십시오 ( ActiveMq를 권장 합니다 ).

빠른 메모리 내 대기열이 필요한 경우 java의 Queue 구현 중 하나를 사용하십시오 .

Java 1.4 또는 이전 버전을 지원해야하는 경우 Doug Lea의 우수한 동시 패키지를 사용하십시오.


작업 코드 예제에 대한 cletus 제안 답변을 확장했습니다.

  1. 하나 ExecutorService(pes)는 Producer작업을 수락 합니다.
  2. 하나 ExecutorService(ces)는 Consumer작업을 수락 합니다.
  3. 둘 다 ProducerConsumer주식 BlockingQueue.
  4. 여러 Producer작업은 다른 숫자를 생성합니다.
  5. 모든 Consumer작업은 다음에 의해 생성 된 수를 소비 할 수 있습니다.Producer

암호:

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

산출:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1

노트. 여러 생산자와 소비자가 필요하지 않은 경우 단일 생산자와 소비자를 유지합니다. 여러 생산자와 소비자 사이에서 BlockingQueue의 기능을 보여주기 위해 여러 생산자와 소비자를 추가했습니다.


이것은 매우 간단한 코드입니다.

import java.util.*;

// @author : rootTraveller, June 2017

class ProducerConsumer {
    public static void main(String[] args) throws Exception {
        Queue<Integer> queue = new LinkedList<>();
        Integer buffer = new Integer(10);  //Important buffer or queue size, change as per need.

        Producer producerThread = new Producer(queue, buffer, "PRODUCER");
        Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");

        producerThread.start();  
        consumerThread.start();
    }   
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int queueSize ;

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super(ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.size() == queueSize){
                    System.out.println(Thread.currentThread().getName() + " FULL         : waiting...\n");
                    try{
                        queue.wait();   //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue empty then produce one, add and notify  
                int randomInt = new Random().nextInt(); 
                System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
                queue.add(randomInt); 
                queue.notifyAll();  //Important
            } //synchronized ends here : NOTE
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private int queueSize;

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super (ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    System.out.println(Thread.currentThread().getName() + " Empty        : waiting...\n");
                    try {
                        queue.wait();  //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue not empty then consume one and notify
                System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
                queue.notifyAll();
            } //synchronized ends here : NOTE
        }
    }
}

  1. 넣기 및 가져 오기 메소드를 동기화 한 Java 코드 "BlockingQueue".
  2. Java 코드 "Producer", 데이터를 생성하는 생산자 스레드.
  3. Java 코드 "Consumer", 생성 된 데이터를 소비하는 소비자 스레드입니다.
  4. Java 코드 "ProducerConsumer_Main", 생산자와 소비자 스레드를 시작하는 주요 기능.

BlockingQueue.java

public class BlockingQueue 
{
    int item;
    boolean available = false;

    public synchronized void put(int value) 
    {
        while (available == true)
        {
            try 
            {
                wait();
            } catch (InterruptedException e) { 
            } 
        }

        item = value;
        available = true;
        notifyAll();
    }

    public synchronized int get()
    {
        while(available == false)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e){
            }
        }

        available = false;
        notifyAll();
        return item;
    }
}

Consumer.java

package com.sukanya.producer_Consumer;

public class Consumer extends Thread
{
    blockingQueue queue;
    private int number;
    Consumer(BlockingQueue queue,int number)
    {
        this.queue = queue;
        this.number = number;
    }

    public void run()
    {
        int value = 0;

        for (int i = 0; i < 10; i++) 
        {
            value = queue.get();
            System.out.println("Consumer #" + this.number+ " got: " + value);
        }
    }
}

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer;

public class ProducerConsumer_Main 
{
    public static void main(String args[])
    {
        BlockingQueue queue = new BlockingQueue();
        Producer producer1 = new Producer(queue,1);
        Consumer consumer1 = new Consumer(queue,1);
        producer1.start();
        consumer1.start();
    }
}

참조 URL : https://stackoverflow.com/questions/2332537/producer-consumer-threads-using-a-queue

반응형