Skip to content

Commit f2dcd9e

Browse files
committed
Merge branch 'master' into 53-ReplaceDomainsWithEntities
Conflicts: quick-start/src/main/java/com/marklogic/hub/web/controller/api/FlowApiController.java quick-start/src/main/webapp/WEB-INF/static/top/flows.html quick-start/src/main/webapp/WEB-INF/static/top/topController.js
2 parents b303c9c + d5b10ac commit f2dcd9e

File tree

13 files changed

+477
-106
lines changed

13 files changed

+477
-106
lines changed

data-hub/src/main/java/com/marklogic/hub/FlowManager.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626
import org.springframework.batch.core.Job;
27+
import org.springframework.batch.core.JobExecution;
2728
import org.springframework.batch.core.JobExecutionListener;
2829
import org.springframework.batch.core.JobParameters;
2930
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -186,8 +187,8 @@ public void uninstallFlow(String flowName) {
186187

187188
}
188189

189-
public void runFlow(Flow flow, int batchSize) {
190-
runFlow(flow, batchSize, null);
190+
public JobExecution runFlow(Flow flow, int batchSize) {
191+
return runFlow(flow, batchSize, null);
191192
}
192193

193194
// might want to add Job tracking support
@@ -198,8 +199,9 @@ public void runFlow(Flow flow, int batchSize) {
198199
* @param flow - the flow to run
199200
* @param batchSize - the size to use for batching transactions
200201
* @param listener - the JobExecutionListener to receive status updates about the job
202+
* @return
201203
*/
202-
public void runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
204+
public JobExecution runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
203205
Collector c = flow.getCollector();
204206
if (c instanceof ServerCollector) {
205207
((ServerCollector)c).setClient(client);
@@ -220,7 +222,7 @@ public void runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
220222
Job job = builder.build();
221223

222224
try {
223-
jobLauncher.run(job, new JobParameters());
225+
return jobLauncher.run(job, new JobParameters());
224226
}
225227
catch (Exception e) {
226228
throw new RuntimeException(e);
@@ -296,7 +298,7 @@ public void run(Flow flow, String identifier, Transaction transaction) {
296298
this.getServices().post(params, handle, transaction);
297299
}
298300
}
299-
public void testFlow(Flow flow) {
300-
301+
public JobExecution testFlow(Flow flow) {
302+
return null;
301303
}
302304
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.marklogic.hub.service;
2+
3+
import org.apache.http.concurrent.BasicFuture;
4+
5+
public interface CancellableTask {
6+
7+
void run(BasicFuture<?> resultFuture);
8+
9+
void cancel(BasicFuture<?> resultFuture);
10+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package com.marklogic.hub.service;
2+
3+
import java.math.BigInteger;
4+
import java.util.Collections;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
11+
import org.apache.http.concurrent.BasicFuture;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import org.springframework.stereotype.Service;
15+
16+
@Service
17+
public class TaskManagerService {
18+
19+
private final static Logger LOGGER = LoggerFactory.getLogger(TaskManagerService.class);
20+
21+
private ExecutorService executorService = Executors.newCachedThreadPool();
22+
23+
private BigInteger lastTaskId = BigInteger.ZERO;
24+
25+
private Map<BigInteger, TaskWrapper> taskMap = Collections.synchronizedMap(new HashMap<>());
26+
27+
public TaskManagerService() {
28+
}
29+
30+
public BigInteger addTask(CancellableTask task) {
31+
BigInteger taskId = fetchNextTaskId();
32+
33+
TaskWrapper taskRunner = new TaskWrapper(task);
34+
taskMap.put(taskId, taskRunner);
35+
36+
executorService.submit(taskRunner);
37+
38+
return taskId;
39+
}
40+
41+
public Object waitTask(BigInteger taskId) {
42+
TaskWrapper task = taskMap.get(taskId);
43+
if (task != null) {
44+
return task.awaitCompletion();
45+
}
46+
47+
return null;
48+
}
49+
50+
public void stopTask(BigInteger taskId) {
51+
TaskWrapper task = taskMap.get(taskId);
52+
if (task != null) {
53+
task.stopOrCancelTask();
54+
}
55+
}
56+
57+
public void removeTask(BigInteger taskId) {
58+
taskMap.remove(taskId);
59+
}
60+
61+
public boolean isTaskFinished(BigInteger taskId) {
62+
TaskWrapper task = taskMap.get(taskId);
63+
return task == null;
64+
}
65+
66+
protected synchronized BigInteger fetchNextTaskId() {
67+
this.lastTaskId = lastTaskId.add(BigInteger.ONE);
68+
return lastTaskId;
69+
}
70+
71+
private class TaskWrapper extends Thread {
72+
73+
private CancellableTask task;
74+
75+
private BasicFuture<Object> taskResult = new BasicFuture<>(null);
76+
77+
public TaskWrapper(CancellableTask task) {
78+
this.task = task;
79+
}
80+
81+
public void stopOrCancelTask() {
82+
task.cancel(taskResult);
83+
84+
// flag cancellation
85+
// this will notify anyone waiting for this task
86+
taskResult.cancel();
87+
}
88+
89+
public Object awaitCompletion() {
90+
try {
91+
return taskResult.get();
92+
} catch (InterruptedException e) {
93+
return null;
94+
} catch (ExecutionException e) {
95+
return null;
96+
}
97+
}
98+
99+
@Override
100+
public void run() {
101+
try {
102+
if (!taskResult.isDone()) {
103+
task.run(taskResult);
104+
}
105+
}
106+
catch (Exception e) {
107+
LOGGER.error("Task encountered an error.", e);
108+
109+
// the task has failed
110+
taskResult.failed(e);
111+
}
112+
}
113+
}
114+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.marklogic.hub.model;
2+
3+
public class TaskManagerModel {
4+
private String taskId;
5+
6+
public String getTaskId() {
7+
return taskId;
8+
}
9+
10+
public void setTaskId(String taskId) {
11+
this.taskId = taskId;
12+
}
13+
}

quick-start/src/main/java/com/marklogic/hub/service/FlowManagerService.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
9+
import org.springframework.batch.core.JobExecution;
10+
import org.springframework.batch.core.JobExecutionListener;
911
import org.springframework.beans.factory.annotation.Autowired;
1012
import org.springframework.stereotype.Service;
1113

@@ -19,9 +21,8 @@
1921
import com.marklogic.hub.exception.FlowManagerException;
2022
import com.marklogic.hub.factory.FlowModelFactory;
2123
import com.marklogic.hub.flow.Flow;
22-
import com.marklogic.hub.model.FlowModel;
2324
import com.marklogic.hub.flow.FlowType;
24-
import com.marklogic.hub.util.FileUtil;
25+
import com.marklogic.hub.model.FlowModel;
2526

2627
@Service
2728
public class FlowManagerService {
@@ -65,14 +66,18 @@ public void uninstallFlow(String flowName) {
6566
flowManager.uninstallFlow(flowName);
6667
}
6768

68-
public void testFlow(Flow flow) {
69+
public JobExecution testFlow(Flow flow) {
6970
FlowManager flowManager = getFlowManager();
70-
flowManager.testFlow(flow);
71+
return flowManager.testFlow(flow);
7172
}
7273

73-
public void runFlow(Flow flow, int batchSize) {
74+
public JobExecution runFlow(Flow flow, int batchSize) {
75+
return runFlow(flow, batchSize, null);
76+
}
77+
78+
public JobExecution runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
7479
FlowManager flowManager = getFlowManager();
75-
flowManager.runFlow(flow, batchSize);
80+
return flowManager.runFlow(flow, batchSize, listener);
7681
}
7782

7883
public void runFlowsInParallel(Flow... flows) {

quick-start/src/main/java/com/marklogic/hub/web/controller/api/FlowApiController.java

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
package com.marklogic.hub.web.controller.api;
22

33
import java.io.IOException;
4+
import java.math.BigInteger;
45
import java.util.ArrayList;
56
import java.util.List;
67

78
import javax.servlet.http.HttpServletRequest;
89
import javax.servlet.http.HttpSession;
910

11+
import org.apache.http.concurrent.BasicFuture;
12+
import org.apache.http.concurrent.Cancellable;
1013
import org.slf4j.Logger;
1114
import org.slf4j.LoggerFactory;
15+
import org.springframework.batch.core.JobExecution;
16+
import org.springframework.batch.core.JobExecutionListener;
1217
import org.springframework.beans.factory.annotation.Autowired;
1318
import org.springframework.http.MediaType;
1419
import org.springframework.validation.BindingResult;
@@ -26,7 +31,9 @@
2631
import com.marklogic.hub.model.EntityModel;
2732
import com.marklogic.hub.model.FlowModel;
2833
import com.marklogic.hub.model.RunFlowModel;
34+
import com.marklogic.hub.service.CancellableTask;
2935
import com.marklogic.hub.service.FlowManagerService;
36+
import com.marklogic.hub.service.TaskManagerService;
3037
import com.marklogic.hub.web.controller.BaseController;
3138
import com.marklogic.hub.web.form.FlowForm;
3239
import com.marklogic.hub.web.form.LoginForm;
@@ -44,6 +51,9 @@ public class FlowApiController extends BaseController {
4451
@Autowired
4552
private FlowManagerService flowManagerService;
4653

54+
@Autowired
55+
private TaskManagerService taskManagerService;
56+
4757
@RequestMapping(value = "/flow", method = RequestMethod.GET)
4858
@ResponseBody
4959
public Flow getFlow(HttpServletRequest request) {
@@ -101,24 +111,77 @@ public void uninstallFlow(HttpServletRequest request) {
101111
}
102112

103113
@RequestMapping(value = "/test", method = RequestMethod.POST)
104-
public void testFlow(HttpServletRequest request) {
114+
public BigInteger testFlow(HttpServletRequest request) {
115+
CancellableTask task = new CancellableTask() {
116+
117+
private JobExecution jobExecution;
118+
119+
@Override
120+
public void cancel(BasicFuture<?> resultFuture) {
121+
if (jobExecution != null) {
122+
jobExecution.stop();
123+
}
124+
}
125+
126+
@Override
127+
public void run(BasicFuture<?> resultFuture) {
105128
final String entityName = request.getParameter("entityName");
106129
final String flowName = request.getParameter("flowName");
107130
final Flow flow = flowManagerService.getFlow(entityName, flowName);
108-
flowManagerService.testFlow(flow);
131+
this.jobExecution = flowManagerService.testFlow(flow);
132+
}
133+
};
134+
135+
return taskManagerService.addTask(task);
109136
}
110137

111138
@RequestMapping(value = "/run", method = RequestMethod.POST)
112-
public void runFlow(@RequestBody RunFlowModel runFlow) {
139+
public BigInteger runFlow(@RequestBody RunFlowModel runFlow) {
140+
CancellableTask task = new CancellableTask() {
141+
142+
private JobExecution jobExecution;
143+
144+
@Override
145+
public void cancel(BasicFuture<?> resultFuture) {
146+
if (jobExecution != null) {
147+
jobExecution.stop();
148+
}
149+
}
150+
151+
@Override
152+
public void run(BasicFuture<?> resultFuture) {
113153
final Flow flow = flowManagerService.getFlow(runFlow.getEntityName(),
114154
runFlow.getFlowName());
115155
// TODO update and move BATCH SIZE TO a constant or config - confirm
116156
// desired behavior
117-
flowManagerService.runFlow(flow, 100);
157+
this.jobExecution = flowManagerService.runFlow(flow, 100, new JobExecutionListener() {
158+
159+
@Override
160+
public void beforeJob(JobExecution jobExecution) {
161+
}
162+
163+
@Override
164+
public void afterJob(JobExecution jobExecution) {
165+
resultFuture.completed(null);
166+
}
167+
});
168+
}
169+
};
170+
171+
return taskManagerService.addTask(task);
118172
}
119173

120174
@RequestMapping(value="/run/input", method = RequestMethod.POST)
121-
public void runInputFlow(@RequestBody RunFlowModel runFlow) {
175+
public BigInteger runInputFlow(@RequestBody RunFlowModel runFlow) {
176+
CancellableTask task = new CancellableTask() {
177+
178+
@Override
179+
public void cancel(BasicFuture<?> resultFuture) {
180+
// TODO: stop MLCP. We don't have a way to do this yet.
181+
}
182+
183+
@Override
184+
public void run(BasicFuture<?> resultFuture) {
122185
try {
123186
Mlcp mlcp = new Mlcp(
124187
environmentConfiguration.getMLHost()
@@ -132,13 +195,18 @@ public void runInputFlow(@RequestBody RunFlowModel runFlow) {
132195
FlowType.INPUT.toString());
133196
mlcp.addSourceDirectory(runFlow.getInputPath(), sourceOptions);
134197
mlcp.loadContent();
198+
199+
resultFuture.completed(null);
135200
}
136201
catch (IOException e) {
137202
LOGGER.error("Error encountered while trying to run flow: "
138203
+ runFlow.getEntityName() + " > " + runFlow.getFlowName(),
139204
e);
140205
}
141206
}
207+
};
208+
return taskManagerService.addTask(task);
209+
}
142210

143211
@RequestMapping(value = "/runInParallel", method = RequestMethod.POST)
144212
public void runFlowsInParallel(HttpServletRequest request) {

0 commit comments

Comments
 (0)