Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
Expand Down Expand Up @@ -1117,8 +1118,11 @@ class Session implements ISession {
}

void notifyFlowComplete() {
notifyEvent(observersV1, ob -> ob.onFlowComplete())
notifyEvent(observersV2, ob -> ob.onFlowComplete())
final futures = []
futures.addAll(notifyEvent(observersV1, ob -> ob.onFlowComplete()))
futures.addAll(notifyEvent(observersV2, ob -> ob.onFlowComplete()))
// Wait for all async notifications to complete before proceeding with shutdown
CompletableFuture.allOf(futures as CompletableFuture[]).join()
}

/**
Expand All @@ -1130,8 +1134,11 @@ class Session implements ISession {
void notifyError( TaskHandler handler ) {

final trace = handler?.safeTraceRecord()
notifyEvent(observersV1, ob -> ob.onFlowError(handler, trace))
notifyEvent(observersV2, ob -> ob.onFlowError(new TaskEvent(handler, trace)))
final futures = []
futures.addAll(notifyEvent(observersV1, ob -> ob.onFlowError(handler, trace)))
futures.addAll(notifyEvent(observersV2, ob -> ob.onFlowError(new TaskEvent(handler, trace))))
// Wait for all async notifications to complete before proceeding
CompletableFuture.allOf(futures as CompletableFuture[]).join()

if( !errorAction )
return
Expand All @@ -1144,16 +1151,24 @@ class Session implements ISession {
}
}

private static <T> void notifyEvent(List<T> observers, Consumer<T> action) {
private static <T> List<CompletableFuture<Void>> notifyEvent(List<T> observers, Consumer<T> action) {
final futures = new ArrayList<CompletableFuture<Void>>(observers.size())
for ( int i=0; i<observers.size(); i++) {
final observer = observers.get(i)
try {
action.accept(observer)
}
catch ( Throwable e ) {
log.debug(e.getMessage(), e)
}
final future = CompletableFuture.runAsync(new Runnable() {
@Override
void run() {
try {
action.accept(observer)
}
catch ( Throwable e ) {
log.debug(e.getMessage(), e)
}
}
})
futures.add(future)
}
return futures
}

/**
Expand Down
38 changes: 38 additions & 0 deletions modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -462,4 +462,42 @@ class SessionTest extends Specification {
true | true

}

def 'test notifyEvent is asynchronous'() {
given:
def events = []
def latch = new java.util.concurrent.CountDownLatch(2)
def observer1 = new nextflow.trace.TraceObserverV2() {
@Override
void onFlowBegin() {
Thread.sleep(50) // Simulate slow observer
events << 'observer1'
latch.countDown()
}
}
def observer2 = new nextflow.trace.TraceObserverV2() {
@Override
void onFlowBegin() {
events << 'observer2'
latch.countDown()
}
}

def session = new Session()
session.@observersV2 = [observer1, observer2]

when:
def startTime = System.currentTimeMillis()
session.notifyFlowBegin()
def notifyTime = System.currentTimeMillis() - startTime
latch.await(1, java.util.concurrent.TimeUnit.SECONDS)
def completeTime = System.currentTimeMillis() - startTime

then:
notifyTime < 50 // Should return quickly without waiting for observers
completeTime >= 50 // But observers should complete
events.size() == 2
events.contains('observer1')
events.contains('observer2')
}
}
Loading