Skip to content

Commit 87fd3ad

Browse files
DHFPROD-2461: fix to manage flows accurately showing information (#2473)
* added ability to poll multiple flows * cleanup * DHFPROD-2461: trim the filename incase of trailing or leading space * DHFPROD-2461: case insensitive searches for step definitions * DHFPROD-2461: reset runningFlow when flow has 'completed' * DHFPROD-2494: add a default collection to mapping step in UI when set * DHFPROD-2461: Control when we should reset the latest job info and when to refresh job from DB * DHFPROD-2461: better control multiple flow jobs running at the same time * DHFPROD-2461: expose new endpoint for getting latest jobs per flow with rs:latest=true being set * add mdm-content as a default collection to mapped docs (#2471) * DHFPROD-2461: Control when we should reset the latest job info and when to refresh job from DB * DHFPROD-2461: make the query safer for latest flows * DHFPROD-2461: Allow null timestamps in the job docs rindex * Adding 'flowName' to stepResponse * change cache ttl * add delay before clicking target entity
1 parent 19b7f0a commit 87fd3ad

File tree

9 files changed

+156
-130
lines changed

9 files changed

+156
-130
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/impl/FlowRunnerImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,6 @@ else if (!isJobSuccess.get()) {
384384
stepsMap.remove(jobId);
385385
flowMap.remove(jobId);
386386
flowResp.remove(runningJobId);
387-
runningFlow = null;
388387
if (!jobQueue.isEmpty()) {
389388
initializeFlow((String) jobQueue.peek());
390389
} else {
@@ -436,7 +435,6 @@ public void afterExecute(Runnable r, Throwable t) {
436435
//Run the next queued flow if stop-on-error is set or if the step queue is empty
437436
if(((FlowRunnerTask)r).getStepQueue().isEmpty() || runningFlow.isStopOnError()) {
438437
jobQueue.remove();
439-
runningFlow = null;
440438
if (!jobQueue.isEmpty()) {
441439
initializeFlow((String) jobQueue.peek());
442440
} else {

marklogic-data-hub/src/main/java/com/marklogic/hub/step/RunStepResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ public String getJobId() {
132132
return jobId;
133133
}
134134

135+
public String getFlowName() {
136+
return flowName;
137+
}
138+
135139
public Map<String, Object> getFullOutput() {
136140
return fullOutput;
137141
}

marklogic-data-hub/src/main/java/com/marklogic/hub/step/impl/WriteStepRunner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,7 @@ private RunStepResponse runIngester(RunStepResponse runStepResponse, Collection<
444444
//TODO: There is one additional item returned, it has to be investigated
445445
successfulEvents.addAndGet(batch.getItems().length-1);
446446
successfulBatches.addAndGet(1);
447+
logger.debug(String.format("Current SuccessfulEvents: %d - FailedEvents: %d", successfulEvents.get(), failedEvents.get()));
447448
runStatusListener(successfulBatches.get()+failedBatches.get(), batchCount, successfulEvents, failedEvents);
448449
if (stepItemCompleteListeners.size() > 0) {
449450
Arrays.stream(batch.getItems()).forEach((WriteEvent e) -> {

marklogic-data-hub/src/main/resources/ml-modules/root/data-hub/5/impl/jobs.sjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class Jobs {
138138
let timeQuery = [];
139139
for(let flowName of flowNames) {
140140
let time = cts.values(cts.jsonPropertyReference("timeStarted"), null, ["descending","limit=1"], cts.jsonPropertyRangeQuery("flow", "=", flowName));
141-
timeQuery.push(cts.rangeQuery(cts.jsonPropertyReference("timeStarted"), "=", time));
141+
timeQuery.push(cts.andQuery([cts.jsonPropertyRangeQuery("flow", "=", flowName), cts.rangeQuery(cts.jsonPropertyReference("timeStarted"), "=", time)]));
142142
}
143143
let results = cts.search(cts.orQuery(timeQuery));
144144
if(results) {

web/e2e/specs/scenarios/simpleJson.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ export default function(qaProjectDir) {
137137
await stepsPage.clickStepSourceCollectionOption("SimpleJSONIngest");
138138
await stepsPage.clickStepTargetEntityDropDown();
139139
browser.wait(EC.elementToBeClickable(stepsPage.stepTargetEntityOptions("SimpleJSON")));
140+
browser.sleep(5000);
140141
await stepsPage.clickStepTargetEntityOption("SimpleJSON");
141142
await stepsPage.clickStepCancelSave("save");
142143
browser.wait(EC.visibilityOf(stepsPage.stepDetailsName));
@@ -222,6 +223,7 @@ export default function(qaProjectDir) {
222223
await stepsPage.clickStepSourceCollectionOption("SimpleJSONMapping");
223224
await stepsPage.clickStepTargetEntityDropDown();
224225
browser.wait(EC.elementToBeClickable(stepsPage.stepTargetEntityOptions("SimpleJSON")));
226+
browser.sleep(5000);
225227
await stepsPage.clickStepTargetEntityOption("SimpleJSON");
226228
await stepsPage.clickStepCancelSave("save");
227229
browser.wait(EC.visibilityOf(stepsPage.stepDetailsName));

web/src/main/java/com/marklogic/hub/web/model/FlowStepModel.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@
1111
import com.marklogic.hub.util.json.JSONObject;
1212
import com.marklogic.hub.web.model.FlowJobModel.FlowJobs;
1313
import com.marklogic.hub.web.model.FlowJobModel.LatestJob;
14-
import org.apache.commons.io.FileUtils;
15-
1614
import java.io.File;
1715
import java.io.FileWriter;
1816
import java.io.IOException;
1917
import java.io.PrintWriter;
2018
import java.nio.file.Paths;
2119
import java.util.*;
20+
import org.apache.commons.io.FileUtils;
2221

2322
public class FlowStepModel {
2423
private String id;
@@ -186,17 +185,20 @@ public void setLatestJob(LatestJob latestJob) {
186185
public void setJobs(FlowJobs flowJobs, boolean fromRunFlow) {
187186
if(flowJobs != null) {
188187
this.jobIds = flowJobs.jobIds;
189-
if (latestJob != null && latestJob.id != null && !this.jobIds.contains(latestJob.id)) {
188+
if (fromRunFlow) {
189+
//reset the latestJob info until the running flow starts with a new jobId
190+
flowJobs.latestJob = null;
191+
this.latestJob = null;
192+
return;
193+
} else if (latestJob != null && (latestJob.id != null && !this.jobIds.contains(latestJob.id))) {
190194
this.jobIds.add(latestJob.id);
191195
flowJobs.jobIds = this.jobIds;
192196
flowJobs.latestJob = latestJob;
193197
return;
194198
}
195-
if (fromRunFlow) {
196-
//reset the latestJob info for until the running flow starts with a new jobId
197-
flowJobs.latestJob = null;
199+
if (flowJobs.latestJob != null) {
200+
this.latestJob = flowJobs.latestJob;
198201
}
199-
this.latestJob = flowJobs.latestJob;
200202
}
201203
}
202204

web/src/main/java/com/marklogic/hub/web/service/FlowJobService.java

Lines changed: 102 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Optional;
3030
import java.util.concurrent.TimeUnit;
3131

32-
3332
@Service
3433
public class FlowJobService extends ResourceManager {
3534
private static final String ML_JOBS_NAME = "ml:jobs";
@@ -46,7 +45,7 @@ public class FlowJobService extends ResourceManager {
4645

4746
//use a cache with ttl 5 minutes to reduce frequency for fetching data from DB
4847
public final Cache<String, FlowJobs> cachedJobsByFlowName = CacheBuilder.newBuilder().expireAfterWrite(
49-
5, TimeUnit.MINUTES).build();
48+
5, TimeUnit.SECONDS).build();
5049

5150
public FlowJobService() {
5251
super();
@@ -56,127 +55,132 @@ public FlowJobService() {
5655
}
5756

5857
private void setupClient() {
59-
this.client = hubConfig.newJobDbClient();
58+
client = hubConfig.newJobDbClient();
6059
}
6160

62-
public FlowJobs getJobs(String flowName) {
63-
if (this.client == null) {
64-
this.setupClient();
61+
public FlowJobs getJobs(String flowName, boolean forceRefresh) {
62+
if (forceRefresh) {
63+
FlowJobs flowJobs = retrieveJobsByFlowName(flowName);
64+
cachedJobsByFlowName.put(flowName, flowJobs);
65+
return flowJobs;
6566
}
6667
try {
67-
return cachedJobsByFlowName.get(flowName, () -> {
68-
client.init(ML_JOBS_NAME, this);
69-
RequestParameters params = new RequestParameters();
70-
if (StringUtils.isNotEmpty(flowName)) {
71-
params.add("flow-name", flowName);
72-
}
68+
return cachedJobsByFlowName.get(flowName, () -> retrieveJobsByFlowName(flowName));
69+
} catch (Exception e) {
70+
logger.error(e.getMessage());
71+
} finally {
72+
if (cachedJobsByFlowName.getIfPresent(flowName) == null) {
73+
cachedJobsByFlowName.invalidate(flowName);
74+
}
75+
}
7376

74-
ServiceResultIterator resultItr = this.getServices().get(params);
75-
if (resultItr == null || !resultItr.hasNext()) {
76-
throw new RuntimeException("Unable to get job document");
77-
}
78-
ServiceResult res = resultItr.next();
79-
JsonNode jsonNode = res.getContent(new JacksonHandle()).get();
80-
81-
Flow flow = flowManager.getFlow(flowName);
82-
Map<String, Step> steps = flow.getSteps();
83-
84-
List<String> jobs = new ArrayList<>();
85-
LatestJob latestJob = new LatestJob();
86-
FlowJobs flowJobs = new FlowJobs(jobs, latestJob);
87-
88-
final JsonNode[] lastJob = {null};
89-
final String[] lastTime = {null, null}; //store last start and end time
90-
if (jsonNode.isArray()) {
91-
jsonNode.forEach(job -> {
92-
JSONObject jobJson = new JSONObject(job.get("job"));
93-
String jobId = jobJson.getString("jobId");
94-
jobs.add(jobId);
95-
String currStartTime = jobJson.getString("timeStarted", "");
96-
String currEndTime = jobJson.getString("timeEnded", "");
97-
if (StringUtils.isEmpty(lastTime[0])) {
77+
return cachedJobsByFlowName.getIfPresent(flowName);
78+
}
79+
80+
private FlowJobs retrieveJobsByFlowName(String flowName) {
81+
if (client == null) {
82+
setupClient();
83+
}
84+
client.init(ML_JOBS_NAME, this);
85+
RequestParameters params = new RequestParameters();
86+
if (StringUtils.isNotEmpty(flowName)) {
87+
params.add("flow-name", flowName);
88+
}
89+
90+
ServiceResultIterator resultItr = this.getServices().get(params);
91+
if (resultItr == null || !resultItr.hasNext()) {
92+
throw new RuntimeException("No jobs found for flow with name: " + flowName);
93+
}
94+
ServiceResult res = resultItr.next();
95+
JsonNode jsonNode = res.getContent(new JacksonHandle()).get();
96+
97+
Flow flow = flowManager.getFlow(flowName);
98+
Map<String, Step> steps = flow.getSteps();
99+
100+
List<String> jobs = new ArrayList<>();
101+
LatestJob latestJob = new LatestJob();
102+
FlowJobs flowJobs = new FlowJobs(jobs, latestJob);
103+
104+
final JsonNode[] lastJob = {null};
105+
final String[] lastTime = {null, null}; //store last start and end time
106+
if (jsonNode.isArray()) {
107+
jsonNode.forEach(job -> {
108+
JSONObject jobJson = new JSONObject(job.get("job"));
109+
String jobId = jobJson.getString("jobId");
110+
jobs.add(jobId);
111+
String currStartTime = jobJson.getString("timeStarted", "");
112+
String currEndTime = jobJson.getString("timeEnded", "");
113+
if (StringUtils.isEmpty(lastTime[0])) {
114+
lastTime[0] = currStartTime;
115+
lastTime[1] = currEndTime;
116+
lastJob[0] = jobJson.jsonNode();
117+
} else {
118+
try {
119+
int cmp = DatatypeConverter.parseDateTime(lastTime[0]).getTime().compareTo(DatatypeConverter.parseDateTime(currStartTime).getTime());
120+
if (cmp < 0) {
98121
lastTime[0] = currStartTime;
99122
lastTime[1] = currEndTime;
123+
//lastJobId[0] = jobId;
100124
lastJob[0] = jobJson.jsonNode();
101-
}
102-
else {
103-
try {
104-
int cmp = DatatypeConverter.parseDateTime(lastTime[0]).getTime().compareTo(DatatypeConverter.parseDateTime(currStartTime).getTime());
105-
if (cmp < 0) {
106-
lastTime[0] = currStartTime;
107-
lastTime[1] = currEndTime;
108-
//lastJobId[0] = jobId;
109-
lastJob[0] = jobJson.jsonNode();
110-
}
111-
else if (cmp == 0) { //compare end time just in case
112-
if (DatatypeConverter.parseDateTime(lastTime[1]).getTime().compareTo(DatatypeConverter.parseDateTime(currEndTime).getTime()) < 0) {
113-
lastTime[0] = currStartTime;
114-
lastTime[1] = currEndTime;
115-
lastJob[0] = jobJson.jsonNode();
116-
}
117-
}
118-
}
119-
catch (Exception e) {
120-
//ignore, maybe log
125+
} else if (cmp == 0) { //compare end time just in case
126+
if (DatatypeConverter.parseDateTime(lastTime[1]).getTime().compareTo(DatatypeConverter.parseDateTime(currEndTime).getTime()) < 0) {
127+
lastTime[0] = currStartTime;
128+
lastTime[1] = currEndTime;
129+
lastJob[0] = jobJson.jsonNode();
121130
}
122131
}
123-
});
124-
}
125-
126-
if (jobs.isEmpty() || lastJob[0] == null) {
127-
return flowJobs;
132+
} catch (Exception e) {
133+
logger.error(e.getMessage());
134+
}
128135
}
129-
JSONObject jobJson = new JSONObject(lastJob[0]);
130-
131-
latestJob.id = jobJson.getString("jobId");
132-
latestJob.startTime = jobJson.getString("timeStarted", "");
133-
latestJob.endTime = jobJson.getString("timeEnded", "");
134-
latestJob.status = jobJson.getString("jobStatus");
135-
136-
String completedKey = jobJson.getString("lastCompletedStep", "0");
137-
String attemptedKey = jobJson.getString("lastAttemptedStep", "0");
138-
String stepKey = Integer.compare(Integer.valueOf(completedKey), Integer.valueOf(attemptedKey)) < 0 ? attemptedKey : completedKey;
139-
Optional.ofNullable(steps.get(stepKey)).ifPresent(s -> {
140-
latestJob.stepName = s.getName();
141-
latestJob.stepId = s.getName() + "-" + s.getStepDefinitionType();
142-
});
143-
144-
JsonNode stepRes = jobJson.getNode("stepResponses");
145-
146-
if (stepRes != null) {
147-
stepRes.forEach(s -> {
148-
latestJob.successfulEvents += s.get("successfulEvents").asLong();
149-
latestJob.failedEvents += s.get("failedEvents").asLong();
150-
latestJob.output = s.get("stepOutput"); //last step output ?
151-
});
152-
}
153-
154-
return flowJobs;
155136
});
156137
}
157-
catch (Exception e) {
158138

139+
if (jobs.isEmpty() || lastJob[0] == null) {
140+
return flowJobs;
159141
}
160-
finally {
161-
if (cachedJobsByFlowName.getIfPresent(flowName) == null) {
162-
cachedJobsByFlowName.invalidate(flowName);
163-
}
164-
this.release();
142+
JSONObject jobJson = new JSONObject(lastJob[0]);
143+
144+
latestJob.id = jobJson.getString("jobId");
145+
latestJob.startTime = jobJson.getString("timeStarted", "");
146+
latestJob.endTime = jobJson.getString("timeEnded", "");
147+
if ("N/A".equals(latestJob.endTime)) {
148+
latestJob.endTime = "";
165149
}
166-
return cachedJobsByFlowName.getIfPresent(flowName);
150+
latestJob.status = jobJson.getString("jobStatus");
151+
152+
String completedKey = jobJson.getString("lastCompletedStep", "0");
153+
String attemptedKey = jobJson.getString("lastAttemptedStep", "0");
154+
String stepKey = Integer.compare(Integer.valueOf(completedKey), Integer.valueOf(attemptedKey)) < 0 ? attemptedKey : completedKey;
155+
Optional.ofNullable(steps.get(stepKey)).ifPresent(s -> {
156+
latestJob.stepName = s.getName();
157+
latestJob.stepId = s.getName() + "-" + s.getStepDefinitionType();
158+
});
159+
160+
JsonNode stepRes = jobJson.getNode("stepResponses");
161+
162+
if (stepRes != null) {
163+
stepRes.forEach(s -> {
164+
latestJob.successfulEvents += s.get("successfulEvents") != null ? s.get("successfulEvents").asLong() : 0;
165+
latestJob.failedEvents += s.get("failedEvents") != null ? s.get("failedEvents").asLong() : 0;
166+
latestJob.output = s.get("stepOutput"); //last step output ?
167+
});
168+
}
169+
this.release();
170+
171+
return flowJobs;
167172
}
168173

169-
//TODO: fix this whole client mess and properly report exceptions
170174
private void release() {
171-
if (this.client != null) {
175+
if (client != null) {
172176
try {
173-
this.client.release();
177+
client.release();
174178
}
175179
catch (Exception e) {
176180
logger.error(e.getMessage());
177181
}
178182
finally {
179-
this.client = null;
183+
client = null;
180184
}
181185
}
182186
}

0 commit comments

Comments
 (0)