|
24 | 24 |
|
25 | 25 | package org.jenkinsci.plugins.workflow.flow; |
26 | 26 |
|
| 27 | +import static org.awaitility.Awaitility.await; |
27 | 28 | import static org.hamcrest.MatcherAssert.assertThat; |
28 | 29 | import static org.hamcrest.Matchers.containsString; |
29 | 30 | import static org.hamcrest.Matchers.hasItem; |
|
38 | 39 | import hudson.model.TaskListener; |
39 | 40 | import hudson.model.queue.QueueTaskFuture; |
40 | 41 | import java.io.Serializable; |
41 | | -import java.time.Duration; |
42 | | -import java.time.Instant; |
| 42 | +import java.lang.ref.WeakReference; |
43 | 43 | import java.util.Collections; |
44 | 44 | import java.util.Objects; |
| 45 | +import java.util.HashMap; |
| 46 | +import java.util.Map; |
45 | 47 | import java.util.Set; |
46 | | -import java.util.function.Supplier; |
| 48 | +import java.util.concurrent.TimeUnit; |
47 | 49 | import java.util.logging.Level; |
48 | 50 | import java.util.logging.LogRecord; |
49 | 51 | import java.util.stream.Collectors; |
50 | | -import org.hamcrest.Matcher; |
| 52 | +import jenkins.model.Jenkins; |
51 | 53 | import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; |
52 | 54 | import org.jenkinsci.plugins.workflow.job.WorkflowJob; |
53 | 55 | import org.jenkinsci.plugins.workflow.job.WorkflowRun; |
54 | 56 | import org.jenkinsci.plugins.workflow.steps.Step; |
55 | 57 | import org.jenkinsci.plugins.workflow.steps.StepContext; |
56 | 58 | import org.jenkinsci.plugins.workflow.steps.StepDescriptor; |
57 | 59 | import org.jenkinsci.plugins.workflow.steps.StepExecution; |
| 60 | +import org.jenkinsci.plugins.workflow.steps.StepExecutions; |
58 | 61 | import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; |
59 | 62 | import org.junit.ClassRule; |
60 | 63 | import org.junit.Test; |
|
63 | 66 | import org.jvnet.hudson.test.Issue; |
64 | 67 | import org.jvnet.hudson.test.LoggerRule; |
65 | 68 | import org.jvnet.hudson.test.JenkinsSessionRule; |
| 69 | +import org.jvnet.hudson.test.MemoryAssert; |
66 | 70 | import org.jvnet.hudson.test.TestExtension; |
67 | 71 | import org.jvnet.hudson.test.recipes.LocalData; |
68 | 72 | import org.kohsuke.stapler.DataBoundConstructor; |
@@ -136,7 +140,7 @@ public class FlowExecutionListTest { |
136 | 140 | at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175) |
137 | 141 | at jenkins.model.Jenkins.<init>(Jenkins.java:1019) |
138 | 142 | */ |
139 | | - waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); |
| 143 | + await().atMost(5, TimeUnit.SECONDS).until(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); |
140 | 144 | WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class); |
141 | 145 | SemaphoreStep.success("wait/1", null); |
142 | 146 | WorkflowRun b = p.getBuildByNumber(1); |
@@ -193,6 +197,34 @@ public class FlowExecutionListTest { |
193 | 197 | }); |
194 | 198 | } |
195 | 199 |
|
| 200 | + @Test public void stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck() throws Throwable { |
| 201 | + sessions.then(r -> { |
| 202 | + var notStuck = r.createProject(WorkflowJob.class, "not-stuck"); |
| 203 | + notStuck.setDefinition(new CpsFlowDefinition("semaphore 'wait'", true)); |
| 204 | + var notStuckBuild = notStuck.scheduleBuild2(0).waitForStart(); |
| 205 | + SemaphoreStep.waitForStart("wait/1", notStuckBuild); |
| 206 | + WeakReference<Object> notStuckBuildRef = new WeakReference<>(notStuckBuild); |
| 207 | + // Create a Pipeline that runs a long-lived task on its CpsVmExecutorService, causing it to get stuck. |
| 208 | + var stuck = r.createProject(WorkflowJob.class, "stuck"); |
| 209 | + stuck.setDefinition(new CpsFlowDefinition("blockSynchronously 'stuck'", false)); |
| 210 | + var stuckBuild = stuck.scheduleBuild2(0).waitForStart(); |
| 211 | + await().atMost(5, TimeUnit.SECONDS).until(() -> SynchronousBlockingStep.isStarted("stuck")); |
| 212 | + // Make FlowExecutionList$StepExecutionIteratorImpl.applyAll submit a task to the CpsVmExecutorService |
| 213 | + // for stuck #1 that will never complete, so the resulting future will never complete. |
| 214 | + StepExecution.applyAll(e -> null); |
| 215 | + // Let notStuckBuild complete and clean up all references. |
| 216 | + SemaphoreStep.success("wait/1", null); |
| 217 | + r.waitForCompletion(notStuckBuild); |
| 218 | + notStuckBuild = null; // Clear out the local variable in this thread. |
| 219 | + Jenkins.get().getQueue().clearLeftItems(); // Otherwise we'd have to wait 5 minutes for the cache to be cleared. |
| 220 | + // Make sure that the reference can be GC'd. |
| 221 | + MemoryAssert.assertGC(notStuckBuildRef, true); |
| 222 | + // Allow stuck #1 to complete so the test can be cleaned up promptly. |
| 223 | + SynchronousBlockingStep.unblock("stuck"); |
| 224 | + r.waitForCompletion(stuckBuild); |
| 225 | + }); |
| 226 | + } |
| 227 | + |
196 | 228 | public static class NonResumableStep extends Step implements Serializable { |
197 | 229 | public static final long serialVersionUID = 1L; |
198 | 230 | @DataBoundConstructor |
@@ -231,14 +263,59 @@ public String getFunctionName() { |
231 | 263 | } |
232 | 264 |
|
233 | 265 | /** |
234 | | - * Wait up to 5 seconds for the given supplier to return a matching value. |
| 266 | + * Blocks the CPS VM thread synchronously (bad!) to test related problems. |
235 | 267 | */ |
236 | | - private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException { |
237 | | - Instant end = Instant.now().plus(Duration.ofSeconds(5)); |
238 | | - while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) { |
239 | | - Thread.sleep(100L); |
| 268 | + public static class SynchronousBlockingStep extends Step implements Serializable { |
| 269 | + private static final long serialVersionUID = 1L; |
| 270 | + private static final Map<String, State> blocked = new HashMap<>(); |
| 271 | + private final String id; |
| 272 | + |
| 273 | + @DataBoundConstructor |
| 274 | + public SynchronousBlockingStep(String id) { |
| 275 | + this.id = id; |
| 276 | + if (blocked.put(id, State.NOT_STARTED) != null) { |
| 277 | + throw new IllegalArgumentException("Attempting to reuse ID: " + id); |
| 278 | + } |
| 279 | + } |
| 280 | + |
| 281 | + @Override |
| 282 | + public StepExecution start(StepContext context) throws Exception { |
| 283 | + return StepExecutions.synchronous(context, c -> { |
| 284 | + blocked.put(id, State.BLOCKED); |
| 285 | + c.get(TaskListener.class).getLogger().println(id + " blocked"); |
| 286 | + while (blocked.get(id) == State.BLOCKED) { |
| 287 | + Thread.sleep(100L); |
| 288 | + } |
| 289 | + c.get(TaskListener.class).getLogger().println(id + " unblocked "); |
| 290 | + return null; |
| 291 | + }); |
| 292 | + } |
| 293 | + |
| 294 | + public static boolean isStarted(String id) { |
| 295 | + var state = blocked.get(id); |
| 296 | + return state != null && state != State.NOT_STARTED; |
| 297 | + } |
| 298 | + |
| 299 | + public static void unblock(String id) { |
| 300 | + blocked.put(id, State.UNBLOCKED); |
| 301 | + } |
| 302 | + |
| 303 | + private enum State { |
| 304 | + NOT_STARTED, |
| 305 | + BLOCKED, |
| 306 | + UNBLOCKED, |
| 307 | + } |
| 308 | + |
| 309 | + @TestExtension("stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck") public static class DescriptorImpl extends StepDescriptor { |
| 310 | + @Override |
| 311 | + public Set<? extends Class<?>> getRequiredContext() { |
| 312 | + return Collections.singleton(TaskListener.class); |
| 313 | + } |
| 314 | + @Override |
| 315 | + public String getFunctionName() { |
| 316 | + return "blockSynchronously"; |
| 317 | + } |
240 | 318 | } |
241 | | - assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher); |
242 | 319 | } |
243 | 320 |
|
244 | 321 | } |
0 commit comments