ExecutorService를 사용하여 모든 스레드가 완료되기를 기다리는 방법은 무엇입니까?
한 번에 몇 가지 작업 4를 다음과 같이 실행해야합니다.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
모두 완료되면 어떻게 알림을받을 수 있습니까? 지금은 글로벌 태스크 카운터를 설정하는 것보다 더 나은 것을 생각할 수 없으며 모든 태스크가 끝날 때마다 카운터를 줄인 다음 무한 루프 에서이 카운터를 모니터링하여 0이되도록하십시오. 또는 선물 목록을 얻거나 무한 루프 모니터에서 isDone for all them. 무한 루프를 포함하지 않는 더 나은 솔루션은 무엇입니까?
감사.
기본적으로 ExecutorService
당신은 전화 shutdown()
를 한 다음 awaitTermination()
:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
CountDownLatch를 사용하십시오 .
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
그리고 당신의 작업 (시도 / 마지막으로 동봉)
latch.countDown();
ExecutorService.invokeAll()
당신을 위해 그것을합니다.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
선물리스트를 사용할 수도 있습니다.
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));
then when you want to join on all of them, its essentially the equivalent of joining on each, (with the added benefit that it re-raises exceptions from child threads to the main):
for(Future f: this.futures) { f.get(); }
기본적으로 트릭은 (모두 또는 각각)에 isDone ()을 무한 루프하는 대신 각 Future에서 .get ()을 한 번에 하나씩 호출하는 것입니다. 따라서 마지막 스레드가 완료 되 자마자이 블록을 통해 "이동"할 수 있습니다. 주의해야 할 점은 .get () 호출이 예외를 다시 발생시키기 때문에 스레드 중 하나가 죽으면 다른 스레드가 완료되기 전에이 문제를 발생시킬 수 있습니다.이를 피하기 위해 catch ExecutionException
get 호출 주위에 ]. 다른주의 사항은 모든 스레드에 대한 참조를 유지하므로 스레드 로컬 변수가있는 경우이 블록을 통과 한 후에도 수집되지 않습니다 (문제가되면이 문제를 해결할 수는 있지만 제거하여 해결할 수 있음) 미래는 ArrayList에서 벗어남). 어떤 미래가 "먼저 끝나는가"를 알고 싶다면https://stackoverflow.com/a/31885029/32453
Java8에서 당신은 그것을 할 수 CompletableFuture :
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
내 두 센트. CountDownLatch
미리 작업 수를 알아야 한다는 요구 사항을 극복하기 위해 간단한을 사용하여 구식 방식으로 할 수 Semaphore
있습니다.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}
try {
s.aquire(numberOfTasks);
...
당신의 작업에서는 단지 전화 s.release()
당신이하는 것처럼latch.countDown();
게임에 약간 늦었지만 완료를 위해 ...
모든 작업이 완료되기를 '대기'하는 대신 할리우드 원칙에 따라 "나에게 전화하지 마십시오. 전화하겠습니다"라고 생각할 수 있습니다. 결과 코드가 더 우아하다고 생각합니다 ...
구아바는이를 달성하기위한 몇 가지 흥미로운 도구를 제공합니다.
예 ::
ExecutorService를 ListeningExecutorService로 랩 ::
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
실행할 콜 러블 컬렉션 제출 ::
for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});
이제 필수 부분 :
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
모든 선물이 완료 될 때 알림을받을 수 있도록 ListenableFuture에 콜백을 연결합니다. ::
Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
log.info("@@ finished processing {} elements", Iterables.size(result));
// do something with all the results
}
@Override
public void onFailure(Throwable t) {
log.info("@@ failed because of :: {}", t);
}
});
또한 처리가 완료되면 모든 결과를 한 곳에서 수집 할 수 있다는 이점도 제공합니다.
더 자세한 정보는 여기
으로 CyclicBarrier 클래스 자바 5 이후는 이런 종류의 위해 설계되었습니다.
아래 방법 중 하나를 따르십시오.
- on 에서 반환 된 모든 Future 태스크를 반복하고 다음에 의해 제안 된 객체 에 대한 호출 을 차단하여 상태를 확인하십시오.
submit
ExecutorService
get()
Future
Kiran
- 사용
invokeAll()
에 ExecutorService입니다 - CountDownLatch
- ForkJoinPool 또는 Executors.html # newWorkStealingPool
shutdown, awaitTermination, shutdownNow
적절한 순서로 ThreadPoolExecutor의 API 사용
관련 SE 질문 :
CountDownLatch는 Java 멀티 스레딩에서 어떻게 사용됩니까?
Java ExecutorService를 올바르게 종료하는 방법
작업을 다른 실행 가능 파일로 감싸서 알림을 보냅니다.
taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});
여기에 두 가지 옵션이 있는데, 어느 것이 가장 적합한 지 혼란 스럽습니다.
옵션 1:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
옵션 2 :
ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
futures.add(es.submit(task));
}
for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
// do logging and nothing else
}
}
es.shutdown();
여기에 future.get (); 시도 잡기에 좋은 생각이 맞습니까?
방금 문제를 해결하는 샘플 프로그램을 작성했습니다. 간결한 구현이 없었으므로 추가하겠습니다. 당신이 사용할 수 있지만 executor.shutdown()
하고 executor.awaitTermination()
, 그것은 예측할 수없는 것입니다 다른 스레드에 의해 소요되는 시간으로 가장 좋은 방법이 아닙니다.
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
System.out.println("Starting Thread "
+ Thread.currentThread().getId());
for (int i = 0; i < 1000000; i++) {
sum += i;
}
System.out.println("Stopping Thread "
+ Thread.currentThread().getId());
return sum;
}
});
}
try {
List<Future<Integer>> futures = es.invokeAll(tasks);
int flag = 0;
for (Future<Integer> f : futures) {
Integer res = f.get();
System.out.println("Sum: " + res);
if (!f.isDone())
flag = 1;
}
if (flag == 0)
System.out.println("SUCCESS");
else
System.out.println("FAILED");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
래치 / 배리어를 사용하는 것과 다른 여기에 더 많은 대안을 제공하기 위해. CompletionService를 사용하여 모든 결과가 완료 될 때까지 부분 결과를 얻을 수도 있습니다 .
실제로 Java Concurrency에서 : "실행자에게 제출할 계산 배치가 있고 결과가 사용 가능 해지면 결과를 검색하려는 경우 각 태스크와 연관된 미래를 유지하고 get을 호출하여 완료를 위해 반복적으로 폴링 할 수 있습니다. timeout of zero. 이것은 가능하지만 지루하다 . 다행히도 더 좋은 방법이있다 : 완성 서비스. "
여기에 구현
public class TaskSubmiter {
private final ExecutorService executor;
TaskSubmiter(ExecutorService executor) { this.executor = executor; }
void doSomethingLarge(AnySourceClass source) {
final List<InterestedResult> info = doPartialAsyncProcess(source);
CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
for (final InterestedResult interestedResultItem : info)
completionService.submit(new Callable<PartialResult>() {
public PartialResult call() {
return InterestedResult.doAnOperationToGetPartialResult();
}
});
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<PartialResult> f = completionService.take();
PartialResult PartialResult = f.get();
processThisSegment(PartialResult);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
throw somethinghrowable(e.getCause());
}
}
}
이 코드를 사용할 수 있습니다 :
public class MyTask implements Runnable {
private CountDownLatch countDownLatch;
public MyTask(CountDownLatch countDownLatch {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
//Do somethings
//
this.countDownLatch.countDown();//important
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
이것은 "AdamSkywalker"팁을 기반으로 한 내 솔루션이며 작동합니다.
package frss.main;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestHilos {
void procesar() {
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
System.out.println("FIN DEL PROCESO DE HILOS");
}
private List<Runnable> getTasks() {
List<Runnable> tasks = new ArrayList<Runnable>();
Hilo01 task1 = new Hilo01();
tasks.add(task1);
Hilo02 task2 = new Hilo02();
tasks.add(task2);
return tasks;
}
private class Hilo01 extends Thread {
@Override
public void run() {
System.out.println("HILO 1");
}
}
private class Hilo02 extends Thread {
@Override
public void run() {
try {
sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("HILO 2");
}
}
public static void main(String[] args) {
TestHilos test = new TestHilos();
test.procesar();
}
}
다음 작업 예제를 만들었습니다. 아이디어는 많은 스레드 (numberOfTasks / threshold에 의해 프로그래밍 방식으로 결정됨)로 작업 풀 (예 : 대기열을 사용하고 있음)을 처리하고 다른 스레드가 다른 처리를 계속하기 위해 모든 스레드가 완료 될 때까지 기다리는 방법입니다.
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** Testing CountDownLatch and ExecutorService to manage scenario where
* multiple Threads work together to complete tasks from a single
* resource provider, so the processing can be faster. */
public class ThreadCountDown {
private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();
public static void main(String[] args) {
// Create a queue with "Tasks"
int numberOfTasks = 2000;
while(numberOfTasks-- > 0) {
tasks.add(numberOfTasks);
}
// Initiate Processing of Tasks
ThreadCountDown main = new ThreadCountDown();
main.process(tasks);
}
/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
threadsCountdown = new CountDownLatch(numberOfThreads);
ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);
//Initialize each Thread
while(numberOfThreads-- > 0) {
System.out.println("Initializing Thread: "+numberOfThreads);
threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
}
try {
//Shutdown the Executor, so it cannot receive more Threads.
threadExecutor.shutdown();
threadsCountdown.await();
System.out.println("ALL THREADS COMPLETED!");
//continue With Some Other Process Here
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
int threshold = 100;
int threads = size / threshold;
if( size > (threads*threshold) ){
threads++;
}
return threads;
}
/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
return tasks.poll();
}
/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {
private String threadName;
protected MyThread(String threadName) {
super();
this.threadName = threadName;
}
@Override
public void run() {
Integer task;
try{
//Check in the Task pool if anything pending to process
while( (task = getTask()) != null ){
processTask(task);
}
}catch (Exception ex){
ex.printStackTrace();
}finally {
/*Reduce count when no more tasks to process. Eventually all
Threads will end-up here, reducing the count to 0, allowing
the flow to continue after threadsCountdown.await(); */
threadsCountdown.countDown();
}
}
private void processTask(Integer task){
try{
System.out.println(this.threadName+" is Working on Task: "+ task);
}catch (Exception ex){
ex.printStackTrace();
}
}
}
}
그것이 도움이되기를 바랍니다!
ExecutorCompletionService 의 자체 서브 클래스를 사용하여 래핑하고 taskExecutor
, 자신의 BlockingQueue 구현 을 통해 각 태스크가 완료 될 때 알림 을 받고 완료된 태스크 수가 원하는 목표에 도달 할 때 원하는 콜백 또는 기타 조치를 수행 할 수 있습니다.
사용 executorService.shutdown()
하고 executorService.awaitTermination
방법 을 사용해야합니다 .
예를 들면 다음과 같습니다.
public class ScheduledThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
0, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}
}
그래서 누군가가 이것을하는 더 간단한 방법을 원한다면 링크 된 질문에서 내 대답을 게시하십시오.
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
futures[i++] = CompletableFuture.runAsync(runner, executor);
}
CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
Java 8-스트림 API를 사용하여 스트림을 처리 할 수 있습니다. 아래의 스 니펫을 참조하십시오
final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool
//alternatively to specify parallelism
new ForkJoinPool(15).submit(
() -> tasks.stream().parallel().forEach(Runnable::run)
).get();
ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// doSomething();
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two remaining threads
latch.await();
경우 doSomething()
다른 예외를 던져의는 latch.countDown()
어떻게해야합니까, 그래서 실행되지 않습니다 보인다?
더 많은 스레드 ExecutionServices를 순서대로 사용하고 각 EXECUTIONSERVICE가 완료되기를 기다리는 경우 가장 좋은 방법은 다음과 같습니다.
ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
executer1.execute(new Runnable() {
@Override
public void run() {
...
}
});
}
executer1.shutdown();
try{
executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
for (true) {
executer2.execute(new Runnable() {
@Override
public void run() {
...
}
});
}
executer2.shutdown();
} catch (Exception e){
...
}
이것은 도움이 될 수 있습니다
Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
try {
Log.i(LOG_TAG, "Waiting for executor to terminate...");
if (executor.isTerminated())
break;
if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException ignored) {}
}
이 러너 클래스 에서 waitTillDone () 을 호출 할 수 있습니다 .
Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool
while(...) {
runner.run(new MyTask()); // here you submit your task
}
runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)
runner.shutdown(); // once you done you can shutdown the runner
이 클래스 를 재사용 하고 shutdown ()을 호출하기 전에 원하는만큼 waitTillDone ()을 호출 할 수 있으며 코드는 매우 간단 합니다. 또한 당신은 알 필요가 없습니다 작업의 수를 선행.
그것을 사용하려면이 gradle / maven compile 'com.github.matejtymes:javafixes:1.3.1'
의존성을 프로젝트에 추가 하십시오.
자세한 내용은 여기를 참조하십시오.
https://github.com/MatejTymes/JavaFixes
실행 프로그램 getActiveCount()
에는 활성 스레드 수를 제공 하는 메소드가 있습니다 .
스레드를 확장 한 후 activeCount()
값이 인지 확인할 수 있습니다 0
. 값이 0이면 현재 실행중인 활성 스레드가 없음을 의미하므로 작업이 완료되었습니다.
while (true) {
if (executor.getActiveCount() == 0) {
//ur own piece of code
break;
}
}
'development' 카테고리의 다른 글
페이지로드시 AngularJS 컨트롤러 기능을 실행하는 방법은 무엇입니까? (0) | 2020.02.29 |
---|---|
"git rm -r"을 되 돌리는 방법? (0) | 2020.02.29 |
도커 이미지를 개인 저장소로 푸시하는 방법 (0) | 2020.02.29 |
PostgreSQL : "postgres"사용자의 비밀번호 인증에 실패했습니다 (0) | 2020.02.29 |
JavaScript : location.href를 새 창 / 탭에서 열 수 있습니까? (0) | 2020.02.29 |