Skip to content

Commit 8452b5d

Browse files
rjrudinMarkLogic Builder
authored andcommitted
DHFPROD-7570: stopOnError now works when the collector fails
I had to update a couple existing tests, which I think in hindsight were erroneously expecting the wrong job status. If stopOnError=true, and the collector fails, then a user expects the flow to be stopped and no other steps to execute, which means the job status should be stop-on-error.
1 parent 6e6f28a commit 8452b5d

File tree

2 files changed

+55
-14
lines changed

2 files changed

+55
-14
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/impl/FlowRunnerImpl.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,7 @@ public void run() {
337337
.onItemFailed((jobId, itemId)-> {
338338
errorCount.incrementAndGet();
339339
if(flow.isStopOnError()){
340-
jobStoppedOnError.set(true);
341-
stopJob(jobId);
340+
stopJobOnError(jobId);
342341
}
343342
});
344343

@@ -347,6 +346,10 @@ public void run() {
347346
}
348347
stepResp = stepRunner.run();
349348
stepRunner.awaitCompletion();
349+
final boolean stepFailed = stepResp.getStatus() != null && stepResp.getStatus().startsWith("failed");
350+
if (stepFailed && runningFlow.isStopOnError()) {
351+
stopJobOnError(runningJobId);
352+
}
350353
}
351354
catch (Exception e) {
352355
stepResp = RunStepResponse.withFlow(flow).withStep(stepNum);
@@ -370,8 +373,7 @@ public void run() {
370373
}
371374

372375
if(runningFlow.isStopOnError()) {
373-
jobStoppedOnError.set(true);
374-
stopJob(runningJobId);
376+
stopJobOnError(runningJobId);
375377
}
376378
}
377379
finally {
@@ -450,6 +452,12 @@ else if (!isJobSuccess.get()) {
450452
}
451453
}
452454
}
455+
456+
private void stopJobOnError(String jobId) {
457+
jobStoppedOnError.set(true);
458+
stopJob(jobId);
459+
}
460+
453461
}
454462

455463
public void awaitCompletion() {

marklogic-data-hub/src/test/java/com/marklogic/hub/flow/impl/FlowRunnerTest.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ void customStepReferencesModulePathThatDoesntExist() {
130130
List<String> errors = stepResponse.getStepOutput();
131131
assertEquals(1, errors.size(), "Expecting an error due to the missing module");
132132
assertTrue(errors.get(0).contains("Unable to access module: /custom-modules/custom/value-step/main.sjs. " +
133-
"Verify that this module is in your modules database and that your user account has a role that grants read and execute permission to this module"),
133+
"Verify that this module is in your modules database and that your user account has a role that grants read and execute permission to this module"),
134134
"Did not find expected message in error; error: " + errors.get(0));
135135

136136
assertEquals(2, stepResponse.getTotalEvents(), "Expecting 2, as there are 2 URIs matching the collection query");
@@ -235,7 +235,7 @@ public void testIngestCSVasXML() {
235235
"Per DHFPROD-6665, the DHF file ingester now includes a no-namespaced 'root' element around each of the elements constructed from a " +
236236
"row in the delimited file. This matches what MLCP does, thereby avoiding confusion from a user getting different results from DHF " +
237237
"and from MLCP.");
238-
}
238+
}
239239

240240
@SuppressWarnings("deprecation")
241241
protected RunFlowResponse runFlow(String flowName, String commaDelimitedSteps, String jobId, Map<String, Object> options, Map<String, Object> stepConfig) {
@@ -286,15 +286,18 @@ public void testSourceQueryLimit(){
286286
verifyJobFinished(resp);
287287
assertEquals(2, getDocCount(HubConfig.DEFAULT_FINAL_NAME, "xml-map"));
288288

289+
final String stopOnErrorReason = "Per DHFPROD-7570, if the sourceQuery fails for any reason, and the flow has stopOnError=true, then " +
290+
"the flow should be stopped and the job status should be stop-on-error.";
291+
289292
opts.put("sourceQueryLimit", -2);
290293
resp = runFlow("testFlow", "6", UUID.randomUUID().toString(), opts, new HashMap<>());
291294
flowRunner.awaitCompletion();
292-
assertEquals("failed", resp.getJobStatus(), "Unexpected job status: " + resp.toJson());
295+
assertEquals(JobStatus.STOP_ON_ERROR.toString(), resp.getJobStatus(), stopOnErrorReason);
293296

294297
opts.put("sourceQueryLimit", "invalidValue");
295298
resp = runFlow("testFlow", "6", UUID.randomUUID().toString(), opts, new HashMap<>());
296299
flowRunner.awaitCompletion();
297-
assertEquals("failed", resp.getJobStatus(), "Unexpected job status: " + resp.toJson());
300+
assertEquals(JobStatus.STOP_ON_ERROR.toString(), resp.getJobStatus(), stopOnErrorReason);
298301
}
299302

300303
@Test
@@ -343,8 +346,8 @@ public void testInvalidQueryCollector(){
343346

344347
RunFlowResponse resp = runFlow("testFlow", "1,6", UUID.randomUUID().toString(), opts, null);
345348
flowRunner.awaitCompletion();
346-
assertEquals(JobStatus.FINISHED_WITH_ERRORS.toString(), resp.getJobStatus(), "Since one step completed and " +
347-
"the other failed, the status should be finished with errors: " + resp.toJson());
349+
assertEquals(JobStatus.STOP_ON_ERROR.toString(), resp.getJobStatus(), "Per DHFPROD-7570, since the sourceQuery failed and " +
350+
"stopOnError=true, the job status should be stop-on-error, even though one step did complete successfully");
348351
assertEquals(1, getDocCount(HubConfig.DEFAULT_STAGING_NAME, "xml-coll"));
349352
assertEquals(0, getDocCount(HubConfig.DEFAULT_FINAL_NAME, "xml-map"));
350353

@@ -355,8 +358,9 @@ public void testInvalidQueryCollector(){
355358

356359
resp = runFlow("testFlow", "6", UUID.randomUUID().toString(), opts, null);
357360
flowRunner.awaitCompletion();
358-
assertEquals(JobStatus.FAILED.toString(), resp.getJobStatus(), "Since all steps failed (there was just one step), " +
359-
"the status should be failed: " + resp.toJson());
361+
362+
assertEquals(JobStatus.STOP_ON_ERROR.toString(), resp.getJobStatus(), "Per DHFPROD-7570, since the sourceQuery failed " +
363+
"and stopOnError=true, the job status should be stop-on-error");
360364
assertEquals(0, getDocCount(HubConfig.DEFAULT_FINAL_NAME, "xml-map"));
361365
stepResponse = resp.getStepResponses().get("6");
362366
assertEquals("failed step 6", stepResponse.getStatus());
@@ -496,7 +500,7 @@ void stopOnErrorWhenStepFailsItem() {
496500
.withOption("throwErrorOnPurpose", true)
497501
);
498502

499-
assertEquals("stop-on-error", response.getJobStatus());
503+
assertEquals(JobStatus.STOP_ON_ERROR.toString(), response.getJobStatus());
500504
assertEquals("1", response.getLastAttemptedStep());
501505
assertEquals("0", response.getLastCompletedStep());
502506

@@ -512,6 +516,35 @@ void stopOnErrorWhenStepFailsItem() {
512516
"The second step should not have been run since stopOnError=true");
513517
}
514518

519+
@Test
520+
void stopOnErrorIsTrueAndSourceQueryThrowsError() {
521+
installReferenceModelProject().createRawCustomer(1, "Jane");
522+
523+
RunFlowResponse response = runFlow(new FlowInputs("customHookFlow", "1", "2")
524+
.withOption("stopOnError", true)
525+
.withOption("sourceQuery", "cts.collectionQuery('this should throw an error")
526+
);
527+
528+
assertEquals(JobStatus.STOP_ON_ERROR.toString(), response.getJobStatus());
529+
assertEquals("1", response.getLastAttemptedStep());
530+
assertEquals("0", response.getLastCompletedStep());
531+
assertEquals(1, response.getStepResponses().keySet().size(),
532+
"The second step should not have been run since stopOnError=true");
533+
534+
RunStepResponse stepResponse = response.getStepResponses().get("1");
535+
assertEquals(0, stepResponse.getTotalEvents(), "Total events is zero because the collector failed");
536+
assertEquals(0, stepResponse.getSuccessfulEvents());
537+
assertEquals(0, stepResponse.getFailedEvents());
538+
assertEquals(0, stepResponse.getSuccessfulBatches());
539+
assertEquals(0, stepResponse.getFailedBatches());
540+
assertFalse(stepResponse.isSuccess());
541+
assertEquals("failed step 1", stepResponse.getStatus(), "The step status is 'failed' since the sourceQuery could not be evaluated");
542+
assertEquals(1, stepResponse.getStepOutput().size());
543+
assertTrue(stepResponse.getStepOutput().get(0).contains("Unable to collect items to process for flow customHookFlow and step 1"));
544+
assertTrue(stepResponse.getStepOutput().get(0).contains("Invalid or unexpected token"),
545+
"The stepOutput should have a stacktrace that includes the ML-generated error message");
546+
}
547+
515548
@Test
516549
public void testIngestBinaryAndTxt(){
517550
runAsDataHubOperator();
@@ -587,7 +620,7 @@ public void testStopJob() {
587620
final String status = resp.getJobStatus() != null ? resp.getJobStatus().toLowerCase() : "";
588621
assertEquals(JobStatus.CANCELED.toString().toLowerCase(), status,
589622
"Expected the response status to be canceled, since the job was stopped before it finished, but was instead: " + status
590-
+ ". If this failed in Jenkins, it likely can be ignored because we don't have a firm idea of how long the " +
623+
+ ". If this failed in Jenkins, it likely can be ignored because we don't have a firm idea of how long the " +
591624
"thread that stops the job should wait until it stops the job; response: " + resp.toJson());
592625
}
593626

0 commit comments

Comments
 (0)