Skip to content

Commit e1797d7

Browse files
committed
Merge pull request #119 from maeisabelle/67-InputFlowReportFailure
67 input flow report failure
2 parents a95b17a + 6d26deb commit e1797d7

File tree

8 files changed

+168
-57
lines changed

8 files changed

+168
-57
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/DataHubContentPump.java

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
11
package com.marklogic.hub;
22

3+
import java.io.BufferedReader;
4+
import java.io.File;
5+
import java.io.FileInputStream;
6+
import java.io.FileNotFoundException;
7+
import java.io.IOException;
8+
import java.io.InputStreamReader;
9+
import java.io.PrintStream;
310
import java.util.List;
11+
import java.util.regex.Matcher;
12+
import java.util.regex.Pattern;
13+
import java.util.regex.PatternSyntaxException;
14+
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
417

518
import com.marklogic.contentpump.ContentPump;
619
import com.marklogic.contentpump.utilities.OptionsFileUtil;
720

821
public class DataHubContentPump extends ContentPump {
22+
private final static Logger LOGGER = LoggerFactory.getLogger(DataHubContentPump.class);
923

1024
private String[] arguments;
1125

@@ -21,19 +35,80 @@ public DataHubContentPump(String[] arguments) {
2135
* Run the Content Pump.
2236
*
2337
* @return true if the content pump executed successfully, false otherwise.
38+
* @throws IOException
2439
*/
25-
public boolean execute() {
40+
public void execute() throws IOException {
2641
String[] expandedArgs = null;
27-
int rc = 1;
42+
43+
PrintStream sysout = System.out;
44+
PrintStream mlcpOutputStream = null;
45+
BufferedReader mlcpBufferedReader = null;
46+
File mlcpOutputFile = null;
2847
try {
48+
// redirect standard output
49+
mlcpOutputFile = File.createTempFile("mlcp", ".txt");
50+
mlcpOutputStream = new PrintStream(mlcpOutputFile);
51+
System.setOut(mlcpOutputStream);
52+
53+
// run mlcp
2954
expandedArgs = OptionsFileUtil.expandArguments(arguments);
30-
rc = runCommand(expandedArgs);
55+
runCommand(expandedArgs);
3156
} catch (Exception ex) {
3257
LOG.error("Error while expanding arguments", ex);
3358
System.err.println(ex.getMessage());
3459
System.err.println("Try 'mlcp help' for usage.");
60+
} finally {
61+
// close the mlcp output stream
62+
if (mlcpOutputStream != null) {
63+
mlcpOutputStream.close();
64+
}
65+
66+
// revert to the original standard output
67+
System.setOut(sysout);
3568
}
69+
70+
// read the mlcp output and get any error message
71+
StringBuilder errorMessage = new StringBuilder();
72+
try {
73+
String regex = "([^\\s]*) \\s (\\[ [^ \\] ]*? \\]) \\s ([^\\s]*) \\s ([^\\s]*) \\s - \\s (.*)";
74+
Pattern pattern = Pattern.compile(regex, Pattern.COMMENTS);
3675

37-
return rc == 0;
76+
mlcpBufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(mlcpOutputFile)));
77+
String line = null;
78+
while ((line = mlcpBufferedReader.readLine()) != null) {
79+
Matcher matcher = pattern.matcher(line);
80+
if (matcher.matches()) {
81+
String logLevel = matcher.groupCount() >= 3 ? matcher.group(3) : "";
82+
String message = matcher.groupCount() >= 5 ? matcher.group(5) : "";
83+
if (logLevel.toLowerCase().equals("error")) {
84+
if (errorMessage.length() > 0) {
85+
errorMessage.append("\r\n");
86+
}
87+
errorMessage.append(message);
88+
}
89+
}
90+
}
91+
} catch (PatternSyntaxException e) {
92+
LOGGER.error("Unexpected error", e);
93+
} catch (FileNotFoundException e) {
94+
LOGGER.error("Unexpected error", e);
95+
} catch (IOException e) {
96+
LOGGER.error("Unexpected error", e);
97+
} finally {
98+
if (mlcpBufferedReader != null) {
99+
try {
100+
mlcpBufferedReader.close();
101+
} catch (IOException e) {
102+
// intentionally empty
103+
}
104+
}
105+
106+
// delete the temporary file
107+
mlcpOutputFile.delete();
108+
}
109+
110+
if (errorMessage.length() > 0) {
111+
throw new IOException("Load data failed with:\r\n" + errorMessage.toString());
112+
}
38113
}
39114
}

marklogic-data-hub/src/main/java/com/marklogic/hub/Mlcp.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,11 @@ public void addSourceDirectory(String directoryPath, SourceOptions options) {
5252
sources.add(source);
5353
}
5454

55-
public void loadContent() {
55+
public void loadContent() throws IOException {
5656
for (MlcpSource source : sources) {
57-
Thread inputThread = null;
58-
Thread errorThread = null;
5957
try {
6058
List<String> arguments = new ArrayList<>();
6159

62-
// arguments.add(mlcpPath);
6360
arguments.add("import");
6461
arguments.add("-mode");
6562
arguments.add("local");
@@ -78,17 +75,8 @@ public void loadContent() {
7875

7976
DataHubContentPump contentPump = new DataHubContentPump(arguments);
8077
contentPump.execute();
81-
}
82-
catch (Exception e) {
83-
LOGGER.error("Failed to load {}", source.getSourcePath(), e);
84-
}
85-
finally {
86-
if (inputThread != null) {
87-
inputThread.interrupt();
88-
}
89-
if (errorThread != null) {
90-
errorThread.interrupt();
91-
}
78+
} catch (IOException e) {
79+
throw new IOException("Cannot load data from: " + source.getSourcePath() + " due to: " + e.getMessage());
9280
}
9381
}
9482
}

marklogic-data-hub/src/main/java/com/marklogic/hub/service/TaskManagerService.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public BigInteger addTask(CancellableTask task) {
3838
return taskId;
3939
}
4040

41-
public Object waitTask(BigInteger taskId) {
41+
public Object waitTask(BigInteger taskId) throws Exception {
4242
TaskWrapper task = taskMap.get(taskId);
4343
if (task != null) {
4444
return task.awaitCompletion();
@@ -86,13 +86,14 @@ public void stopOrCancelTask() {
8686
taskResult.cancel();
8787
}
8888

89-
public Object awaitCompletion() {
89+
public Object awaitCompletion() throws Exception {
9090
try {
9191
return taskResult.get();
92-
} catch (InterruptedException e) {
93-
return null;
92+
}
93+
catch (InterruptedException e) {
94+
throw new Exception(e.getMessage(), e);
9495
} catch (ExecutionException e) {
95-
return null;
96+
throw new Exception(e.getMessage(), e);
9697
}
9798
}
9899

marklogic-data-hub/src/main/java/com/marklogic/spring/batch/hub/CollectorReader.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.marklogic.spring.batch.hub;
22

3+
import java.util.ArrayList;
34
import java.util.List;
45

56
import org.springframework.batch.item.ExecutionContext;
@@ -26,7 +27,12 @@ public CollectorReader(Collector collector) {
2627

2728
@Override
2829
public void open(ExecutionContext executionContext) throws ItemStreamException {
29-
this.results = collector.run();
30+
if (collector != null) {
31+
this.results = collector.run();
32+
}
33+
else {
34+
this.results = new ArrayList<>();
35+
}
3036
}
3137

3238
@Override

marklogic-data-hub/src/main/java/com/marklogic/spring/batch/hub/FlowWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public void write(List<? extends String> items) throws Exception {
4646
if (transaction != null) {
4747
transaction.rollback();
4848
}
49+
throw e;
4950
}
5051
}
5152
}

quick-start/src/main/java/com/marklogic/hub/web/controller/api/FlowApiController.java

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.http.concurrent.BasicFuture;
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
14+
import org.springframework.batch.core.ExitStatus;
1415
import org.springframework.batch.core.JobExecution;
1516
import org.springframework.batch.core.JobExecutionListener;
1617
import org.springframework.beans.factory.annotation.Autowired;
@@ -124,11 +125,11 @@ public void cancel(BasicFuture<?> resultFuture) {
124125

125126
@Override
126127
public void run(BasicFuture<?> resultFuture) {
127-
final String entityName = request.getParameter("entityName");
128-
final String flowName = request.getParameter("flowName");
129-
final Flow flow = flowManagerService.getFlow(entityName, flowName);
128+
final String entityName = request.getParameter("entityName");
129+
final String flowName = request.getParameter("flowName");
130+
final Flow flow = flowManagerService.getFlow(entityName, flowName);
130131
this.jobExecution = flowManagerService.testFlow(flow);
131-
}
132+
}
132133
};
133134

134135
return taskManagerService.addTask(task);
@@ -149,19 +150,36 @@ public void cancel(BasicFuture<?> resultFuture) {
149150

150151
@Override
151152
public void run(BasicFuture<?> resultFuture) {
152-
final Flow flow = flowManagerService.getFlow(runFlow.getEntityName(),
153-
runFlow.getFlowName());
154-
// TODO update and move BATCH SIZE TO a constant or config - confirm
155-
// desired behavior
153+
final Flow flow = flowManagerService.getFlow(runFlow.getEntityName(), runFlow.getFlowName());
154+
// TODO update and move BATCH SIZE TO a constant or config - confirm
155+
// desired behavior
156156
this.jobExecution = flowManagerService.runFlow(flow, 100, new JobExecutionListener() {
157157

158158
@Override
159159
public void beforeJob(JobExecution jobExecution) {
160-
}
160+
}
161161

162162
@Override
163163
public void afterJob(JobExecution jobExecution) {
164-
resultFuture.completed(null);
164+
ExitStatus status = jobExecution.getExitStatus();
165+
if (ExitStatus.FAILED.getExitCode().equals(status.getExitCode())) {
166+
List<Throwable> errors = jobExecution.getAllFailureExceptions();
167+
if (errors.size() > 0) {
168+
Throwable throwable = errors.get(0);
169+
if (Exception.class.isInstance(throwable)) {
170+
resultFuture.failed((Exception) throwable);
171+
}
172+
else {
173+
resultFuture.failed(new Exception(errors.get(0)));
174+
}
175+
}
176+
else {
177+
resultFuture.failed(null);
178+
}
179+
}
180+
else {
181+
resultFuture.completed(null);
182+
}
165183
}
166184
});
167185
}
@@ -181,30 +199,31 @@ public void cancel(BasicFuture<?> resultFuture) {
181199

182200
@Override
183201
public void run(BasicFuture<?> resultFuture) {
184-
try {
185-
Mlcp mlcp = new Mlcp(
202+
try {
203+
Mlcp mlcp = new Mlcp(
186204
environmentConfiguration.getMLHost()
187205
,Integer.parseInt(environmentConfiguration.getMLStagingRestPort())
188206
,environmentConfiguration.getMLUsername()
189207
,environmentConfiguration.getMLPassword()
190-
);
208+
);
191209

192-
SourceOptions sourceOptions = new SourceOptions(
193-
runFlow.getEntityName(), runFlow.getFlowName(),
194-
FlowType.INPUT.toString());
195-
sourceOptions.setInputFileType(runFlow.getDataFormat());
196-
sourceOptions.setCollection(runFlow.getCollection());
197-
mlcp.addSourceDirectory(runFlow.getInputPath(), sourceOptions);
198-
mlcp.loadContent();
210+
SourceOptions sourceOptions = new SourceOptions(
211+
runFlow.getEntityName(), runFlow.getFlowName(),
212+
FlowType.INPUT.toString());
213+
sourceOptions.setInputFileType(runFlow.getDataFormat());
214+
sourceOptions.setCollection(runFlow.getCollection());
215+
mlcp.addSourceDirectory(runFlow.getInputPath(), sourceOptions);
216+
mlcp.loadContent();
199217

200218
resultFuture.completed(null);
201-
}
202-
catch (IOException e) {
203-
LOGGER.error("Error encountered while trying to run flow: "
204-
+ runFlow.getEntityName() + " > " + runFlow.getFlowName(),
205-
e);
206-
}
207-
}
219+
}
220+
catch (IOException e) {
221+
LOGGER.error("Error encountered while trying to run flow: "
222+
+ runFlow.getEntityName() + " > " + runFlow.getFlowName(),
223+
e);
224+
resultFuture.failed(e);
225+
}
226+
}
208227
};
209228
return taskManagerService.addTask(task);
210229
}

quick-start/src/main/java/com/marklogic/hub/web/controller/api/TaskManagerApiController.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.marklogic.hub.web.controller.api;
22

33
import java.math.BigInteger;
4+
import java.util.HashMap;
5+
import java.util.Map;
46

57
import javax.servlet.http.HttpServletRequest;
68
import javax.servlet.http.HttpServletResponse;
@@ -25,15 +27,24 @@ public class TaskManagerApiController {
2527
public Object waitTask(HttpServletRequest request, HttpServletResponse response) {
2628
String taskIdStr = request.getParameter("taskId");
2729

30+
Map<String, Object> resultMap = new HashMap<>();
31+
resultMap.put("taskId", taskIdStr);
32+
resultMap.put("success", false);
33+
2834
try {
2935
BigInteger taskId = new BigInteger(taskIdStr);
30-
return taskManagerService.waitTask(taskId);
36+
Object result = taskManagerService.waitTask(taskId);
37+
38+
resultMap.put("result", result);
39+
resultMap.put("success", true);
3140
}
3241
catch (NumberFormatException e) {
33-
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
42+
resultMap.put("errorMessage", "Invalid task id");
43+
} catch (Exception e) {
44+
resultMap.put("errorMessage", e.getMessage());
3445
}
3546

36-
return null;
47+
return resultMap;
3748
}
3849

3950
@RequestMapping(value="/stop", method = RequestMethod.POST)

quick-start/src/main/resources/static/top/topController.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,12 @@
6565
TaskManager.waitForTask(flow.inputFlowTaskId)
6666
.success(function (result) {
6767
if (!flow.inputFlowCancelled) {
68-
DataHub.displayMessage('Load data successful.', 'success', 'notification', false);
68+
if (result.success) {
69+
DataHub.displayMessage('Load data successful.', 'success', 'notification', false);
70+
}
71+
else {
72+
DataHub.displayMessage(result.errorMessage, 'error', 'notification', false);
73+
}
6974
}
7075
})
7176
.error(function () {
@@ -101,7 +106,12 @@
101106
TaskManager.waitForTask(flow.runFlowTaskId)
102107
.success(function (result) {
103108
if (!flow.runFlowCancelled) {
104-
DataHub.displayMessage('Flow run is successful.', 'success', 'notification', false);
109+
if (result.success) {
110+
DataHub.displayMessage('Flow run is successful.', 'success', 'notification', false);
111+
}
112+
else {
113+
DataHub.displayMessage(result.errorMessage, 'error', 'notification', false);
114+
}
105115
}
106116
})
107117
.error(function () {

0 commit comments

Comments
 (0)