Skip to content

Commit 21492d5

Browse files
rjrudinMarkLogic Builder
authored andcommitted
DHFPROD-7701: Restoring deleted code that DHHCE needs
1 parent 00c9112 commit 21492d5

File tree

10 files changed

+436
-11
lines changed

10 files changed

+436
-11
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/FlowRunner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,12 @@ public interface FlowRunner {
129129
* @throws TimeoutException if times out
130130
*/
131131
void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
132+
133+
/**
134+
* Sets the status change listener on the flowrunner object
135+
* @param listener - the listener for when the status changes
136+
* @return the flow runner object
137+
*/
138+
FlowRunner onStatusChanged(FlowStatusListener listener);
139+
132140
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.marklogic.hub.flow;
2+
3+
4+
import com.marklogic.hub.step.impl.Step;
5+
6+
/**
7+
* This is no longer used in DHF, but is still required by DHCCE.
8+
*/
9+
public interface FlowStatusListener {
10+
11+
/**
12+
*
13+
* @param jobId - the id of the running job
14+
* @param step - the current running step
15+
* @param jobStatus - status of the running job
16+
* @param percentComplete - the percentage of completeness expressed as an int
17+
* @param successfulEvents - counter for success
18+
* @param failedEvents - counter for failure
19+
* @param message - the message you'd like to send along with it
20+
*/
21+
void onStatusChanged(String jobId, Step step, String jobStatus, int percentComplete, long successfulEvents, long failedEvents, String message) ;
22+
23+
}

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/impl/FlowRunnerImpl.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ public class FlowRunnerImpl implements FlowRunner {
6868

6969
private Queue<String> jobQueue = new ConcurrentLinkedQueue<>();
7070

71+
private List<FlowStatusListener> flowStatusListeners = new ArrayList<>();
72+
7173
private ThreadPoolExecutor threadPool;
7274

7375
public FlowRunnerImpl() {
@@ -117,6 +119,12 @@ public FlowRunnerImpl(HubConfig hubConfig) {
117119
this(hubConfig.newHubClient());
118120
}
119121

122+
@Override
123+
public FlowRunner onStatusChanged(FlowStatusListener listener) {
124+
this.flowStatusListeners.add(listener);
125+
return this;
126+
}
127+
120128
@Deprecated
121129
public RunFlowResponse runFlow(String flowName) {
122130
return runFlow(flowName, null, null, new HashMap<>(), new HashMap<>());
@@ -317,6 +325,9 @@ public void run() {
317325
Map<String, RunStepResponse> stepOutputs = new HashMap<>();
318326
String stepNum = null;
319327

328+
final long[] currSuccessfulEvents = {0};
329+
final long[] currFailedEvents = {0};
330+
final int[] currPercentComplete = {0};
320331
while (! stepQueue.isEmpty()) {
321332
stepNum = stepQueue.poll();
322333
runningStep = runningFlow.getSteps().get(stepNum);
@@ -339,8 +350,16 @@ public void run() {
339350
if(flow.isStopOnError()){
340351
stopJobOnError(jobId);
341352
}
353+
})
354+
.onStatusChanged((jobId, percentComplete, jobStatus, successfulEvents, failedEvents, message) ->{
355+
flowStatusListeners.forEach((FlowStatusListener listener) -> {
356+
currSuccessfulEvents[0] = successfulEvents;
357+
currFailedEvents[0] = failedEvents;
358+
currPercentComplete[0] = percentComplete;
359+
listener.onStatusChanged(jobId, runningStep, jobStatus, percentComplete, successfulEvents, failedEvents, runningStep.getName() + " : " + message);
360+
});
342361
});
343-
362+
//If property values are overriden in UI, use those values over any other.
344363
if(flow.getOverrideStepConfig() != null) {
345364
stepRunner.withStepConfig(flow.getOverrideStepConfig());
346365
}
@@ -371,7 +390,15 @@ public void run() {
371390
else{
372391
stepResp.withStatus(JobStatus.FAILED_PREFIX + stepNum);
373392
}
374-
393+
RunStepResponse finalStepResp = stepResp;
394+
try {
395+
flowStatusListeners.forEach((FlowStatusListener listener) -> {
396+
listener.onStatusChanged(jobId, runningStep, JobStatus.FAILED.toString(), currPercentComplete[0], currSuccessfulEvents[0], currFailedEvents[0],
397+
runningStep.getName() + " " + Arrays.toString(finalStepResp.stepOutput.toArray()));
398+
});
399+
} catch (Exception ex) {
400+
logger.error("Unable to invoke FlowStatusListener: " + ex.getMessage());
401+
}
375402
if(runningFlow.isStopOnError()) {
376403
stopJobOnError(runningJobId);
377404
}
@@ -438,6 +465,24 @@ else if (!isJobSuccess.get()) {
438465
}
439466
}
440467

468+
if (!isJobSuccess.get()) {
469+
try {
470+
flowStatusListeners.forEach((FlowStatusListener listener) -> {
471+
listener.onStatusChanged(jobId, runningStep, jobStatus.toString(), currPercentComplete[0], currSuccessfulEvents[0], currFailedEvents[0], JobStatus.FAILED.toString());
472+
});
473+
} catch (Exception ex) {
474+
logger.error("Unable to invoke FlowStatusListener: " + ex.getMessage());
475+
}
476+
} else {
477+
try {
478+
flowStatusListeners.forEach((FlowStatusListener listener) -> {
479+
listener.onStatusChanged(jobId, runningStep, jobStatus.toString(), currPercentComplete[0], currSuccessfulEvents[0], currFailedEvents[0], JobStatus.FINISHED.toString());
480+
});
481+
} catch (Exception ex) {
482+
logger.error("Unable to invoke FlowStatusListener: " + ex.getMessage());
483+
}
484+
}
485+
441486
jobQueue.remove();
442487
stepsMap.remove(jobId);
443488
flowMap.remove(jobId);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.marklogic.hub.mapping;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.marklogic.client.DatabaseClient;
5+
import com.marklogic.client.extensions.ResourceManager;
6+
import com.marklogic.client.io.Format;
7+
import com.marklogic.client.io.JacksonHandle;
8+
import com.marklogic.client.io.StringHandle;
9+
import com.marklogic.client.util.RequestParameters;
10+
11+
/**
12+
* This is no longer used in DHF, but is still required by DHCCE.
13+
*/
14+
public class MappingValidator extends ResourceManager {
15+
16+
public MappingValidator(DatabaseClient client) {
17+
super();
18+
client.init("mlMappingValidator", this);
19+
}
20+
21+
public JsonNode validateJsonMapping(String jsonMapping, String uri) {
22+
RequestParameters params = new RequestParameters();
23+
params.add("uri", uri);
24+
return getServices().post(params,
25+
new StringHandle(jsonMapping).withFormat(Format.JSON), new JacksonHandle()).get();
26+
}
27+
28+
}

marklogic-data-hub/src/main/java/com/marklogic/hub/step/StepRunner.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ public interface StepRunner {
102102
*/
103103
StepRunner onItemFailed(StepItemFailureListener listener);
104104

105+
/**
106+
* Sets the status change listener on the flowrunner object
107+
* @param listener - the listener for when the status changes
108+
* @return the step runner object
109+
*/
110+
StepRunner onStatusChanged(StepStatusListener listener);
111+
105112
/**
106113
* Blocks until the step execution is complete.
107114
*/

marklogic-data-hub/src/main/java/com/marklogic/hub/step/impl/QueryStepRunner.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class QueryStepRunner extends LoggingObject implements StepRunner {
4646
private int batchSize;
4747
private int threadCount;
4848
private Map<String, Object> combinedOptions;
49+
private int previousPercentComplete;
4950
private boolean stopOnFailure = false;
5051
private String jobId;
5152
private boolean isFullOutput = false;
@@ -54,6 +55,7 @@ public class QueryStepRunner extends LoggingObject implements StepRunner {
5455

5556
private List<StepItemCompleteListener> stepItemCompleteListeners = new ArrayList<>();
5657
private List<StepItemFailureListener> stepItemFailureListeners = new ArrayList<>();
58+
private List<StepStatusListener> stepStatusListeners = new ArrayList<>();
5759
private Map<String, Object> stepConfig = new HashMap<>();
5860
private HubClient hubClient;
5961
private Thread runningThread = null;
@@ -132,6 +134,12 @@ public StepRunner onItemFailed(StepItemFailureListener listener) {
132134
return this;
133135
}
134136

137+
@Override
138+
public StepRunner onStatusChanged(StepStatusListener listener) {
139+
this.stepStatusListeners.add(listener);
140+
return this;
141+
}
142+
135143
@Override
136144
public void awaitCompletion() {
137145
try {
@@ -243,18 +251,32 @@ public int getBatchSize(){
243251

244252
private DiskQueue<String> runCollector(String sourceDatabase) {
245253
CollectorImpl collector = new CollectorImpl(hubClient, sourceDatabase);
254+
255+
stepStatusListeners.forEach((StepStatusListener listener) -> {
256+
listener.onStatusChange(this.jobId, 0, JobStatus.RUNNING_PREFIX + step, 0, 0, "running collector");
257+
});
258+
246259
return !isStopped.get() ? collector.run(this.flow.getName(), step, combinedOptions) : null;
247260
}
248261

249262
private RunStepResponse runHarmonizer(RunStepResponse runStepResponse, Collection<String> uris) {
250263
StepMetrics stepMetrics = new StepMetrics();
251264
final int urisCount = uris != null ? uris.size() : 0;
265+
266+
stepStatusListeners.forEach((StepStatusListener listener) -> {
267+
listener.onStatusChange(runStepResponse.getJobId(), 0, JobStatus.RUNNING_PREFIX + step, 0,0, "starting step execution");
268+
});
269+
252270
if (urisCount == 0) {
253271
logger.info("No items found to process");
254272
final String stepStatus = isStopped.get() ?
255273
JobStatus.CANCELED_PREFIX + step :
256274
JobStatus.COMPLETED_PREFIX + step;
257275

276+
stepStatusListeners.forEach((StepStatusListener listener) -> {
277+
listener.onStatusChange(runStepResponse.getJobId(), 100, stepStatus, 0, 0,
278+
(stepStatus.contains(JobStatus.COMPLETED_PREFIX) ? "collector returned 0 items" : "job was stopped"));
279+
});
258280
runStepResponse.setCounts(0,0,0,0,0);
259281
runStepResponse.withStatus(stepStatus);
260282

@@ -346,6 +368,15 @@ private RunStepResponse runHarmonizer(RunStepResponse runStepResponse, Collectio
346368
stepMetrics.getFailedBatches().addAndGet(1);
347369
}
348370

371+
int percentComplete = (int) (((double) stepMetrics.getSuccessfulBatchesCount() / batchCount) * 100.0);
372+
373+
if (percentComplete != previousPercentComplete && (percentComplete % 5 == 0)) {
374+
previousPercentComplete = percentComplete;
375+
stepStatusListeners.forEach((StepStatusListener listener) -> {
376+
listener.onStatusChange(runStepResponse.getJobId(), percentComplete, JobStatus.RUNNING_PREFIX + step, stepMetrics.getSuccessfulEventsCount(), stepMetrics.getFailedEventsCount(), "");
377+
});
378+
}
379+
349380
if (stepItemCompleteListeners.size() > 0) {
350381
response.completedItems.forEach((String item) -> {
351382
stepItemCompleteListeners.forEach((StepItemCompleteListener listener) -> {
@@ -412,6 +443,10 @@ private RunStepResponse runHarmonizer(RunStepResponse runStepResponse, Collectio
412443

413444
String stepStatus = determineStepStatus(stepMetrics);
414445

446+
stepStatusListeners.forEach((StepStatusListener listener) -> {
447+
listener.onStatusChange(runStepResponse.getJobId(), 100, stepStatus, stepMetrics.getSuccessfulEventsCount(), stepMetrics.getFailedEventsCount(), "");
448+
});
449+
415450
dataMovementManager.stopJob(queryBatcher);
416451

417452
runStepResponse.setCounts(urisCount, stepMetrics.getSuccessfulEventsCount(), stepMetrics.getFailedEventsCount(), stepMetrics.getSuccessfulBatchesCount(), stepMetrics.getFailedBatchesCount());

0 commit comments

Comments
 (0)