Skip to content

Commit 803e7c5

Browse files
committed
fixed #466
1 parent c1b2fd4 commit 803e7c5

File tree

4 files changed

+207
-78
lines changed

4 files changed

+207
-78
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.marklogic.hub.util;
2+
3+
import com.marklogic.hub.flow.FlowStatusListener;
4+
5+
import java.util.List;
6+
import java.util.concurrent.atomic.AtomicLong;
7+
import java.util.function.Consumer;
8+
import java.util.regex.Matcher;
9+
import java.util.regex.Pattern;
10+
11+
public class MlcpConsumer implements Consumer<String> {
12+
private int currentPc = 0;
13+
private final Pattern completedPattern = Pattern.compile("^.+completed (\\d+)%$");
14+
private final Pattern successfulEventsPattern = Pattern.compile("^.+OUTPUT_RECORDS_COMMITTED:\\s+(\\d+).*$");
15+
private final Pattern failedEventsPattern = Pattern.compile("^.+OUTPUT_RECORDS_FAILED\\s+(\\d+).*$");
16+
private AtomicLong successfulEvents;
17+
private AtomicLong failedEvents;
18+
private FlowStatusListener statusListener;
19+
private String jobId;
20+
21+
public MlcpConsumer(AtomicLong successfulEvents, AtomicLong failedEvents, FlowStatusListener statusListener,
22+
String jobId)
23+
{
24+
this.successfulEvents = successfulEvents;
25+
this.failedEvents = failedEvents;
26+
this.statusListener = statusListener;
27+
this.jobId = jobId;
28+
}
29+
30+
@Override
31+
public void accept(String status) {
32+
Matcher m = completedPattern.matcher(status);
33+
if (m.matches()) {
34+
int pc = Integer.parseInt(m.group(1));
35+
36+
// don't send 100% because more stuff happens after 100% is reported here
37+
if (pc > currentPc && pc != 100) {
38+
currentPc = pc;
39+
}
40+
}
41+
42+
m = successfulEventsPattern.matcher(status);
43+
if (m.matches()) {
44+
successfulEvents.addAndGet(Long.parseLong(m.group(1)));
45+
}
46+
47+
m = failedEventsPattern.matcher(status);
48+
if (m.matches()) {
49+
failedEvents.addAndGet(Long.parseLong(m.group(1)));
50+
}
51+
52+
statusListener.onStatusChange(jobId, currentPc, status);
53+
}
54+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2012-2016 MarkLogic Corporation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.marklogic.hub.util;
17+
18+
import com.marklogic.hub.HubConfig;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.function.Consumer;
26+
27+
public class ProcessRunner extends Thread {
28+
29+
private final Logger logger = LoggerFactory.getLogger(getClass());
30+
31+
private HubConfig hubConfig;
32+
private ArrayList<String> processOutput = new ArrayList<>();
33+
34+
private List<String> args;
35+
private Consumer<String> consumer;
36+
37+
public HubConfig getHubConfig() {
38+
return hubConfig;
39+
}
40+
41+
public String getProcessOutput() {
42+
return String.join("\n", processOutput);
43+
}
44+
45+
public static ProcessRunner newRunner() {
46+
return new ProcessRunner();
47+
}
48+
49+
public ProcessRunner() {
50+
super();
51+
}
52+
53+
public ProcessRunner withArgs(List<String> args) {
54+
this.args = args;
55+
return this;
56+
}
57+
58+
public ProcessRunner withHubconfig(HubConfig hubConfig) {
59+
this.hubConfig = hubConfig;
60+
return this;
61+
}
62+
63+
public ProcessRunner withStreamConsumer(Consumer<String> consumer) {
64+
this.consumer = consumer;
65+
return this;
66+
}
67+
68+
@Override
69+
public void run() {
70+
try {
71+
runMlcp();
72+
} catch (Exception e) {
73+
throw new RuntimeException(e);
74+
}
75+
}
76+
77+
private void runMlcp() throws IOException, InterruptedException {
78+
79+
logger.debug(String.join(" ", args));
80+
ProcessBuilder pb = new ProcessBuilder(args);
81+
Process process = pb.start();
82+
83+
StreamGobbler gobbler = new StreamGobbler(process.getInputStream(), status -> {
84+
synchronized (processOutput) {
85+
processOutput.add(status);
86+
}
87+
88+
consumer.accept(status);
89+
});
90+
gobbler.start();
91+
process.waitFor();
92+
gobbler.join();
93+
}
94+
}

quick-start/src/main/java/com/marklogic/quickstart/util/StreamGobbler.java renamed to marklogic-data-hub/src/main/java/com/marklogic/hub/util/StreamGobbler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.marklogic.quickstart.util;
16+
package com.marklogic.hub.util;
1717

1818
import java.io.BufferedReader;
1919
import java.io.InputStream;

quick-start/src/main/java/com/marklogic/quickstart/service/MlcpRunner.java

Lines changed: 58 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import com.marklogic.hub.flow.FlowStatusListener;
2424
import com.marklogic.hub.job.Job;
2525
import com.marklogic.hub.job.JobManager;
26-
import com.marklogic.quickstart.util.StreamGobbler;
26+
import com.marklogic.hub.job.JobStatus;
27+
import com.marklogic.hub.util.MlcpConsumer;
28+
import com.marklogic.hub.util.ProcessRunner;
2729
import org.apache.commons.io.FileUtils;
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
3030

3131
import java.io.File;
3232
import java.io.IOException;
@@ -35,78 +35,84 @@
3535
import java.util.Date;
3636
import java.util.UUID;
3737
import java.util.concurrent.atomic.AtomicLong;
38-
import java.util.function.Consumer;
39-
import java.util.regex.Matcher;
40-
import java.util.regex.Pattern;
4138

42-
class MlcpRunner extends Thread {
39+
class MlcpRunner extends ProcessRunner {
4340

44-
private final Logger logger = LoggerFactory.getLogger(getClass());
45-
46-
private HubConfig hubConfig;
47-
private JsonNode mlcpOptions;
48-
private FlowStatusListener statusListener;
49-
private ArrayList<String> mlcpOutput = new ArrayList<>();
50-
private String jobId = UUID.randomUUID().toString();
5141
private JobManager jobManager;
5242
private Flow flow;
43+
private JsonNode mlcpOptions;
44+
private String jobId = UUID.randomUUID().toString();
5345
private AtomicLong successfulEvents = new AtomicLong(0);
5446
private AtomicLong failedEvents = new AtomicLong(0);
47+
FlowStatusListener flowStatusListener;
5548

5649
MlcpRunner(HubConfig hubConfig, Flow flow, JsonNode mlcpOptions, FlowStatusListener statusListener) {
5750
super();
5851

59-
this.hubConfig = hubConfig;
60-
this.mlcpOptions = mlcpOptions;
61-
this.statusListener = statusListener;
62-
this.jobManager = new JobManager(this.hubConfig.newJobDbClient());
52+
this.withHubconfig(hubConfig);
53+
54+
this.jobManager = new JobManager(hubConfig.newJobDbClient());
55+
this.flowStatusListener = statusListener;
6356
this.flow = flow;
57+
this.mlcpOptions = mlcpOptions;
6458
}
6559

6660
@Override
6761
public void run() {
62+
HubConfig hubConfig = getHubConfig();
63+
64+
Job job = Job.withFlow(flow)
65+
.withJobId(jobId);
66+
jobManager.saveJob(job);
67+
6868
try {
6969
MlcpBean bean = new ObjectMapper().readerFor(MlcpBean.class).readValue(mlcpOptions);
7070
bean.setHost(hubConfig.host);
7171
bean.setPort(hubConfig.stagingPort);
7272

73-
Job job = Job.withFlow(flow)
74-
.withJobId(jobId);
75-
jobManager.saveJob(job);
76-
7773
// Assume that the HTTP credentials will work for mlcp
7874
bean.setUsername(hubConfig.getUsername());
7975
bean.setPassword(hubConfig.getPassword());
8076

8177
File file = new File(mlcpOptions.get("input_file_path").asText());
8278
String canonicalPath = file.getCanonicalPath();
8379
bean.setInput_file_path(canonicalPath);
84-
8580
bean.setTransform_param("\"" + bean.getTransform_param().replaceAll("\"", "") + ",jobId=" + jobId + "\"");
8681

87-
runMlcp(bean);
82+
buildCommand(bean);
83+
84+
super.run();
8885

89-
statusListener.onStatusChange(jobId, 100, "");
86+
flowStatusListener.onStatusChange(jobId, 100, "");
87+
88+
} catch (Exception e) {
89+
job.withStatus(JobStatus.FAILED)
90+
.withEndTime(new Date());
91+
jobManager.saveJob(job);
92+
throw new RuntimeException(e);
93+
} finally {
94+
JobStatus status;
95+
if (failedEvents.get() > 0 && successfulEvents.get() > 0) {
96+
status = JobStatus.FINISHED_WITH_ERRORS;
97+
}
98+
else if (failedEvents.get() == 0 && successfulEvents.get() > 0) {
99+
status = JobStatus.FINISHED;
100+
}
101+
else {
102+
status = JobStatus.FAILED;
103+
}
90104

91105
// store the thing in MarkLogic
92-
job.withJobOutput(String.join("\n", mlcpOutput))
106+
job.withJobOutput(getProcessOutput())
107+
.withStatus(status)
93108
.setCounts(successfulEvents.get(), failedEvents.get(), 0, 0)
94109
.withEndTime(new Date());
95110
jobManager.saveJob(job);
96-
} catch (Exception e) {
97-
throw new RuntimeException(e);
98111
}
99112
}
100113

101-
private void runMlcp(MlcpBean bean) throws IOException, InterruptedException {
102-
String javaHome = System.getProperty("java.home");
103-
String javaBin = javaHome +
104-
File.separator + "bin" +
105-
File.separator + "java";
106-
String classpath = System.getProperty("java.class.path");
107-
108-
File loggerFile = File.createTempFile("mlcp-", "-logger.xml");
109-
String loggerData = "<configuration>\n" +
114+
private String buildLoggerconfig() {
115+
return "<configuration>\n" +
110116
"\n" +
111117
" <appender name=\"STDOUT\" class=\"ch.qos.logback.core.ConsoleAppender\">\n" +
112118
" <!-- encoders are assigned the type\n" +
@@ -134,7 +140,17 @@ private void runMlcp(MlcpBean bean) throws IOException, InterruptedException {
134140
" <appender-ref ref=\"STDOUT\" />\n" +
135141
" </root>\n" +
136142
"</configuration>\n";
137-
FileUtils.writeStringToFile(loggerFile, loggerData);
143+
}
144+
145+
private void buildCommand(MlcpBean bean) throws IOException, InterruptedException {
146+
String javaHome = System.getProperty("java.home");
147+
String javaBin = javaHome +
148+
File.separator + "bin" +
149+
File.separator + "java";
150+
String classpath = System.getProperty("java.class.path");
151+
152+
File loggerFile = File.createTempFile("mlcp-", "-logger.xml");
153+
FileUtils.writeStringToFile(loggerFile, buildLoggerconfig());
138154

139155
ArrayList<String> args = new ArrayList<>();
140156
args.add(javaBin);
@@ -152,44 +168,9 @@ private void runMlcp(MlcpBean bean) throws IOException, InterruptedException {
152168
args.add("mlcp");
153169
args.addAll(Arrays.asList(bean.buildArgs()));
154170

155-
logger.debug(String.join(" ", args));
156-
ProcessBuilder pb = new ProcessBuilder(args);
157-
Process process = pb.start();
158-
159-
StreamGobbler gobbler = new StreamGobbler(process.getInputStream(), new Consumer<String>() {
160-
private int currentPc = 0;
161-
private final Pattern completedPattern = Pattern.compile("^.+completed (\\d+)%$");
162-
private final Pattern successfulEventsPattern = Pattern.compile("^.+OUTPUT_RECORDS_COMMITTED:\\s+(\\d+).*$");
163-
private final Pattern failedEventsPattern = Pattern.compile("^.+OUTPUT_RECORDS_FAILED\\s+(\\d+).*$");
164-
165-
@Override
166-
public void accept(String status) {
167-
Matcher m = completedPattern.matcher(status);
168-
if (m.matches()) {
169-
int pc = Integer.parseInt(m.group(1));
170-
171-
// don't send 100% because more stuff happens after 100% is reported here
172-
if (pc > currentPc && pc != 100) {
173-
currentPc = pc;
174-
}
175-
}
176-
177-
m = successfulEventsPattern.matcher(status);
178-
if (m.matches()) {
179-
successfulEvents.addAndGet(Long.parseLong(m.group(1)));
180-
}
181-
182-
m = failedEventsPattern.matcher(status);
183-
if (m.matches()) {
184-
failedEvents.addAndGet(Long.parseLong(m.group(1)));
185-
}
186-
187-
mlcpOutput.add(status);
188-
statusListener.onStatusChange(jobId, currentPc, status);
189-
}
190-
});
191-
gobbler.start();
192-
process.waitFor();
193-
gobbler.join();
171+
this.withArgs(args);
172+
173+
this.withStreamConsumer(new MlcpConsumer(successfulEvents,
174+
failedEvents, flowStatusListener, jobId));
194175
}
195176
}

0 commit comments

Comments
 (0)