diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 2a07576f09..c68dc58bd4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -19,6 +19,7 @@ package nextflow import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths +import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -1117,8 +1118,11 @@ class Session implements ISession { } void notifyFlowComplete() { - notifyEvent(observersV1, ob -> ob.onFlowComplete()) - notifyEvent(observersV2, ob -> ob.onFlowComplete()) + final futures = [] + futures.addAll(notifyEvent(observersV1, ob -> ob.onFlowComplete())) + futures.addAll(notifyEvent(observersV2, ob -> ob.onFlowComplete())) + // Wait for all async notifications to complete before proceeding with shutdown + CompletableFuture.allOf(futures as CompletableFuture[]).join() } /** @@ -1130,8 +1134,11 @@ class Session implements ISession { void notifyError( TaskHandler handler ) { final trace = handler?.safeTraceRecord() - notifyEvent(observersV1, ob -> ob.onFlowError(handler, trace)) - notifyEvent(observersV2, ob -> ob.onFlowError(new TaskEvent(handler, trace))) + final futures = [] + futures.addAll(notifyEvent(observersV1, ob -> ob.onFlowError(handler, trace))) + futures.addAll(notifyEvent(observersV2, ob -> ob.onFlowError(new TaskEvent(handler, trace)))) + // Wait for all async notifications to complete before proceeding + CompletableFuture.allOf(futures as CompletableFuture[]).join() if( !errorAction ) return @@ -1144,16 +1151,24 @@ class Session implements ISession { } } - private static void notifyEvent(List observers, Consumer action) { + private static List> notifyEvent(List observers, Consumer action) { + final futures = new ArrayList>(observers.size()) for ( int i=0; i= 50 // But observers should complete + events.size() == 2 + events.contains('observer1') + events.contains('observer2') + } }