Skip to content

Commit e2c9861

Browse files
committed
changes to integrate TaskManagerService with SpringBatch-based implementation of FlowManager
1 parent a21c801 commit e2c9861

File tree

5 files changed

+95
-39
lines changed

5 files changed

+95
-39
lines changed

data-hub/src/main/java/com/marklogic/hub/FlowManager.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626
import org.springframework.batch.core.Job;
27+
import org.springframework.batch.core.JobExecution;
2728
import org.springframework.batch.core.JobExecutionListener;
2829
import org.springframework.batch.core.JobParameters;
2930
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -181,8 +182,8 @@ public void uninstallFlow(String flowName) {
181182

182183
}
183184

184-
public void runFlow(Flow flow, int batchSize) {
185-
runFlow(flow, batchSize, null);
185+
public JobExecution runFlow(Flow flow, int batchSize) {
186+
return runFlow(flow, batchSize, null);
186187
}
187188

188189
// might want to add Job tracking support
@@ -193,8 +194,9 @@ public void runFlow(Flow flow, int batchSize) {
193194
* @param flow - the flow to run
194195
* @param batchSize - the size to use for batching transactions
195196
* @param listener - the JobExecutionListener to receive status updates about the job
197+
* @return
196198
*/
197-
public void runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
199+
public JobExecution runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
198200
Collector c = flow.getCollector();
199201
if (c instanceof ServerCollector) {
200202
((ServerCollector)c).setClient(client);
@@ -214,7 +216,7 @@ public void runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
214216
Job job = builder.build();
215217

216218
try {
217-
jobLauncher.run(job, new JobParameters());
219+
return jobLauncher.run(job, new JobParameters());
218220
}
219221
catch (Exception e) {
220222
throw new RuntimeException(e);
@@ -286,7 +288,7 @@ public void run(Flow flow, String identifier, Transaction transaction) {
286288
this.getServices().post(params, handle, transaction);
287289
}
288290
}
289-
public void testFlow(Flow flow) {
290-
291+
public JobExecution testFlow(Flow flow) {
292+
return null;
291293
}
292294
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.marklogic.hub.service;
2+
3+
import org.apache.http.concurrent.BasicFuture;
4+
5+
public interface CancellableTask {
6+
7+
void run(BasicFuture<?> resultFuture);
8+
9+
void cancel(BasicFuture<?> resultFuture);
10+
}

quick-start/src/main/java/com/marklogic/hub/service/TaskManagerService.java renamed to data-hub/src/main/java/com/marklogic/hub/service/TaskManagerService.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,24 @@ public class TaskManagerService {
2222

2323
private BigInteger lastTaskId = BigInteger.ZERO;
2424

25-
private Map<BigInteger, Task> taskMap = Collections.synchronizedMap(new HashMap<>());
25+
private Map<BigInteger, TaskWrapper> taskMap = Collections.synchronizedMap(new HashMap<>());
2626

2727
public TaskManagerService() {
2828
}
2929

30-
public BigInteger addTask(Runnable runnable) {
30+
public BigInteger addTask(CancellableTask task) {
3131
BigInteger taskId = fetchNextTaskId();
3232

33-
Task task = new Task(runnable);
34-
taskMap.put(taskId, task);
33+
TaskWrapper taskRunner = new TaskWrapper(task);
34+
taskMap.put(taskId, taskRunner);
3535

36-
executorService.submit(task);
36+
executorService.submit(taskRunner);
3737

3838
return taskId;
3939
}
4040

4141
public Object waitTask(BigInteger taskId) {
42-
Task task = taskMap.get(taskId);
42+
TaskWrapper task = taskMap.get(taskId);
4343
if (task != null) {
4444
return task.awaitCompletion();
4545
}
@@ -48,7 +48,7 @@ public Object waitTask(BigInteger taskId) {
4848
}
4949

5050
public void stopTask(BigInteger taskId) {
51-
Task task = taskMap.get(taskId);
51+
TaskWrapper task = taskMap.get(taskId);
5252
if (task != null) {
5353
task.stopOrCancelTask();
5454
}
@@ -59,7 +59,7 @@ public void removeTask(BigInteger taskId) {
5959
}
6060

6161
public boolean isTaskFinished(BigInteger taskId) {
62-
Task task = taskMap.get(taskId);
62+
TaskWrapper task = taskMap.get(taskId);
6363
return task == null;
6464
}
6565

@@ -68,23 +68,22 @@ protected synchronized BigInteger fetchNextTaskId() {
6868
return lastTaskId;
6969
}
7070

71-
private class Task extends Thread {
71+
private class TaskWrapper extends Thread {
7272

73-
private Runnable runnable;
73+
private CancellableTask task;
7474

7575
private BasicFuture<Object> taskResult = new BasicFuture<>(null);
7676

77-
public Task(Runnable runnable) {
78-
this.runnable = runnable;
77+
public TaskWrapper(CancellableTask task) {
78+
this.task = task;
7979
}
8080

8181
public void stopOrCancelTask() {
82+
task.cancel(taskResult);
83+
8284
// flag cancellation
8385
// this will notify anyone waiting for this task
8486
taskResult.cancel();
85-
86-
// interrupt this thread to stop further execution
87-
this.interrupt();
8887
}
8988

9089
public Object awaitCompletion() {
@@ -101,10 +100,7 @@ public Object awaitCompletion() {
101100
public void run() {
102101
try {
103102
if (!taskResult.isDone()) {
104-
runnable.run();
105-
106-
// the task has completed
107-
taskResult.completed(null);
103+
task.run(taskResult);
108104
}
109105
}
110106
catch (Exception e) {

quick-start/src/main/java/com/marklogic/hub/service/FlowManagerService.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
9+
import org.springframework.batch.core.JobExecution;
10+
import org.springframework.batch.core.JobExecutionListener;
911
import org.springframework.beans.factory.annotation.Autowired;
1012
import org.springframework.stereotype.Service;
1113

@@ -19,9 +21,8 @@
1921
import com.marklogic.hub.exception.FlowManagerException;
2022
import com.marklogic.hub.factory.FlowModelFactory;
2123
import com.marklogic.hub.flow.Flow;
22-
import com.marklogic.hub.model.FlowModel;
2324
import com.marklogic.hub.flow.FlowType;
24-
import com.marklogic.hub.util.FileUtil;
25+
import com.marklogic.hub.model.FlowModel;
2526

2627
@Service
2728
public class FlowManagerService {
@@ -65,14 +66,18 @@ public void uninstallFlow(String flowName) {
6566
flowManager.uninstallFlow(flowName);
6667
}
6768

68-
public void testFlow(Flow flow) {
69+
public JobExecution testFlow(Flow flow) {
6970
FlowManager flowManager = getFlowManager();
70-
flowManager.testFlow(flow);
71+
return flowManager.testFlow(flow);
7172
}
7273

73-
public void runFlow(Flow flow, int batchSize) {
74+
public JobExecution runFlow(Flow flow, int batchSize) {
75+
return runFlow(flow, batchSize, null);
76+
}
77+
78+
public JobExecution runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
7479
FlowManager flowManager = getFlowManager();
75-
flowManager.runFlow(flow, batchSize);
80+
return flowManager.runFlow(flow, batchSize, listener);
7681
}
7782

7883
public void runFlowsInParallel(Flow... flows) {

quick-start/src/main/java/com/marklogic/hub/web/controller/api/FlowApiController.java

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88
import javax.servlet.http.HttpServletRequest;
99
import javax.servlet.http.HttpSession;
1010

11+
import org.apache.http.concurrent.BasicFuture;
12+
import org.apache.http.concurrent.Cancellable;
1113
import org.slf4j.Logger;
1214
import org.slf4j.LoggerFactory;
15+
import org.springframework.batch.core.JobExecution;
16+
import org.springframework.batch.core.JobExecutionListener;
1317
import org.springframework.beans.factory.annotation.Autowired;
1418
import org.springframework.http.MediaType;
1519
import org.springframework.validation.BindingResult;
@@ -27,6 +31,7 @@
2731
import com.marklogic.hub.model.DomainModel;
2832
import com.marklogic.hub.model.FlowModel;
2933
import com.marklogic.hub.model.RunFlowModel;
34+
import com.marklogic.hub.service.CancellableTask;
3035
import com.marklogic.hub.service.FlowManagerService;
3136
import com.marklogic.hub.service.TaskManagerService;
3237
import com.marklogic.hub.web.controller.BaseController;
@@ -107,14 +112,23 @@ public void uninstallFlow(HttpServletRequest request) {
107112

108113
@RequestMapping(value = "/test", method = RequestMethod.POST)
109114
public BigInteger testFlow(HttpServletRequest request) {
110-
Runnable task = new Runnable() {
115+
CancellableTask task = new CancellableTask() {
116+
117+
private JobExecution jobExecution;
118+
119+
@Override
120+
public void cancel(BasicFuture<?> resultFuture) {
121+
if (jobExecution != null) {
122+
jobExecution.stop();
123+
}
124+
}
111125

112126
@Override
113-
public void run() {
127+
public void run(BasicFuture<?> resultFuture) {
114128
final String domainName = request.getParameter("domainName");
115129
final String flowName = request.getParameter("flowName");
116130
final Flow flow = flowManagerService.getFlow(domainName, flowName);
117-
flowManagerService.testFlow(flow);
131+
this.jobExecution = flowManagerService.testFlow(flow);
118132
}
119133
};
120134

@@ -123,13 +137,33 @@ public void run() {
123137

124138
@RequestMapping(value = "/run", method = RequestMethod.POST)
125139
public BigInteger runFlow(@RequestBody RunFlowModel runFlow) {
126-
Runnable task = new Runnable() {
140+
CancellableTask task = new CancellableTask() {
141+
142+
private JobExecution jobExecution;
143+
144+
@Override
145+
public void cancel(BasicFuture<?> resultFuture) {
146+
if (jobExecution != null) {
147+
jobExecution.stop();
148+
}
149+
}
150+
127151
@Override
128-
public void run() {
152+
public void run(BasicFuture<?> resultFuture) {
129153
final Flow flow = flowManagerService.getFlow(runFlow.getDomainName(), runFlow.getFlowName());
130154
// TODO update and move BATCH SIZE TO a constant or config - confirm
131155
// desired behavior
132-
flowManagerService.runFlow(flow, 100);
156+
this.jobExecution = flowManagerService.runFlow(flow, 100, new JobExecutionListener() {
157+
158+
@Override
159+
public void beforeJob(JobExecution jobExecution) {
160+
}
161+
162+
@Override
163+
public void afterJob(JobExecution jobExecution) {
164+
resultFuture.completed(null);
165+
}
166+
});
133167
}
134168
};
135169

@@ -138,8 +172,15 @@ public void run() {
138172

139173
@RequestMapping(value="/run/input", method = RequestMethod.POST)
140174
public BigInteger runInputFlow(@RequestBody RunFlowModel runFlow) {
141-
Runnable task = new Runnable() {
142-
public void run() {
175+
CancellableTask task = new CancellableTask() {
176+
177+
@Override
178+
public void cancel(BasicFuture<?> resultFuture) {
179+
// TODO: stop MLCP. We don't have a way to do this yet.
180+
}
181+
182+
@Override
183+
public void run(BasicFuture<?> resultFuture) {
143184
try {
144185
Mlcp mlcp = new Mlcp(
145186
environmentConfiguration.getMLHost()
@@ -151,6 +192,8 @@ public void run() {
151192
SourceOptions sourceOptions = new SourceOptions(runFlow.getDomainName(), runFlow.getFlowName(), FlowType.INPUT.toString());
152193
mlcp.addSourceDirectory(runFlow.getInputPath(), sourceOptions);
153194
mlcp.loadContent();
195+
196+
resultFuture.completed(null);
154197
}
155198
catch (IOException e) {
156199
LOGGER.error("Error encountered while trying to run flow: " + runFlow.getDomainName() + " > " + runFlow.getFlowName(), e);

0 commit comments

Comments
 (0)