File tree Expand file tree Collapse file tree 2 files changed +40
-2
lines changed Expand file tree Collapse file tree 2 files changed +40
-2
lines changed Original file line number Diff line number Diff line change @@ -1148,14 +1148,14 @@ class Session implements ISession {
11481148 private static <T> void notifyEvent (List<T> observers , Consumer<T> action ) {
11491149 for ( int i= 0 ; i< observers. size(); i++ ) {
11501150 final observer = observers. get(i)
1151- CompletableFuture . runAsync({
1151+ // CompletableFuture.runAsync({
11521152 try {
11531153 action. accept(observer)
11541154 }
11551155 catch ( Throwable e ) {
11561156 log. debug(e. getMessage(), e)
11571157 }
1158- })
1158+ // })
11591159 }
11601160 }
11611161
Original file line number Diff line number Diff line change @@ -462,4 +462,42 @@ class SessionTest extends Specification {
462462 true | true
463463
464464 }
465+
466+ def ' test notifyEvent is asynchronous' () {
467+ given :
468+ def events = []
469+ def latch = new java.util.concurrent.CountDownLatch (2 )
470+ def observer1 = new nextflow.trace.TraceObserverV2 () {
471+ @Override
472+ void onFlowBegin () {
473+ Thread . sleep(50 ) // Simulate slow observer
474+ events << ' observer1'
475+ latch. countDown()
476+ }
477+ }
478+ def observer2 = new nextflow.trace.TraceObserverV2 () {
479+ @Override
480+ void onFlowBegin () {
481+ events << ' observer2'
482+ latch. countDown()
483+ }
484+ }
485+
486+ def session = new Session ()
487+ session. @observersV2 = [observer1, observer2]
488+
489+ when :
490+ def startTime = System . currentTimeMillis()
491+ session. notifyFlowBegin()
492+ def notifyTime = System . currentTimeMillis() - startTime
493+ latch. await(1 , java.util.concurrent.TimeUnit . SECONDS )
494+ def completeTime = System . currentTimeMillis() - startTime
495+
496+ then :
497+ notifyTime < 50 // Should return quickly without waiting for observers
498+ completeTime >= 50 // But observers should complete
499+ events. size() == 2
500+ events. contains(' observer1' )
501+ events. contains(' observer2' )
502+ }
465503}
You can’t perform that action at this time.
0 commit comments