Skip to content

Commit f9b48d5

Browse files
committed
speeding up tests
1 parent bda34fd commit f9b48d5

29 files changed

+720
-324
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.marklogic.hub.flow;
2+
3+
public interface BatchCompleteListener {
4+
void onBatchFailed();
5+
void onBatchSucceeded();
6+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package com.marklogic.hub.flow;
22

33
public interface FlowItemCompleteListener {
4-
void processCompletion(long jobId, String itemId);
4+
void processCompletion(String jobId, String itemId);
55
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package com.marklogic.hub.flow;
22

33
public interface FlowItemFailureListener {
4-
void processFailure(long jobId, String itemId);
4+
void processFailure(String jobId, String itemId);
55
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package com.marklogic.hub.flow;
22

33
public interface FlowStatusListener {
4-
void onStatusChange(long jobId, int percentComplete, String message);
4+
void onStatusChange(String jobId, int percentComplete, String message);
55
}

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/RunFlowResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,8 @@ public class RunFlowResponse {
99
public long errorCount = 0;
1010
public List<String> completedItems;
1111
public List<String> failedItems;
12+
13+
public String toString() {
14+
return "{totalCount: " + totalCount + ", errorCount: " + errorCount + ", completedItems: " + completedItems.size() + ", failedItems: " + failedItems.size() + "}";
15+
}
1216
}

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/impl/FlowRunnerImpl.java

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.marklogic.hub.HubConfig;
1111
import com.marklogic.hub.HubDatabase;
1212
import com.marklogic.hub.flow.*;
13+
import com.marklogic.hub.job.JobManager;
1314
import com.marklogic.spring.batch.hub.FlowConfig;
1415
import com.marklogic.spring.batch.hub.RunHarmonizeFlowConfig;
1516
import com.marklogic.spring.batch.hub.StagingConfig;
@@ -23,6 +24,7 @@
2324

2425
import java.util.*;
2526
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicLong;
2628

2729
public class FlowRunnerImpl implements FlowRunner {
2830

@@ -39,9 +41,13 @@ public class FlowRunnerImpl implements FlowRunner {
3941
private List<FlowItemFailureListener> flowItemFailureListeners = new ArrayList<>();
4042
private List<FlowStatusListener> flowStatusListeners = new ArrayList<>();
4143
private List<FlowFinishedListener> flowFinishedListeners = new ArrayList<>();
44+
private List<BatchCompleteListener> batchCompleteListeners = new ArrayList<>();
4245

4346
private HubConfig hubConfig;
4447
private Thread runningThread = null;
48+
private boolean isFinished = false;
49+
private JobExecution result = null;
50+
4551

4652
public FlowRunnerImpl(HubConfig hubConfig) {
4753
this.hubConfig = hubConfig;
@@ -118,15 +124,21 @@ public void awaitCompletion() {
118124
@Override
119125
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
120126
runningThread.join(unit.convert(timeout, TimeUnit.MILLISECONDS));
127+
result.stop();
121128
}
122129

123130
@Override
124131
public JobExecution run() {
125-
JobExecution result = null;
132+
result = null;
126133
if (options == null) {
127134
options = new HashMap<>();
128135
}
129136
flow.setOptions(options);
137+
AtomicLong successfulEvents = new AtomicLong(0);
138+
AtomicLong failedEvents = new AtomicLong(0);
139+
AtomicLong successfulBatches = new AtomicLong(0);
140+
AtomicLong failedBatches = new AtomicLong(0);
141+
JobManager jobManager = new JobManager(hubConfig.newJobDbClient());
130142
try {
131143
DatabaseClient srcClient;
132144
if (sourceDatabase.equals(HubDatabase.STAGING)) {
@@ -146,6 +158,37 @@ public JobExecution run() {
146158

147159
ConfigurableApplicationContext ctx = buildApplicationContext(flow, srcClient);
148160

161+
flowItemCompleteListeners.add((jobId, itemId) -> {
162+
successfulEvents.addAndGet(1);
163+
});
164+
165+
flowItemFailureListeners.add((jobId, itemId) -> {
166+
failedEvents.addAndGet(1);
167+
});
168+
169+
flowFinishedListeners.add(() -> {
170+
// store the thing in MarkLogic
171+
com.marklogic.hub.job.Job job = com.marklogic.hub.job.Job.withFlow(flow)
172+
173+
.withJobId(Long.toString(result.getJobId()))
174+
.setCounts(successfulEvents.get(), failedEvents.get(), successfulBatches.get(), failedBatches.get())
175+
.withEndTime(new Date());
176+
jobManager.saveJob(job);
177+
isFinished = true;
178+
});
179+
180+
batchCompleteListeners.add(new BatchCompleteListener() {
181+
@Override
182+
public void onBatchFailed() {
183+
failedBatches.addAndGet(1);
184+
}
185+
186+
@Override
187+
public void onBatchSucceeded() {
188+
successfulBatches.addAndGet(1);
189+
}
190+
});
191+
149192
JobParameters params = buildJobParameters(flow, batchSize, threadCount, targetDatabase);
150193
JobLauncher launcher = ctx.getBean(JobLauncher.class);
151194
Job job = ctx.getBean(Job.class);
@@ -154,28 +197,21 @@ public JobExecution run() {
154197
e.printStackTrace();
155198
}
156199

157-
runningThread = new Thread(new Runnable() {
158-
private boolean isFinished = false;
159-
160-
@Override
161-
public void run() {
162-
flowFinishedListeners.add(() -> {
163-
isFinished = true;
164-
});
165-
166-
while(true) {
167-
if (isFinished) {
168-
break;
169-
}
170-
Thread.yield();
200+
runningThread = new Thread(() -> {
201+
while(true) {
202+
if (isFinished) {
203+
break;
171204
}
172-
205+
Thread.yield();
173206
}
174-
175207
});
176208

177209
runningThread.start();
178210

211+
jobManager.saveJob(com.marklogic.hub.job.Job.withFlow(flow)
212+
.withJobId(Long.toString(result.getJobId()))
213+
);
214+
179215
return result;
180216
}
181217

@@ -197,6 +233,7 @@ private ConfigurableApplicationContext buildApplicationContext(Flow flow, Databa
197233
});
198234

199235
ctx.getBeanFactory().registerSingleton("finishedListener", (FlowFinishedListener) () -> {
236+
System.out.println("FIRING finished listeners");
200237
flowFinishedListeners.forEach((FlowFinishedListener listener) -> {
201238
listener.onFlowFinished();
202239
});
@@ -213,6 +250,22 @@ private ConfigurableApplicationContext buildApplicationContext(Flow flow, Databa
213250
listener.processFailure(jobId, itemId);
214251
});
215252
});
253+
254+
ctx.getBeanFactory().registerSingleton("batchCompleteListener", new BatchCompleteListener() {
255+
@Override
256+
public void onBatchFailed() {
257+
batchCompleteListeners.forEach((BatchCompleteListener listener) -> {
258+
listener.onBatchFailed();
259+
});
260+
}
261+
262+
@Override
263+
public void onBatchSucceeded() {
264+
batchCompleteListeners.forEach((BatchCompleteListener listener) -> {
265+
listener.onBatchSucceeded();
266+
});
267+
}
268+
});
216269
ctx.refresh();
217270
return ctx;
218271
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package com.marklogic.hub.job;
2+
3+
import com.marklogic.client.pojo.annotation.Id;
4+
import com.marklogic.hub.flow.Flow;
5+
6+
import java.util.Date;
7+
8+
public class Job {
9+
private String jobId;
10+
private String flowType;
11+
private String flowName;
12+
private String entityName;
13+
private String jobName;
14+
private Date startTime;
15+
private Date endTime;
16+
private String jobOutput;
17+
18+
private long successfulEvents = 0;
19+
private long failedEvents = 0;
20+
private long successfulBatches = 0;
21+
private long failedBatches = 0;
22+
23+
private Job() {
24+
this.startTime = new Date();
25+
}
26+
27+
public Job withJobId(String jobId) {
28+
this.jobId = jobId;
29+
return this;
30+
}
31+
32+
public static Job withFlow(Flow flow) {
33+
Job job = new Job();
34+
job.flowType = flow.getType().toString();
35+
job.flowName = flow.getName();
36+
job.entityName = flow.getEntityName();
37+
return job;
38+
}
39+
40+
public Job withJobName(String jobName) {
41+
this.jobName = jobName;
42+
return this;
43+
}
44+
45+
public Job withJobOutput(String jobOutput) {
46+
this.jobOutput = jobOutput;
47+
return this;
48+
}
49+
50+
public Job withEndTime(Date endTime) {
51+
this.endTime = endTime;
52+
return this;
53+
}
54+
55+
public Job setCounts(long successfulEvents, long failedEvents, long successfulBatches, long failedBatches) {
56+
this.successfulEvents = successfulEvents;
57+
this.failedEvents = failedEvents;
58+
this.successfulBatches = successfulBatches;
59+
this.failedBatches = failedBatches;
60+
return this;
61+
}
62+
63+
@Id
64+
public String getJobId() {
65+
return jobId;
66+
}
67+
68+
public String getFlowType() {
69+
return flowType;
70+
}
71+
72+
public String getFlowName() {
73+
return flowName;
74+
}
75+
76+
public String getEntityName() {
77+
return entityName;
78+
}
79+
80+
public String getJobName() {
81+
return jobName;
82+
}
83+
84+
public Date getStartTime() {
85+
return this.startTime;
86+
}
87+
88+
public Date getEndTime() {
89+
return this.endTime;
90+
}
91+
92+
public JobStatus getStatus() {
93+
JobStatus status = JobStatus.STARTED;
94+
if (failedEvents > 0 && successfulEvents > 0 && endTime != null) {
95+
status = JobStatus.FINISHED_WITH_ERRORS;
96+
}
97+
else if (failedEvents == 0 && successfulEvents > 0 && endTime != null) {
98+
status = JobStatus.FINISHED;
99+
}
100+
else if (endTime != null) {
101+
status = JobStatus.FAILED;
102+
}
103+
return status;
104+
}
105+
106+
public String getJobOutput() {
107+
return jobOutput;
108+
}
109+
110+
public long getSuccessfulEvents() {
111+
return successfulEvents;
112+
}
113+
114+
public long getFailedEvents() {
115+
return failedEvents;
116+
}
117+
118+
public long getSuccessfulBatches() {
119+
return successfulBatches;
120+
}
121+
122+
public long getFailedBatches() {
123+
return failedBatches;
124+
}
125+
}

0 commit comments

Comments
 (0)