Skip to content

Commit d0a8f6a

Browse files
authored
[JENKINS-68092] Serialization of java.util.concurrent data structure in Pipeline: Groovy (#518)
1 parent a473dcd commit d0a8f6a

File tree

1 file changed

+22
-10
lines changed

1 file changed

+22
-10
lines changed

src/main/java/org/jenkinsci/plugins/workflow/cps/CpsThreadGroup.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,19 @@ public final class CpsThreadGroup implements Serializable {
101101
*/
102102
private /*almost final*/ transient CpsFlowExecution execution;
103103

104+
/**
105+
* Persistent version of {@link #runtimeThreads}.
106+
*/
107+
private volatile Map<Integer, CpsThread> threads;
108+
104109
/**
105110
* All the member threads by their {@link CpsThread#id}.
106111
*
107112
* All mutation occurs only on the CPS VM thread. Read access through {@link CpsStepContext#doGet}
108113
* and iteration through {@link CpsThreadDump#from(CpsThreadGroup)} may occur on other threads
109114
* (e.g. non-blocking steps, thread dumps from the UI).
110115
*/
111-
private final NavigableMap<Integer,CpsThread> threads = new ConcurrentSkipListMap<>();
116+
private transient NavigableMap<Integer, CpsThread> runtimeThreads;
112117

113118
/**
114119
* Unique thread ID generator.
@@ -178,6 +183,7 @@ private Object readResolve() {
178183
execution = CpsFlowExecution.PROGRAM_STATE_SERIALIZATION.get();
179184
setupTransients();
180185
assert execution!=null;
186+
runtimeThreads.putAll(threads);
181187
if (/* compatibility: the field will be null in old programs */ scripts != null && !scripts.isEmpty()) {
182188
GroovyShell shell = execution.getShell();
183189
// Take the canonical bindings from the main script and relink that object with that of the shell and all other loaded scripts which kept the same bindings.
@@ -193,15 +199,21 @@ private Object readResolve() {
193199
}
194200

195201
private void setupTransients() {
202+
runtimeThreads = new ConcurrentSkipListMap<>();
196203
runner = new CpsVmExecutorService(this);
197204
pausedByQuietMode = new AtomicBoolean();
198205
}
199206

207+
private Object writeReplace() {
208+
threads = new HashMap<>(runtimeThreads);
209+
return this;
210+
}
211+
200212
@CpsVmThreadOnly
201213
public CpsThread addThread(@NonNull Continuable program, FlowHead head, ContextVariableSet contextVariables) {
202214
assertVmThread();
203215
CpsThread t = new CpsThread(this, iota++, program, head, contextVariables);
204-
threads.put(t.id, t);
216+
runtimeThreads.put(t.id, t);
205217
return t;
206218
}
207219

@@ -223,9 +235,9 @@ private void assertVmThread() {
223235
* null if the thread has finished executing.
224236
*/
225237
public CpsThread getThread(int id) {
226-
CpsThread thread = threads.get(id);
238+
CpsThread thread = runtimeThreads.get(id);
227239
if (thread == null && LOGGER.isLoggable(Level.FINE)) {
228-
LOGGER.log(Level.FINE, "no thread " + id + " among " + threads.keySet(), new IllegalStateException());
240+
LOGGER.log(Level.FINE, "no thread " + id + " among " + runtimeThreads.keySet(), new IllegalStateException());
229241
}
230242
return thread;
231243
}
@@ -234,7 +246,7 @@ public CpsThread getThread(int id) {
234246
* Returns an unmodifiable snapshot of all threads in the thread group.
235247
*/
236248
public Iterable<CpsThread> getThreads() {
237-
return threads.values();
249+
return runtimeThreads.values();
238250
}
239251

240252
@CpsVmThreadOnly("root")
@@ -327,7 +339,7 @@ public void run() {
327339
// ensures that everything submitted in front of us has finished.
328340
runner.submit(new Runnable() {
329341
public void run() {
330-
if (threads.isEmpty()) {
342+
if (runtimeThreads.isEmpty()) {
331343
runner.shutdown();
332344
}
333345
// the original promise of scheduleRun() is now complete
@@ -403,7 +415,7 @@ private boolean run() {
403415
boolean stillRunnable = false;
404416

405417
// TODO: maybe instead of running all the thread, run just one thread in round robin
406-
for (CpsThread t : threads.values().toArray(new CpsThread[threads.size()])) {
418+
for (CpsThread t : runtimeThreads.values().toArray(new CpsThread[runtimeThreads.size()])) {
407419
if (t.isRunnable()) {
408420
Outcome o = t.runNextChunk();
409421
if (o.isFailure()) {
@@ -426,9 +438,9 @@ private boolean run() {
426438
LOGGER.fine("completed " + t);
427439
t.fireCompletionHandlers(o); // do this after ErrorAction is set above
428440

429-
threads.remove(t.id);
441+
runtimeThreads.remove(t.id);
430442
t.cleanUp();
431-
if (threads.isEmpty()) {
443+
if (runtimeThreads.isEmpty()) {
432444
execution.onProgramEnd(o);
433445
try {
434446
this.execution.saveOwner();
@@ -620,7 +632,7 @@ private void propagateErrorToWorkflow(Throwable t) {
620632
// as that's the ony more likely to have caused the problem.
621633
// TODO: when we start tracking which thread is just waiting for the body, then
622634
// that information would help. or maybe we should just remember the thread that has run the last time
623-
Map.Entry<Integer,CpsThread> lastEntry = threads.lastEntry();
635+
Map.Entry<Integer,CpsThread> lastEntry = runtimeThreads.lastEntry();
624636
if (lastEntry != null) {
625637
lastEntry.getValue().resume(new Outcome(null,t));
626638
} else {

0 commit comments

Comments
 (0)