Skip to content

Commit a3a6988

Browse files
authored
Migrate FlowNode storage to BulkFlowNodeStorage upon execution completion to improve read performance (#807)
* Unconditionally migrate FlowNode storage to BulkFlowNodeStorage upon execution completion * Add smoke test for flow node storage optimization * Add system property to control behavior, expand catch clause, and update tests * Add FINE logging to optimizeStorage to track timing of internal operations and log overall errors at WARNING level * Add a read/write lock to control access to CpsFlowExecution.storage * Revert "Add a read/write lock to control access to CpsFlowExecution.storage" This reverts commit 4b89fed. * Acquire TimingFlowNodeStorage.readWriteLock write lock when deleting old storage dir and replacing storage.delegate * Unignore PersistenceProblemsTest.completedFinalFlowNodeNotPersisted
1 parent 769bb74 commit a3a6988

File tree

3 files changed

+120
-5
lines changed

3 files changed

+120
-5
lines changed

plugin/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsFlowExecution.java

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.jenkinsci.plugins.workflow.graph.FlowEndNode;
6565
import org.jenkinsci.plugins.workflow.graph.FlowNode;
6666
import org.jenkinsci.plugins.workflow.graph.FlowStartNode;
67+
import org.jenkinsci.plugins.workflow.graphanalysis.DepthFirstScanner;
6768
import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException;
6869
import org.jenkinsci.plugins.workflow.steps.StepContext;
6970
import org.jenkinsci.plugins.workflow.steps.StepExecution;
@@ -105,6 +106,7 @@
105106
import hudson.AbortException;
106107
import hudson.BulkChange;
107108
import hudson.Extension;
109+
import hudson.Util;
108110
import hudson.Functions;
109111
import hudson.init.Terminator;
110112
import hudson.model.Item;
@@ -143,6 +145,7 @@
143145
import org.acegisecurity.Authentication;
144146
import org.acegisecurity.userdetails.UsernameNotFoundException;
145147
import java.nio.charset.StandardCharsets;
148+
import jenkins.util.SystemProperties;
146149
import org.codehaus.groovy.GroovyBugError;
147150
import org.jboss.marshalling.reflect.SerializableClassRegistry;
148151

@@ -235,6 +238,13 @@
235238
*/
236239
@PersistIn(RUN)
237240
public class CpsFlowExecution extends FlowExecution implements BlockableResume {
241+
/**
242+
* If {@code true}, then when the execution completes, we migrate the flow node storage from
243+
* {@link SimpleXStreamFlowNodeStorage} to {@link BulkFlowNodeStorage}.
244+
*/
245+
@SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", justification = "non-final for modification via script console")
246+
public static boolean OPTIMIZE_STORAGE_UPON_COMPLETION = SystemProperties.getBoolean(CpsFlowExecution.class.getName() + ".OPTIMIZE_STORAGE_UPON_COMPLETION", true);
247+
238248
/**
239249
* Groovy script of the main source file (that the user enters in the GUI)
240250
*/
@@ -505,13 +515,59 @@ public FlowExecutionOwner getOwner() {
505515
private TimingFlowNodeStorage createStorage() throws IOException {
506516
FlowNodeStorage wrappedStorage;
507517

508-
FlowDurabilityHint hint = getDurabilityHint();
509-
wrappedStorage = (hint.isPersistWithEveryStep())
510-
? new SimpleXStreamFlowNodeStorage(this, getStorageDir())
511-
: new BulkFlowNodeStorage(this, getStorageDir());
518+
if (this.storageDir != null && this.storageDir.endsWith("-completed")) {
519+
wrappedStorage = new BulkFlowNodeStorage(this, getStorageDir());
520+
} else {
521+
FlowDurabilityHint hint = getDurabilityHint();
522+
wrappedStorage = (hint.isPersistWithEveryStep())
523+
? new SimpleXStreamFlowNodeStorage(this, getStorageDir())
524+
: new BulkFlowNodeStorage(this, getStorageDir());
525+
}
512526
return new TimingFlowNodeStorage(wrappedStorage);
513527
}
514528

529+
/**
530+
* Called when the execution completes to migrate from {@link SimpleXStreamFlowNodeStorage} to
531+
* {@link BulkFlowNodeStorage} to improve read performance for completed builds.
532+
*/
533+
private synchronized void optimizeStorage(FlowNode flowEndNode) {
534+
if (!OPTIMIZE_STORAGE_UPON_COMPLETION) {
535+
return;
536+
}
537+
if (storage.delegate instanceof SimpleXStreamFlowNodeStorage) {
538+
LOGGER.log(Level.FINE, () -> "Migrating " + this + " to BulkFlowNodeStorage");
539+
String newStorageDir = (this.storageDir != null) ? this.storageDir + "-completed" : "workflow-completed";
540+
try {
541+
FlowNodeStorage newStorage = new BulkFlowNodeStorage(this, new File(this.owner.getRootDir(), newStorageDir));
542+
DepthFirstScanner scanner = new DepthFirstScanner();
543+
scanner.setup(flowEndNode);
544+
// The hope is that by doing this right when the execution completes, most of the nodes will already be
545+
// cached in memory, saving us the cost of having to read them all from disk.
546+
for (FlowNode node : scanner) {
547+
newStorage.storeNode(node, true);
548+
}
549+
newStorage.flush();
550+
LOGGER.log(Level.FINE, () -> "Copied nodes to " + newStorageDir);
551+
File oldStorageDir = getStorageDir();
552+
this.storageDir = newStorageDir;
553+
storage.readWriteLock.writeLock().lock();
554+
try {
555+
storage.delegate = newStorage;
556+
try {
557+
Util.deleteRecursive(oldStorageDir);
558+
LOGGER.log(Level.FINE, () -> "Deleted " + oldStorageDir);
559+
} catch (IOException e) {
560+
LOGGER.log(Level.FINE, e, () -> "Unable to delete unused flow node storage directory " + oldStorageDir + " for " + this);
561+
}
562+
} finally {
563+
storage.readWriteLock.writeLock().unlock();
564+
}
565+
} catch (Exception e) {
566+
LOGGER.log(Level.WARNING, e, () -> "Unable to migrate " + this + " to BulkFlowNodeStorage");
567+
}
568+
}
569+
}
570+
515571
/**
516572
* Directory where workflow stores its state.
517573
*/
@@ -1297,6 +1353,7 @@ synchronized void onProgramEnd(Outcome outcome) {
12971353
} catch (IOException ioe) {
12981354
LOGGER.log(Level.WARNING, "Error flushing FlowNodeStorage to disk at end of run", ioe);
12991355
}
1356+
this.optimizeStorage(head);
13001357

13011358
this.persistedClean = Boolean.TRUE;
13021359
}
@@ -1851,7 +1908,7 @@ private <T> T readChild(HierarchicalStreamReader r, UnmarshallingContext context
18511908
static final ThreadLocal<CpsFlowExecution> PROGRAM_STATE_SERIALIZATION = new ThreadLocal<>();
18521909

18531910
class TimingFlowNodeStorage extends FlowNodeStorage {
1854-
private final FlowNodeStorage delegate;
1911+
FlowNodeStorage delegate;
18551912
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
18561913

18571914
TimingFlowNodeStorage(FlowNodeStorage delegate) {

plugin/src/test/java/org/jenkinsci/plugins/workflow/cps/CpsFlowExecutionTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.hamcrest.Matchers.containsString;
2929
import static org.hamcrest.Matchers.equalTo;
3030
import static org.hamcrest.Matchers.hasItem;
31+
import static org.hamcrest.Matchers.instanceOf;
3132
import static org.hamcrest.Matchers.nullValue;
3233
import static org.junit.Assert.assertEquals;
3334
import static org.junit.Assert.assertFalse;
@@ -52,6 +53,9 @@
5253
import java.net.HttpURLConnection;
5354
import java.net.MalformedURLException;
5455
import java.net.URL;
56+
import java.nio.file.Files;
57+
import java.nio.file.Path;
58+
import java.nio.file.Paths;
5559
import java.util.ArrayList;
5660
import java.util.Arrays;
5761
import java.util.List;
@@ -63,6 +67,7 @@
6367
import java.util.logging.Handler;
6468
import java.util.logging.Level;
6569
import java.util.logging.Logger;
70+
import java.util.stream.Collectors;
6671
import jenkins.model.Jenkins;
6772
import org.apache.commons.io.FileUtils;
6873
import org.hamcrest.Matchers;
@@ -75,10 +80,13 @@
7580
import org.htmlunit.WebRequest;
7681
import org.jenkinsci.plugins.scriptsecurity.sandbox.RejectedAccessException;
7782
import org.jenkinsci.plugins.scriptsecurity.sandbox.whitelists.Whitelisted;
83+
import org.jenkinsci.plugins.workflow.cps.CpsFlowExecution.TimingFlowNodeStorage;
7884
import org.jenkinsci.plugins.workflow.cps.GroovySourceFileAllowlist.DefaultAllowlist;
7985
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
8086
import org.jenkinsci.plugins.workflow.flow.FlowExecutionList;
8187
import org.jenkinsci.plugins.workflow.flow.FlowExecutionOwner;
88+
import org.jenkinsci.plugins.workflow.graph.FlowNode;
89+
import org.jenkinsci.plugins.workflow.graphanalysis.DepthFirstScanner;
8290
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
8391
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
8492
import org.jenkinsci.plugins.workflow.pickles.Pickle;
@@ -90,6 +98,9 @@
9098
import org.jenkinsci.plugins.workflow.steps.StepExecution;
9199
import org.jenkinsci.plugins.workflow.support.pickles.SingleTypedPickleFactory;
92100
import org.jenkinsci.plugins.workflow.support.pickles.TryRepeatedly;
101+
import org.jenkinsci.plugins.workflow.support.storage.BulkFlowNodeStorage;
102+
import org.jenkinsci.plugins.workflow.support.storage.FlowNodeStorage;
103+
import org.jenkinsci.plugins.workflow.support.storage.SimpleXStreamFlowNodeStorage;
93104
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
94105
import org.junit.ClassRule;
95106
import org.junit.Rule;
@@ -841,4 +852,46 @@ public boolean takesImplicitBlockArgument() {
841852
}
842853
}
843854

855+
@Test public void flowNodeStorageOptimizedUponExecutionCompletion() throws Throwable {
856+
sessions.then(r -> {
857+
WorkflowJob p = r.createProject(WorkflowJob.class, "p");
858+
p.setDefinition(new CpsFlowDefinition(
859+
"echo 'Hello, world!'\n" +
860+
"semaphore('wait')\n" +
861+
"echo 'Goodbye, world!'", true));
862+
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
863+
SemaphoreStep.waitForStart("wait/1", b);
864+
CpsFlowExecution e = (CpsFlowExecution) b.getExecution();
865+
FlowNodeStorage storage = ((TimingFlowNodeStorage) e.getStorage()).delegate;
866+
assertThat(storage, instanceOf(SimpleXStreamFlowNodeStorage.class));
867+
Path oldStorageDir = e.getStorageDir().toPath();
868+
assertThat(oldStorageDir.getFileName(), equalTo(Paths.get("workflow")));
869+
SemaphoreStep.success("wait/1", null);
870+
r.assertBuildStatusSuccess(r.waitForCompletion(b));
871+
storage = ((TimingFlowNodeStorage) e.getStorage()).delegate;
872+
assertThat(storage, instanceOf(BulkFlowNodeStorage.class));
873+
assertFalse("workflow/ should have been deleted", Files.exists(oldStorageDir));
874+
Path newStorageDir = e.getStorageDir().toPath();
875+
assertThat(newStorageDir.getFileName(), equalTo(Paths.get("workflow-completed")));
876+
assertThat(Files.list(newStorageDir).collect(Collectors.toList()), contains(newStorageDir.resolve("flowNodeStore.xml")));
877+
List<FlowNode> nodes = new DepthFirstScanner().allNodes(b.getExecution());
878+
assertThat(nodes.stream().map(FlowNode::getDisplayFunctionName).collect(Collectors.toList()), equalTo(
879+
List.of("End of Pipeline", "echo", "semaphore", "echo", "Start of Pipeline")));
880+
});
881+
sessions.then(r -> {
882+
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
883+
WorkflowRun b = p.getBuildByNumber(1);
884+
CpsFlowExecution e = (CpsFlowExecution) b.getExecution();
885+
FlowNodeStorage storage = ((TimingFlowNodeStorage) e.getStorage()).delegate;
886+
assertThat(storage, instanceOf(BulkFlowNodeStorage.class));
887+
Path newStorageDir = e.getStorageDir().toPath();
888+
assertFalse("workflow/ should have been deleted", Files.exists(newStorageDir.resolveSibling("workflow")));
889+
assertThat(newStorageDir.getFileName(), equalTo(Paths.get("workflow-completed")));
890+
assertThat(Files.list(newStorageDir).collect(Collectors.toList()), contains(newStorageDir.resolve("flowNodeStore.xml")));
891+
List<FlowNode> nodes = new DepthFirstScanner().allNodes(b.getExecution());
892+
assertThat(nodes.stream().map(FlowNode::getDisplayFunctionName).collect(Collectors.toList()), equalTo(
893+
List.of("End of Pipeline", "echo", "semaphore", "echo", "Start of Pipeline")));
894+
});
895+
}
896+
844897
}

plugin/src/test/java/org/jenkinsci/plugins/workflow/cps/PersistenceProblemsTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.junit.Rule;
2222
import org.junit.Test;
2323
import org.jvnet.hudson.test.BuildWatcher;
24+
import org.jvnet.hudson.test.FlagRule;
2425
import org.jvnet.hudson.test.Issue;
2526
import org.jvnet.hudson.test.JenkinsRule;
2627
import org.jvnet.hudson.test.RestartableJenkinsRule;
@@ -40,6 +41,9 @@ public class PersistenceProblemsTest {
4041
@Rule
4142
public RestartableJenkinsRule story = new RestartableJenkinsRule();
4243

44+
@Rule
45+
public FlagRule<Boolean> optimizeStorageFlag = new FlagRule<>(() -> CpsFlowExecution.OPTIMIZE_STORAGE_UPON_COMPLETION, v -> { CpsFlowExecution.OPTIMIZE_STORAGE_UPON_COMPLETION = v; });
46+
4347
/** Verifies all the assumptions about a cleanly finished build. */
4448
static void assertCompletedCleanly(WorkflowRun run) throws Exception {
4549
while (run.isBuilding()) {
@@ -155,6 +159,7 @@ private static InputStepExecution getInputStepExecution(WorkflowRun run, String
155159
/** Simulates something happening badly during final shutdown, which may cause build to not appear done. */
156160
@Test
157161
public void completedFinalFlowNodeNotPersisted() throws Exception {
162+
CpsFlowExecution.OPTIMIZE_STORAGE_UPON_COMPLETION = false;
158163
final int[] build = new int[1];
159164
final Result[] executionAndBuildResult = new Result[2];
160165
story.thenWithHardShutdown( j -> {

0 commit comments

Comments
 (0)