Skip to content

Commit fefd5ed

Browse files
committed
fixed #518
fixed #519 fixed #514
1 parent 91e1ade commit fefd5ed

File tree

12 files changed

+133
-35
lines changed

12 files changed

+133
-35
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/HubConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@ else if (this.password != null) {
598598
return password;
599599
}
600600

601+
@JsonIgnore
601602
public AppConfig getAppConfig() {
602603
Properties properties = getProperties(this.environment);
603604
AppConfig config = new DefaultAppConfigFactory(name -> properties.getProperty(name)).newAppConfig();

marklogic-data-hub/src/main/java/com/marklogic/hub/HubProject.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public boolean isInitialized() {
126126
/**
127127
* Initializes a directory as a hub project directory.
128128
* This means putting certain files and folders in place.
129+
* @param customTokens - some custom tokens to start with
129130
*/
130131
public void init(Map<String, String> customTokens) {
131132
this.pluginsDir.toFile().mkdirs();

marklogic-data-hub/src/main/java/com/marklogic/hub/collector/DiskQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* reads/writes were happening simultaneously, once anything had spilled to
4343
* disk.
4444
*
45-
* @param <E>
45+
* @param <E> - A Serializable Class
4646
*/
4747
public class DiskQueue<E extends Serializable> extends AbstractQueue<String> {
4848

marklogic-data-hub/src/main/java/com/marklogic/hub/deploy/commands/LoadHubModulesCommand.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,6 @@ private TokenReplacer buildModuleTokenReplacer(AppConfig appConfig) {
4545
});
4646
}
4747

48-
if (appConfig.getModuleTokensPropertiesSources() != null) {
49-
for (PropertiesSource ps : appConfig.getModuleTokensPropertiesSources()) {
50-
r.addPropertiesSource(ps);
51-
}
52-
}
53-
5448
return r;
5549
}
5650

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package com.marklogic.hub.flow;
22

3+
import com.fasterxml.jackson.databind.JsonNode;
4+
35
import java.util.List;
46

57
public class RunFlowResponse {
68
public long totalCount = 0;
79
public long errorCount = 0;
810
public List<String> completedItems;
911
public List<String> failedItems;
12+
public List<JsonNode> errors;
1013

1114
public String toString() {
12-
return "{totalCount: " + totalCount + ", errorCount: " + errorCount + ", completedItems: " + completedItems.size() + ", failedItems: " + failedItems.size() + "}";
15+
return "{totalCount: " + totalCount + ", errorCount: " + errorCount + ", completedItems: " + completedItems.size() + ", failedItems: " + failedItems.size() + ", errors: " + errors.size() + "}";
1316
}
1417
}

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/impl/FlowRunnerImpl.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.marklogic.hub.flow.impl;
1717

18+
import com.fasterxml.jackson.core.JsonProcessingException;
19+
import com.fasterxml.jackson.databind.JsonNode;
1820
import com.fasterxml.jackson.databind.ObjectMapper;
1921
import com.marklogic.client.DatabaseClient;
2022
import com.marklogic.client.datamovement.*;
@@ -37,6 +39,7 @@
3739
import java.util.*;
3840
import java.util.concurrent.TimeUnit;
3941
import java.util.concurrent.atomic.AtomicLong;
42+
import java.util.stream.Collectors;
4043

4144
public class FlowRunnerImpl implements FlowRunner {
4245

@@ -189,7 +192,7 @@ public JobTicket run() {
189192
listener.onStatusChange(jobId, 0, "starting harmonization");
190193
});
191194

192-
ArrayList<String> errorMessages = new ArrayList<>();
195+
Vector<String> errorMessages = new Vector<>();
193196

194197
DataMovementManager dataMovementManager = sourceClient.newDataMovementManager();
195198

@@ -204,6 +207,11 @@ public JobTicket run() {
204207
RunFlowResponse response = flowRunner.run(jobId, batch.getItems(), options);
205208
failedEvents.addAndGet(response.errorCount);
206209
successfulEvents.addAndGet(response.totalCount - response.errorCount);
210+
if (response.errors != null) {
211+
ObjectMapper objectMapper = new ObjectMapper();
212+
213+
errorMessages.addAll(response.errors.stream().map(jsonNode -> jsonToString(jsonNode)).collect(Collectors.toList()));
214+
}
207215

208216
if (response.errorCount < response.totalCount) {
209217
successfulBatches.addAndGet(1);
@@ -281,7 +289,7 @@ else if (failedEvents.get() == 0 && successfulEvents.get() > 0) {
281289
.withEndTime(new Date());
282290

283291
if (errorMessages.size() > 0) {
284-
job.withJobOutput(String.join("\n", errorMessages));
292+
job.withJobOutput(errorMessages);
285293
}
286294
jobManager.saveJob(job);
287295
});
@@ -291,6 +299,15 @@ else if (failedEvents.get() == 0 && successfulEvents.get() > 0) {
291299
return new JobTicketImpl(jobId, JobTicket.JobType.QUERY_BATCHER).withQueryBatcher((QueryBatcherImpl)queryBatcher);
292300
}
293301

302+
private String jsonToString(JsonNode node) {
303+
try {
304+
ObjectMapper objectMapper = new ObjectMapper();
305+
return objectMapper.writeValueAsString(node);
306+
} catch (JsonProcessingException e) {
307+
throw new RuntimeException(e);
308+
}
309+
}
310+
294311
class FlowResource extends ResourceManager {
295312

296313
static final public String NAME = "flow";

marklogic-data-hub/src/main/java/com/marklogic/hub/job/Job.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import com.marklogic.client.pojo.annotation.Id;
44
import com.marklogic.hub.flow.Flow;
55

6+
import java.util.ArrayList;
67
import java.util.Date;
8+
import java.util.List;
79

810
public class Job {
911
private String jobId;
@@ -13,7 +15,7 @@ public class Job {
1315
private String jobName;
1416
private Date startTime;
1517
private Date endTime;
16-
private String jobOutput;
18+
private List<String> jobOutput;
1719
private JobStatus status = JobStatus.STARTED;
1820

1921
private long successfulEvents = 0;
@@ -43,11 +45,19 @@ public Job withJobName(String jobName) {
4345
return this;
4446
}
4547

46-
public Job withJobOutput(String jobOutput) {
48+
public Job withJobOutput(List<String> jobOutput) {
4749
this.jobOutput = jobOutput;
4850
return this;
4951
}
5052

53+
public Job withJobOutput(String jobOutput) {
54+
if (this.jobOutput == null) {
55+
this.jobOutput = new ArrayList<>();
56+
}
57+
this.jobOutput.add(jobOutput);
58+
return this;
59+
}
60+
5161
public Job withStartTime(Date startTime) {
5262
this.startTime = startTime;
5363
return this;
@@ -104,7 +114,7 @@ public JobStatus getStatus() {
104114
return status;
105115
}
106116

107-
public String getJobOutput() {
117+
public List<String> getJobOutput() {
108118
return jobOutput;
109119
}
110120

marklogic-data-hub/src/main/resources/ml-config/security/roles/data-hub-role.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@
4242
"action": "http://marklogic.com/xdmp/privileges/xdmp-invoke-in",
4343
"kind": "execute"
4444
},
45+
{
46+
"privilege-name": "xdbc:invoke",
47+
"action": "http://marklogic.com/xdmp/privileges/xdbc-invoke",
48+
"kind": "execute"
49+
},
50+
{
51+
"privilege-name": "xdbc:invoke-in",
52+
"action": "http://marklogic.com/xdmp/privileges/xdbc-invoke-in",
53+
"kind": "execute"
54+
},
4555
{
4656
"privilege-name": "xdmp:document-load",
4757
"action": "http://marklogic.com/xdmp/privileges/xdmp-document-load",

marklogic-data-hub/src/main/resources/ml-modules/root/com.marklogic.hub/lib/flow-lib.xqy

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -604,11 +604,7 @@ declare function flow:run-main(
604604
trace:set-plugin-label("main"),
605605
trace:error-trace($ex, xdmp:elapsed-time() - $before)
606606
),
607-
(: for input flows we want to rethrow to force a failure :)
608-
if (rfc:get-flow-type() eq $consts:INPUT_FLOW) then (
609-
xdmp:rethrow()
610-
)
611-
else ()
607+
xdmp:rethrow()
612608
}
613609
return
614610
$resp

marklogic-data-hub/src/main/resources/ml-modules/services/flow.xqy

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ import module namespace consts = "http://marklogic.com/data-hub/consts"
2626
import module namespace debug = "http://marklogic.com/data-hub/debug"
2727
at "/com.marklogic.hub/lib/debug-lib.xqy";
2828

29+
import module namespace err = "http://marklogic.com/data-hub/err"
30+
at "/com.marklogic.hub/lib/error-lib.xqy";
31+
2932
import module namespace flow = "http://marklogic.com/data-hub/flow-lib"
3033
at "/com.marklogic.hub/lib/flow-lib.xqy";
3134

@@ -112,6 +115,7 @@ declare function post(
112115
flow:set-default-options($options, $flow),
113116
map:put($options, "target-database", $target-database)
114117
)
118+
let $errors := json:array()
115119
return
116120
if (fn:exists($flow)) then
117121
let $_ :=
@@ -121,16 +125,17 @@ declare function post(
121125
flow:run-flow($job-id, $flow, $identifier, $options)
122126
}
123127
catch($ex) {
124-
(: error is already logged in flow-lib:main() :)
125-
()
128+
xdmp:log(("caught error in flow.xqy")),
129+
json:array-push($errors, $ex/err:error-to-json(.))
126130
}
127131
let $resp :=
128132
document {
129133
object-node {
130134
"totalCount": fn:count($identifiers),
131135
"errorCount": trace:get-error-count(),
132136
"completedItems": trace:get-completed-items(),
133-
"failedItems": trace:get-failed-items()
137+
"failedItems": trace:get-failed-items(),
138+
"errors": $errors
134139
}
135140
}
136141
return

0 commit comments

Comments
 (0)