¿Cómo esperar a que todos los subprocesos finalicen, usando ExecutorService?


Necesito ejecutar alguna cantidad de tareas 4 a la vez, algo como esto:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

¿Cómo puedo ser notificado una vez que todos ellos están completos? Por ahora no puedo pensar en nada mejor que establecer un contador de tareas global y disminuirlo al final de cada tarea, luego monitorear en bucle infinito este contador para convertirse en 0; o obtener una lista de Futuros y en bucle infinito monitor isDone para todos ellos. ¿Cuáles son las mejores soluciones que no implican bucles infinitos?

Gracias.

Author: bluish, 2009-08-09

22 answers

, Básicamente, en un ExecutorService llame shutdown() y, a continuación,awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
 371
Author: cletus,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-08 06:43:58

Use un CountDownLatch :

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

Y dentro de su tarea (encierre en try / finally)

latch.countDown();
 154
Author: ChssPly76,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-08 06:45:00

ExecutorService.invokeAll() lo hace por usted.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
 77
Author: sjlee,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-08 06:45:54

También puedes usar Listas de Futuros:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

Luego, cuando se desea unir a todos ellos, es esencialmente el equivalente de unirse a cada uno, (con el beneficio adicional de que vuelve a elevar las excepciones de los subprocesos secundarios al principal):

for(Future f: this.futures) { f.get(); }

Básicamente el truco es llamar .get() en cada Futuro uno a la vez, en lugar de un bucle infinito llamando a isDone () en (todos o cada uno). Por lo tanto, está garantizado que "avanzará" a través y más allá de este bloque tan pronto como termine el último hilo. El advertencia es que desde el .get () call re-raises excepciones, si uno de los hilos muere, usted podría subir de esto posiblemente antes de que los otros hilos hayan terminado de completarse [para evitar esto, usted podría agregar un catch ExecutionException alrededor de la llamada get]. La otra advertencia es que mantiene una referencia a todos los hilos, por lo que si tienen variables locales de hilo no se recopilarán hasta después de pasar este bloque (aunque podría ser capaz de sortear esto, si se convirtió en un problema, eliminando el futuro de la ArrayList). Si quieres saber qué Futuro "termina primero" podrías usar algo como https://stackoverflow.com/a/31885029/32453

 41
Author: rogerdpack,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 11:55:03

Solo mis dos centavos. Para superar el requisito de CountDownLatch de conocer el número de tareas de antemano, podría hacerlo a la antigua usanza utilizando un simple Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

En tu tarea simplemente llama s.release() como lo harías latch.countDown();

 23
Author: stryba,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-01-19 10:48:28

En Java8 puedes hacerlo con CompletableFuture :

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
 19
Author: AdamSkywalker,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-04-29 08:41:47

La clase CyclicBarrier en Java 5 y posteriores está diseñada para este tipo de cosas.

 12
Author: Pekka Enberg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-08 06:46:38

Un poco tarde para el juego, pero por el bien de la finalización...

En lugar de 'esperar' a que todas las tareas terminen, puedes pensar en términos del principio de Hollywood, "no me llames, te llamaré" - cuando haya terminado. Creo que el código resultante es más elegante...

La guayaba ofrece algunas herramientas interesantes para lograr esto.

Un ejemplo ::

Envuelve un ExecutorService en un ListeningExecutorService::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Enviar una colección de llamadas para su ejecución ::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

Ahora la parte esencial:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Adjunte una devolución de llamada a ListenableFuture, que puede usar para ser notificado cuando todos los futuros completen::

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });

Esto también ofrece la ventaja de que puede recopilar todos los resultados en un solo lugar una vez que finalice el procesamiento...

Más información aquí

 10
Author: Răzvan Petruescu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-11-07 12:19:15

Siga uno de los siguientes enfoques.

  1. Itere a través de todas las tareas Futuras , devueltas desde submit en ExecutorService y verifique el estado con el bloqueo de la llamada get() en el objeto Future como sugiere Kiran
  2. Use invokeAll() on ExecutorService
  3. CountDownLatch
  4. ForkJoinPool o Ejecutores.html#newWorkStealingPool
  5. Use shutdown, awaitTermination, shutdownNow API de ThreadPoolExecutor en la secuencia adecuada

Relacionados preguntas:

¿Cómo se usa CountDownLatch en Java Multithreading?

Cómo apagar correctamente java ExecutorService

 6
Author: Ravindra babu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-18 07:35:55

Puede empaquetar sus tareas en otro ejecutable, que enviará notificaciones:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});
 5
Author: Zed,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2009-08-09 04:46:45

Acabo de escribir un programa de muestra que resuelve su problema. No se dio una implementación concisa, así que agregaré una. Si bien puede usar executor.shutdown() y executor.awaitTermination(), no es la mejor práctica, ya que el tiempo que toman los diferentes hilos sería impredecible.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
 3
Author: Kiran,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-12-30 19:15:36

Solo para proporcionar más alternativas aquí diferentes para usar pestillo/barreras. También puede obtener los resultados parciales hasta que todos ellos terminen usando CompletionService.

De la Concurrencia de Java en la práctica: "Si tiene un lote de cálculos para enviar a un ejecutor y desea recuperar sus resultados a medida que se convierten disponible, puede retener el Futuro asociado con cada tarea y sondear repetidamente para completarlo llamando a get con un tiempo de espera de cero. Esto es posible, pero tedioso. Afortunadamente hay una mejor manera: un servicio de finalización."

Aquí la implementación

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}
 3
Author: Alberto Gurrion,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-03-04 17:51:40

Puede usar su propia subclase de ExecutorCompletionService para envolver taskExecutor, y su propia implementación de BlockingQueue para informarse cuando se complete cada tarea y realizar cualquier devolución de llamada u otra acción que desee cuando el número de tareas completadas alcance su objetivo deseado.

 1
Author: Alex Martelli,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-09-19 17:53:07

Debe usar el método executorService.shutdown() y executorService.awaitTermination.

Un ejemplo como sigue:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}
 1
Author: Rollen Holt,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-06-21 05:41:18

Java 8 - Podemos usar stream API para procesar stream. Por favor, vea el fragmento de código a continuación

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();
 1
Author: Vlad,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-07-19 23:39:29

Puedes usar este código:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
 1
Author: Tuan Pham,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 06:15:40

Esta es mi solución, basada en la punta "AdamSkywalker", y funciona

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}
 1
Author: frss-soft.com,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-05-23 15:26:45

Así que publico mi respuesta de la pregunta enlazada aquí, en caso de que alguien quiera una forma más simple de hacer esto

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
 0
Author: Mạnh Quyết Nguyễn,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-05-07 18:49:09

Aquí hay dos opciones , solo un poco confunda cuál es la mejor para ir.

Opción 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Opción 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

Aquí poniendo futuro.get (); en try catch es buena idea ¿verdad?

 0
Author: user2862544,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-09-02 16:14:15

Esto podría ayudar

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }
 -1
Author: Amol Desai,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-22 06:03:57

Puedes llamar a waitTillDone () en esta clase Runner :

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

Puedes reutilizar esta clase y llamar a waitTillDone() tantas veces como quieras antes de llamar a shutdown(), además tu código es extremadamente simple. También usted no tiene que saber el número de tareas por adelantado.

Para usarlo simplemente agrega esta dependencia de gradle/maven compile 'com.github.matejtymes:javafixes:1.1.1' a tu proyecto.

Se pueden encontrar Más detalles aquí:

Https://github.com/MatejTymes/JavaFixes

Http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

 -1
Author: Matej Tymes,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-11-30 09:56:33

Hay un método en executor getActiveCount() - que da la cuenta de hilos activos.

Después de abarcar el hilo, podemos comprobar si el valor activeCount() es 0. Una vez que el valor es cero, significa que no hay subprocesos activos actualmente en ejecución, lo que significa que la tarea ha terminado:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}
 -2
Author: user,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-10-31 09:57:21