대기열을 사용하는 생산자 / 소비자 스레드
일종의 Producer/Consumer
스레딩 앱 을 만들고 싶습니다 . 그러나 둘 사이에 대기열을 구현하는 가장 좋은 방법이 무엇인지 모르겠습니다.
그래서 나는 두 가지 아이디어를 가지고 있습니다 (둘 다 완전히 틀릴 수 있습니다). 어느 쪽이 더 좋을지, 둘 다 짜증나는 경우 대기열을 구현하는 가장 좋은 방법이 무엇인지 알고 싶습니다. 내가 우려하는 것은 주로 이러한 예제에서 대기열 구현입니다. 사내 클래스이고 스레드로부터 안전한 큐 클래스를 확장하고 있습니다. 아래는 각각 4 개의 클래스가있는 두 가지 예입니다.
메인 클래스
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
소비자 클래스
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
프로듀서 클래스
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
QueueHandler.enqueue(new Object());
}
}
}
큐 클래스
public class QueueHandler
{
//This Queue class is a thread safe (written in house) class
public static Queue<Object> readQ = new Queue<Object>(100);
public static void enqueue(Object object)
{
//do some stuff
readQ.add(object);
}
public static Object dequeue()
{
//do some stuff
return readQ.get();
}
}
또는
메인 클래스
public class SomeApp
{
Queue<Object> readQ;
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
readQ = new Queue<Object>(100);
consumer = new Consumer(readQ);
producer = new Producer(readQ);
}
}
소비자 클래스
public class Consumer implements Runnable
{
Queue<Object> queue;
public Consumer(Queue<Object> readQ)
{
queue = readQ;
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = queue.dequeue();
//do some stuff with the object
}
}
}
프로듀서 클래스
public class Producer implements Runnable
{
Queue<Object> queue;
public Producer(Queue<Object> readQ)
{
queue = readQ;
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
queue.enqueue(new Object());
}
}
}
큐 클래스
//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
public QueueHandler(int size)
{
super(size); //All I'm thinking about now is McDonalds.
}
public void enqueue(Object object)
{
//do some stuff
readQ.add();
}
public Object dequeue()
{
//do some stuff
return readQ.get();
}
}
그리고가!
Java 5+에는 이러한 종류의 작업에 필요한 모든 도구가 있습니다. 다음을 원할 것입니다.
- 모든 프로듀서를 하나로 모으십시오
ExecutorService
. - 모든 소비자를 다른 곳에 두십시오
ExecutorService
. - 필요한 경우
BlockingQueue
.
내 경험상 불필요한 단계이기 때문에 (3)에 대해 "필요한 경우"라고 말합니다. 소비자 실행 서비스에 새 작업을 제출하기 만하면됩니다. 그래서:
final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
그래서이 producers
직접 제출 consumers
.
좋아요, 다른 사람들이 언급했듯이 가장 좋은 방법은 java.util.concurrent
패키지 를 사용하는 것입니다 . "실습에서 Java 동시성"을 적극 권장합니다. 당신이 알아야 할 거의 모든 것을 다루는 훌륭한 책입니다.
특정 구현에 관해서는 주석에서 언급했듯이 생성자에서 스레드를 시작하지 마십시오. 안전하지 않을 수 있습니다.
제쳐두고 두 번째 구현이 더 좋아 보입니다. 정적 필드에 큐를 넣고 싶지 않습니다. 당신은 아마 아무것도 아닌 유연성을 잃고있을 것입니다.
자신의 구현을 계속하려면 (학습 목적으로 생각합니까?) start()
적어도 방법을 제공하십시오 . 객체를 생성 한 다음 ( Thread
객체를 인스턴스화 할 수 있음 ) start()
스레드를 시작하기 위해 호출 해야합니다.
편집 : ExecutorService
자체 대기열이 있으므로 혼동 될 수 있습니다. 시작하는 데 도움이되는 내용이 있습니다.
public class Main {
public static void main(String[] args) {
//The numbers are just silly tune parameters. Refer to the API.
//The important thing is, we are passing a bounded queue.
ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));
//No need to bound the queue for this executor.
//Use utility method instead of the complicated Constructor.
ExecutorService producer = Executors.newSingleThreadExecutor();
Runnable produce = new Produce(consumer);
producer.submit(produce);
}
}
class Produce implements Runnable {
private final ExecutorService consumer;
public Produce(ExecutorService consumer) {
this.consumer = consumer;
}
@Override
public void run() {
Pancake cake = Pan.cook();
Runnable consume = new Consume(cake);
consumer.submit(consume);
}
}
class Consume implements Runnable {
private final Pancake cake;
public Consume(Pancake cake){
this.cake = cake;
}
@Override
public void run() {
cake.eat();
}
}
추가 편집 : 생산자의 경우 대신 while(true)
다음과 같은 작업을 수행 할 수 있습니다.
@Override
public void run(){
while(!Thread.currentThread().isInterrupted()){
//do stuff
}
}
이렇게하면을 호출하여 실행기를 종료 할 수 있습니다 .shutdownNow()
. 을 사용하면 while(true)
종료되지 않습니다.
또한에 Producer
여전히 취약합니다 RuntimeExceptions
(즉 RuntimeException
, 처리가 중지됨).
당신은 바퀴를 재발 명하고 있습니다.
지속성과 기타 엔터프라이즈 기능이 필요한 경우 JMS를 사용하십시오 ( ActiveMq를 권장 합니다 ).
빠른 메모리 내 대기열이 필요한 경우 java의 Queue 구현 중 하나를 사용하십시오 .
Java 1.4 또는 이전 버전을 지원해야하는 경우 Doug Lea의 우수한 동시 패키지를 사용하십시오.
작업 코드 예제에 대한 cletus 제안 답변을 확장했습니다.
- 하나
ExecutorService
(pes)는Producer
작업을 수락 합니다. - 하나
ExecutorService
(ces)는Consumer
작업을 수락 합니다. - 둘 다
Producer
및Consumer
주식BlockingQueue
. - 여러
Producer
작업은 다른 숫자를 생성합니다. - 모든
Consumer
작업은 다음에 의해 생성 된 수를 소비 할 수 있습니다.Producer
암호:
import java.util.concurrent.*;
public class ProducerConsumerWithES {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue,1));
pes.submit(new Producer(sharedQueue,2));
ces.submit(new Consumer(sharedQueue,1));
ces.submit(new Consumer(sharedQueue,2));
// shutdown should happen somewhere along with awaitTermination
/ * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
pes.shutdown();
ces.shutdown();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
산출:
Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1
노트. 여러 생산자와 소비자가 필요하지 않은 경우 단일 생산자와 소비자를 유지합니다. 여러 생산자와 소비자 사이에서 BlockingQueue의 기능을 보여주기 위해 여러 생산자와 소비자를 추가했습니다.
이것은 매우 간단한 코드입니다.
import java.util.*;
// @author : rootTraveller, June 2017
class ProducerConsumer {
public static void main(String[] args) throws Exception {
Queue<Integer> queue = new LinkedList<>();
Integer buffer = new Integer(10); //Important buffer or queue size, change as per need.
Producer producerThread = new Producer(queue, buffer, "PRODUCER");
Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");
producerThread.start();
consumerThread.start();
}
}
class Producer extends Thread {
private Queue<Integer> queue;
private int queueSize ;
public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
super(ThreadName);
this.queue = queueIn;
this.queueSize = queueSizeIn;
}
public void run() {
while(true){
synchronized (queue) {
while(queue.size() == queueSize){
System.out.println(Thread.currentThread().getName() + " FULL : waiting...\n");
try{
queue.wait(); //Important
} catch (Exception ex) {
ex.printStackTrace();
}
}
//queue empty then produce one, add and notify
int randomInt = new Random().nextInt();
System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt);
queue.add(randomInt);
queue.notifyAll(); //Important
} //synchronized ends here : NOTE
}
}
}
class Consumer extends Thread {
private Queue<Integer> queue;
private int queueSize;
public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
super (ThreadName);
this.queue = queueIn;
this.queueSize = queueSizeIn;
}
public void run() {
while(true){
synchronized (queue) {
while(queue.isEmpty()){
System.out.println(Thread.currentThread().getName() + " Empty : waiting...\n");
try {
queue.wait(); //Important
} catch (Exception ex) {
ex.printStackTrace();
}
}
//queue not empty then consume one and notify
System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
queue.notifyAll();
} //synchronized ends here : NOTE
}
}
}
- 넣기 및 가져 오기 메소드를 동기화 한 Java 코드 "BlockingQueue".
- Java 코드 "Producer", 데이터를 생성하는 생산자 스레드.
- Java 코드 "Consumer", 생성 된 데이터를 소비하는 소비자 스레드입니다.
- Java 코드 "ProducerConsumer_Main", 생산자와 소비자 스레드를 시작하는 주요 기능.
BlockingQueue.java
public class BlockingQueue
{
int item;
boolean available = false;
public synchronized void put(int value)
{
while (available == true)
{
try
{
wait();
} catch (InterruptedException e) {
}
}
item = value;
available = true;
notifyAll();
}
public synchronized int get()
{
while(available == false)
{
try
{
wait();
}
catch(InterruptedException e){
}
}
available = false;
notifyAll();
return item;
}
}
Consumer.java
package com.sukanya.producer_Consumer;
public class Consumer extends Thread
{
blockingQueue queue;
private int number;
Consumer(BlockingQueue queue,int number)
{
this.queue = queue;
this.number = number;
}
public void run()
{
int value = 0;
for (int i = 0; i < 10; i++)
{
value = queue.get();
System.out.println("Consumer #" + this.number+ " got: " + value);
}
}
}
ProducerConsumer_Main.java
package com.sukanya.producer_Consumer;
public class ProducerConsumer_Main
{
public static void main(String args[])
{
BlockingQueue queue = new BlockingQueue();
Producer producer1 = new Producer(queue,1);
Consumer consumer1 = new Consumer(queue,1);
producer1.start();
consumer1.start();
}
}
참조 URL : https://stackoverflow.com/questions/2332537/producer-consumer-threads-using-a-queue
'development' 카테고리의 다른 글
자바 게터 및 세터 (0) | 2020.12.29 |
---|---|
XML 속성에 줄 바꿈을 저장하는 방법은 무엇입니까? (0) | 2020.12.29 |
기본 클래스에서 공개적으로 상속하지만 파생 클래스에서 기본 클래스의 일부 공용 메서드를 개인으로 만드는 방법은 무엇입니까? (0) | 2020.12.29 |
오른쪽으로 떠있는 것과 왼쪽으로 떠있는 것 사이에 div를 중앙에 배치 (0) | 2020.12.29 |
디렉터리 및 하위 디렉터리의 모든 파일을 시간 역순으로 나열하려면 어떻게합니까? (0) | 2020.12.28 |