Skip to content

Commit b702b5d

Browse files
rjrudinMarkLogic Builder
authored andcommitted
DHFPROD-7289: Handling custom hook errors for 5.4
1 parent e375f90 commit b702b5d

File tree

4 files changed

+103
-18
lines changed

4 files changed

+103
-18
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/step/impl/QueryStepRunner.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,17 @@ private RunStepResponse runHarmonizer(RunStepResponse runStepResponse, Collectio
428428
// if exception is thrown update the failed related metrics
429429
stepMetrics.getFailedBatches().addAndGet(1);
430430
stepMetrics.getFailedEvents().addAndGet(batch.getItems().length);
431+
432+
if (flow != null && flow.isStopOnError()) {
433+
// Stop the job, and then we need to call processFailure to force the FlowRunner to stop the flow
434+
JobTicket jobTicket = ticketWrapper.get("jobTicket");
435+
if (jobTicket != null) {
436+
dataMovementManager.stopJob(jobTicket);
437+
}
438+
stepItemFailureListeners.forEach((StepItemFailureListener listener) -> {
439+
listener.processFailure(runStepResponse.getJobId(), null);
440+
});
441+
}
431442
}
432443
})
433444
.onQueryFailure((QueryBatchException failure) -> {

marklogic-data-hub/src/main/resources/ml-modules/root/data-hub/5/impl/flow.sjs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -262,20 +262,27 @@ class Flow {
262262
delete this.datahub.flow;
263263
}
264264

265-
if (this.isContextDB(this.globalContext.sourceDatabase) && !combinedOptions.stepUpdate) {
266-
this.runStep(items, content, combinedOptions, flowName, stepNumber, flowStep);
267-
} else {
268-
const flowInstance = this;
269-
xdmp.invoke(
270-
'/data-hub/5/impl/invoke-step.sjs',
271-
{flowInstance, items, content, combinedOptions, flowName, flowStep, stepNumber},
272-
{
273-
database: this.globalContext.sourceDatabase ? xdmp.database(this.globalContext.sourceDatabase) : xdmp.database(),
274-
update: combinedOptions.stepUpdate ? 'true': 'false',
275-
commit: 'auto',
276-
ignoreAmps: true
277-
}
278-
);
265+
try {
266+
if (this.isContextDB(this.globalContext.sourceDatabase) && !combinedOptions.stepUpdate) {
267+
this.runStep(items, content, combinedOptions, flowName, stepNumber, flowStep);
268+
} else {
269+
const flowInstance = this;
270+
xdmp.invoke(
271+
'/data-hub/5/impl/invoke-step.sjs',
272+
{flowInstance, items, content, combinedOptions, flowName, flowStep, stepNumber},
273+
{
274+
database: this.globalContext.sourceDatabase ? xdmp.database(this.globalContext.sourceDatabase) : xdmp.database(),
275+
update: combinedOptions.stepUpdate ? 'true': 'false',
276+
commit: 'auto',
277+
ignoreAmps: true
278+
}
279+
);
280+
}
281+
} catch (error) {
282+
this.globalContext.failedItems = items;
283+
this.globalContext.batchErrors.push(error);
284+
this.updateBatchDocument(flowName, flowStep, combinedOptions, items, {});
285+
throw error;
279286
}
280287

281288
let writeTransactionInfo = {};

marklogic-data-hub/src/test/java/com/marklogic/hub/flow/RunStepWithCustomHookTest.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,26 @@
33
import com.marklogic.hub.AbstractHubCoreTest;
44
import com.marklogic.hub.HubClient;
55
import com.marklogic.hub.job.JobStatus;
6+
import com.marklogic.hub.step.RunStepResponse;
67
import com.marklogic.hub.test.ReferenceModelProject;
78
import org.junit.jupiter.api.BeforeEach;
89
import org.junit.jupiter.api.Test;
910

11+
import java.util.HashMap;
12+
import java.util.Map;
13+
1014
import static org.junit.jupiter.api.Assertions.*;
1115

1216
public class RunStepWithCustomHookTest extends AbstractHubCoreTest {
1317

1418
private final static String URI_CREATED_BY_HOOK = "/insertedByHook/customer1.json";
1519

1620
HubClient client;
21+
ReferenceModelProject project;
1722

1823
@BeforeEach
1924
void beforeEach() {
20-
ReferenceModelProject project = installReferenceModelProject();
25+
project = installReferenceModelProject();
2126
runAsDataHubOperator();
2227
client = getHubClient();
2328
project.createRawCustomer(1, "Jane");
@@ -46,4 +51,61 @@ void afterHook() {
4651
assertNull(client.getStagingClient().newDocumentManager().exists(URI_CREATED_BY_HOOK));
4752
assertNotNull(client.getFinalClient().newDocumentManager().exists(URI_CREATED_BY_HOOK));
4853
}
54+
55+
@Test
56+
void beforeHookThrowsErrorAndStopOnErrorIsTrue() {
57+
project.createRawCustomer(2, "Janet");
58+
59+
Map<String, Object> options = new HashMap<>();
60+
options.put("stopOnError", "true");
61+
options.put("throwErrorForStepNumber", "1");
62+
RunFlowResponse response = runFlow(new FlowInputs("customHookFlow", "1", "2").withOptions(options));
63+
64+
assertEquals(JobStatus.STOP_ON_ERROR.toString(), response.getJobStatus());
65+
assertEquals("1", response.getLastAttemptedStep());
66+
assertEquals("0", response.getLastCompletedStep());
67+
assertEquals(1, response.getStepResponses().keySet().size(),
68+
"The second step should not have been run since stopOnError=true");
69+
70+
RunStepResponse stepResponse = response.getStepResponses().get("1");
71+
assertEquals("canceled step 1", stepResponse.getStatus());
72+
assertEquals(2, stepResponse.getTotalEvents());
73+
assertEquals(0, stepResponse.getSuccessfulEvents());
74+
assertEquals(2, stepResponse.getFailedEvents(), "Both items are considered to have failed since the " +
75+
"entire batch failed due to a custom hook error");
76+
assertEquals(0, stepResponse.getSuccessfulBatches());
77+
assertEquals(1, stepResponse.getFailedBatches());
78+
assertEquals(1, stepResponse.getStepOutput().size());
79+
assertTrue(stepResponse.getStepOutput().get(0).contains("Throwing error on purpose for step number"),
80+
"Unexpected step error: " + stepResponse.getStepOutput().get(0));
81+
assertFalse(stepResponse.isSuccess());
82+
}
83+
84+
@Test
85+
void afterHookThrowsErrorAndStopOnErrorIsTrue() {
86+
project.createRawCustomer(2, "Janet");
87+
88+
Map<String, Object> options = new HashMap<>();
89+
options.put("stopOnError", "true");
90+
options.put("throwErrorForStepNumber", "2");
91+
RunFlowResponse response = runFlow(new FlowInputs("customHookFlow", "2", "1").withOptions(options));
92+
93+
assertEquals(JobStatus.STOP_ON_ERROR.toString(), response.getJobStatus());
94+
assertEquals("2", response.getLastAttemptedStep());
95+
assertEquals("0", response.getLastCompletedStep());
96+
assertEquals(1, response.getStepResponses().keySet().size(),
97+
"The second step (step 1 in this scenario) should not have been run since stopOnError=true");
98+
99+
RunStepResponse stepResponse = response.getStepResponses().get("2");
100+
assertEquals("canceled step 2", stepResponse.getStatus());
101+
assertEquals(2, stepResponse.getTotalEvents());
102+
assertEquals(0, stepResponse.getSuccessfulEvents());
103+
assertEquals(2, stepResponse.getFailedEvents());
104+
assertEquals(0, stepResponse.getSuccessfulBatches());
105+
assertEquals(1, stepResponse.getFailedBatches());
106+
assertEquals(1, stepResponse.getStepOutput().size());
107+
assertTrue(stepResponse.getStepOutput().get(0).contains("Throwing error on purpose for step number"),
108+
"Unexpected step error: " + stepResponse.getStepOutput().get(0));
109+
assertFalse(stepResponse.isSuccess());
110+
}
49111
}
Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
declareUpdate();
22

33
var uris;
4-
var content;
4+
var content; // expected to be an array
55
var options;
66
var flowName;
77
var stepNumber;
88
var step;
99

10-
for (const contentObject of content) {
11-
xdmp.documentInsert("/insertedByHook" + contentObject.uri, contentObject.value, contentObject.context.permissions);
10+
if (options.throwErrorForStepNumber === stepNumber) {
11+
throw Error("Throwing error on purpose for step number: " + stepNumber);
1212
}
13+
14+
// Using an array function to verify that an array is passed to the hook
15+
content.forEach(contentObject => {
16+
xdmp.documentInsert("/insertedByHook" + contentObject.uri, contentObject.value, contentObject.context.permissions);
17+
});

0 commit comments

Comments
 (0)