Skip to content

Commit 7ac4f3e

Browse files
pditommasoclaude
andauthored
Fix ResourcesAggregator deadlock with virtual thread executor (#6840)
Use a dedicated fixed thread pool in computeSummaryMap() instead of the shared session executor to prevent starvation deadlock when virtual thread carrier threads are saturated. Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 82e76b1 commit 7ac4f3e

File tree

4 files changed

+25
-51
lines changed

4 files changed

+25
-51
lines changed

modules/nextflow/src/main/groovy/nextflow/trace/ReportObserver.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class ReportObserver implements TraceObserverV2 {
113113
@Override
114114
void onFlowCreate(Session session) {
115115
this.session = session
116-
this.aggregator = new ResourcesAggregator(session)
116+
this.aggregator = new ResourcesAggregator()
117117
// check if the process exists
118118
if( Files.exists(reportFile) && !overwrite )
119119
throw new AbortOperationException("Report file already exists: ${reportFile.toUriString()} -- enable the 'report.overwrite' option in your config file to overwrite existing files")

modules/nextflow/src/main/groovy/nextflow/trace/ResourcesAggregator.groovy

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
package nextflow.trace
22

33
import java.util.concurrent.Callable
4-
import java.util.concurrent.ExecutorService
4+
import java.util.concurrent.Executors
55
import java.util.concurrent.Future
66

77
import groovy.json.JsonOutput
88
import groovy.transform.CompileStatic
99
import groovy.util.logging.Slf4j
10-
import nextflow.Session
1110

1211
/**
1312
* Collect and aggregate execution metrics used by execution report
14-
*
13+
*
1514
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
1615
*/
1716
@Slf4j
@@ -23,15 +22,6 @@ class ResourcesAggregator {
2322
*/
2423
final private Map<String,ReportSummary> summaries = new LinkedHashMap<>()
2524

26-
/**
27-
* Executor service used to compute report summary stats
28-
*/
29-
private ExecutorService executor
30-
31-
ResourcesAggregator(Session session) {
32-
this.executor = session.getExecService()
33-
}
34-
3525
/**
3626
* Aggregates task record for each process in order to render the
3727
* final execution stats
@@ -71,16 +61,24 @@ class ResourcesAggregator {
7161
result.put(process, new HashMap(10))
7262
}
7363

74-
// submit the parallel execution
75-
final allResults = executor.invokeAll(tasks)
76-
77-
// compose the final result
78-
for( Future<List> future : allResults ) {
79-
final triple = future.get()
80-
final name = triple[0] // the process name
81-
final series = triple[1] // the series name eg. `cpu`, `time`, etc
82-
final summary = triple[2] // the computed summary
83-
result.get(name).put(series, summary)
64+
// use a dedicated thread pool to avoid starvation deadlock
65+
// with the shared virtual thread executor (see #6833)
66+
final executor = Executors.newFixedThreadPool(Runtime.runtime.availableProcessors())
67+
try {
68+
// submit the parallel execution
69+
final allResults = executor.invokeAll(tasks)
70+
71+
// compose the final result
72+
for( Future<List> future : allResults ) {
73+
final triple = future.get()
74+
final name = triple[0] // the process name
75+
final series = triple[1] // the series name eg. `cpu`, `time`, etc
76+
final summary = triple[2] // the computed summary
77+
result.get(name).put(series, summary)
78+
}
79+
}
80+
finally {
81+
executor.shutdown()
8482
}
8583

8684
return result

modules/nextflow/src/test/groovy/nextflow/trace/ResourcesAggregatorTest.groovy

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ package nextflow.trace
22

33
import spock.lang.Specification
44

5-
import java.util.concurrent.Executors
6-
75
import groovy.json.JsonSlurper
8-
import nextflow.Session
96

107
/**
118
*
@@ -21,12 +18,7 @@ class ResourcesAggregatorTest extends Specification {
2118

2219
def 'should render summary json' () {
2320
given:
24-
def executor = Executors.newCachedThreadPool()
25-
def session = Mock(Session) {
26-
getExecService() >> executor
27-
}
28-
29-
def observer = new ResourcesAggregator(session)
21+
def observer = new ResourcesAggregator()
3022
observer.aggregate(r1)
3123
observer.aggregate(r2)
3224
observer.aggregate(r3)
@@ -119,21 +111,13 @@ class ResourcesAggregatorTest extends Specification {
119111
result[1].timeUsage.q2 == 102.50
120112
result[1].timeUsage.q3 == 108.75
121113
result[1].timeUsage.mean == 102.50
122-
123-
cleanup:
124-
observer?.executor?.shutdown()
125114
}
126115

127116

128117
def 'should compute summary list' () {
129118

130119
given:
131-
def executor = Executors.newCachedThreadPool()
132-
def session = Mock(Session) {
133-
getExecService() >> executor
134-
}
135-
136-
def observer = new ResourcesAggregator(session)
120+
def observer = new ResourcesAggregator()
137121
observer.aggregate(r1)
138122
observer.aggregate(r2)
139123
observer.aggregate(r3)
@@ -156,20 +140,12 @@ class ResourcesAggregatorTest extends Specification {
156140
result[1].mem."max" == 22000.0
157141
result[1].time.min == 18000
158142
result[1].time.max == 23000
159-
160-
cleanup:
161-
observer?.executor?.shutdown()
162143
}
163144

164145

165146
def 'should maintain insertion order' () {
166147
given:
167-
def executor = Executors.newCachedThreadPool()
168-
def session = Mock(Session) {
169-
getExecService() >> executor
170-
}
171-
172-
def observer = new ResourcesAggregator(session)
148+
def observer = new ResourcesAggregator()
173149
observer.aggregate(new TraceRecord([process: 'gamma', name: 'gamma-1']))
174150
observer.aggregate(new TraceRecord([process: 'delta', name: 'delta-1']))
175151
observer.aggregate(new TraceRecord([process: 'delta', name: 'delta-2']))

plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ class TowerClient implements TraceObserverV2 {
271271
log.debug "Creating Seqera Platform observer -- endpoint=$endpoint; requestInterval=$requestInterval; aliveInterval=$aliveInterval; maxRetries=$maxRetries; backOffBase=$backOffBase; backOffDelay=$backOffDelay"
272272

273273
this.session = session
274-
this.aggregator = new ResourcesAggregator(session)
274+
this.aggregator = new ResourcesAggregator()
275275
this.runName = session.getRunName()
276276
this.runId = session.getUniqueId()
277277
this.httpClient = newHttpClient()

0 commit comments

Comments
 (0)