development

ExecutorService를 사용하여 모든 스레드가 완료되기를 기다리는 방법은 무엇입니까?

big-blog 2020. 2. 29. 15:26
반응형

ExecutorService를 사용하여 모든 스레드가 완료되기를 기다리는 방법은 무엇입니까?


한 번에 몇 가지 작업 4를 다음과 같이 실행해야합니다.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

모두 완료되면 어떻게 알림을받을 수 있습니까? 지금은 글로벌 태스크 카운터를 설정하는 것보다 더 나은 것을 생각할 수 없으며 모든 태스크가 끝날 때마다 카운터를 줄인 다음 무한 루프 에서이 카운터를 모니터링하여 0이되도록하십시오. 또는 선물 목록을 얻거나 무한 루프 모니터에서 isDone for all them. 무한 루프를 포함하지 않는 더 나은 솔루션은 무엇입니까?

감사.


기본적으로 ExecutorService당신은 전화 shutdown()를 한 다음 awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

CountDownLatch를 사용하십시오 .

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

그리고 당신의 작업 (시도 / 마지막으로 동봉)

latch.countDown();

ExecutorService.invokeAll() 당신을 위해 그것을합니다.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

선물리스트를 사용할 수도 있습니다.

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

then when you want to join on all of them, its essentially the equivalent of joining on each, (with the added benefit that it re-raises exceptions from child threads to the main):

for(Future f: this.futures) { f.get(); }

기본적으로 트릭은 (모두 또는 각각)에 isDone ()을 무한 루프하는 대신 각 Future에서 .get ()을 한 번에 하나씩 호출하는 것입니다. 따라서 마지막 스레드가 완료 되 자마자이 블록을 통해 "이동"할 수 있습니다. 주의해야 할 점은 .get () 호출이 예외를 다시 발생시키기 때문에 스레드 중 하나가 죽으면 다른 스레드가 완료되기 전에이 문제를 발생시킬 수 있습니다.이를 피하기 위해 catch ExecutionExceptionget 호출 주위에 ]. 다른주의 사항은 모든 스레드에 대한 참조를 유지하므로 스레드 로컬 변수가있는 경우이 블록을 통과 한 후에도 수집되지 않습니다 (문제가되면이 문제를 해결할 수는 있지만 제거하여 해결할 수 있음) 미래는 ArrayList에서 벗어남). 어떤 미래가 "먼저 끝나는가"를 알고 싶다면https://stackoverflow.com/a/31885029/32453


Java8에서 당신은 그것을 할 수 CompletableFuture :

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

내 두 센트. CountDownLatch미리 작업 수를 알아야 한다는 요구 사항을 극복하기 위해 간단한을 사용하여 구식 방식으로 할 수 Semaphore있습니다.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

당신의 작업에서는 단지 전화 s.release()당신이하는 것처럼latch.countDown();


게임에 약간 늦었지만 완료를 위해 ...

모든 작업이 완료되기를 '대기'하는 대신 할리우드 원칙에 따라 "나에게 전화하지 마십시오. 전화하겠습니다"라고 생각할 수 있습니다. 결과 코드가 더 우아하다고 생각합니다 ...

구아바는이를 달성하기위한 몇 가지 흥미로운 도구를 제공합니다.

예 ::

ExecutorService를 ListeningExecutorService로 랩 ::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

실행할 콜 러블 컬렉션 제출 ::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

이제 필수 부분 :

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

모든 선물이 완료 될 때 알림을받을 수 있도록 ListenableFuture에 콜백을 연결합니다. ::

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });

또한 처리가 완료되면 모든 결과를 한 곳에서 수집 할 수 있다는 이점도 제공합니다.

더 자세한 정보는 여기


으로 CyclicBarrier 클래스 자바 5 이후는 이런 종류의 위해 설계되었습니다.


아래 방법 중 하나를 따르십시오.

  1. on 에서 반환 된 모든 Future 태스크를 반복하고 다음에 의해 제안 된 객체 에 대한 호출 차단하여 상태를 확인하십시오.submitExecutorServiceget()FutureKiran
  2. 사용 invokeAll()ExecutorService입니다
  3. CountDownLatch
  4. ForkJoinPool 또는 Executors.html # newWorkStealingPool
  5. shutdown, awaitTermination, shutdownNow적절한 순서로 ThreadPoolExecutor의 API 사용

관련 SE 질문 :

CountDownLatch는 Java 멀티 스레딩에서 어떻게 사용됩니까?

Java ExecutorService를 올바르게 종료하는 방법


작업을 다른 실행 가능 파일로 감싸서 알림을 보냅니다.

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

여기에 두 가지 옵션이 있는데, 어느 것이 가장 적합한 지 혼란 스럽습니다.

옵션 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

옵션 2 :

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

여기에 future.get (); 시도 잡기에 좋은 생각이 맞습니까?


방금 문제를 해결하는 샘플 프로그램을 작성했습니다. 간결한 구현이 없었으므로 추가하겠습니다. 당신이 사용할 수 있지만 executor.shutdown()하고 executor.awaitTermination(), 그것은 예측할 수없는 것입니다 다른 스레드에 의해 소요되는 시간으로 가장 좋은 방법이 아닙니다.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

래치 / 배리어를 사용하는 것과 다른 여기에 더 많은 대안을 제공하기 위해. CompletionService를 사용하여 모든 결과가 완료 될 때까지 부분 결과를 얻을 수도 있습니다 .

실제로 Java Concurrency에서 : "실행자에게 제출할 계산 배치가 있고 결과가 사용 가능 해지면 결과를 검색하려는 경우 각 태스크와 연관된 미래를 유지하고 get을 호출하여 완료를 위해 반복적으로 폴링 할 수 있습니다. timeout of zero. 이것은 가능하지만 지루하다 . 다행히도 더 좋은 방법이있다 : 완성 서비스. "

여기에 구현

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}

이 코드를 사용할 수 있습니다 :

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");

이것은 "AdamSkywalker"팁을 기반으로 한 내 솔루션이며 작동합니다.

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}

다음 작업 예제를 만들었습니다. 아이디어는 많은 스레드 (numberOfTasks / threshold에 의해 프로그래밍 방식으로 결정됨)로 작업 풀 (예 : 대기열을 사용하고 있음)을 처리하고 다른 스레드가 다른 처리를 계속하기 위해 모든 스레드가 완료 될 때까지 기다리는 방법입니다.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

그것이 도움이되기를 바랍니다!


ExecutorCompletionService 의 자체 서브 클래스를 사용하여 래핑하고 taskExecutor, 자신의 BlockingQueue 구현 을 통해 각 태스크가 완료 될 때 알림 을 받고 완료된 태스크 수가 원하는 목표에 도달 할 때 원하는 콜백 또는 기타 조치를 수행 할 수 있습니다.


사용 executorService.shutdown()하고 executorService.awaitTermination방법 을 사용해야합니다 .

예를 들면 다음과 같습니다.

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}

그래서 누군가가 이것을하는 더 간단한 방법을 원한다면 링크 된 질문에서 내 대답을 게시하십시오.

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.

Java 8-스트림 API를 사용하여 스트림을 처리 할 수 ​​있습니다. 아래의 스 니펫을 참조하십시오

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();


ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

경우 doSomething()다른 예외를 던져의는 latch.countDown()어떻게해야합니까, 그래서 실행되지 않습니다 보인다?


더 많은 스레드 ExecutionServices를 순서대로 사용하고 각 EXECUTIONSERVICE가 완료되기를 기다리는 경우 가장 좋은 방법은 다음과 같습니다.

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}

이것은 도움이 될 수 있습니다

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }

러너 클래스 에서 waitTillDone ()호출 할 수 있습니다 .

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

이 클래스 재사용 하고 shutdown ()을 호출하기 전에 원하는만큼 waitTillDone ()을 호출 할 수 있으며 코드는 매우 간단 합니다. 또한 당신은 알 필요가 없습니다 작업의 수를 선행.

그것을 사용하려면이 gradle / maven compile 'com.github.matejtymes:javafixes:1.3.1'의존성을 프로젝트에 추가 하십시오.

자세한 내용은 여기를 참조하십시오.

https://github.com/MatejTymes/JavaFixes


실행 프로그램 getActiveCount()에는 활성 스레드 수를 제공 하는 메소드가 있습니다 .

스레드를 확장 한 후 activeCount()값이 인지 확인할 수 있습니다 0. 값이 0이면 현재 실행중인 활성 스레드가 없음을 의미하므로 작업이 완료되었습니다.

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}

참고 URL : https://stackoverflow.com/questions/1250643/how-to-wait-for-all-threads-to-finish-using-executorservice



반응형