Skip to content

Commit 46ffd38

Browse files
committed
Merge branch 'feature/speed-it-up' into develop
2 parents 878a1d7 + 6a9752c commit 46ffd38

30 files changed

+188 -74 lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -146,9 +146,10 @@ private ConfigurableApplicationContext buildApplicationContext(Flow flow, JobSta
146146
return ctx;
147147
}
148148

149-
private JobParameters buildJobParameters(Flow flow, int batchSize) {
149+
private JobParameters buildJobParameters(Flow flow, int batchSize, int threadCount) {
150150
JobParametersBuilder jpb = new JobParametersBuilder();
151-
jpb.addLong("chunk", Integer.toUnsignedLong(batchSize));
151+
jpb.addLong("batchSize", Integer.toUnsignedLong(batchSize));
152+
jpb.addLong("threadCount", Integer.toUnsignedLong(threadCount));
152153
jpb.addString("uid", UUID.randomUUID().toString());
153154
jpb.addString("flowType", flow.getType().toString());
154155
jpb.addString("entity", flow.getEntityName());
@@ -163,12 +164,12 @@ private JobParameters buildJobParameters(Flow flow, int batchSize) {
163164
* @param statusListener - the callback to receive job status updates
164165
* @return a JobExecution instance
165166
*/
166-
public JobExecution runFlow(Flow flow, int batchSize, JobStatusListener statusListener) {
167+
public JobExecution runFlow(Flow flow, int batchSize, int threadCount, JobStatusListener statusListener) {
167168
JobExecution result = null;
168169
try {
169170
ConfigurableApplicationContext ctx = buildApplicationContext(flow, statusListener);
170171

171-
JobParameters params = buildJobParameters(flow, batchSize);
172+
JobParameters params = buildJobParameters(flow, batchSize, threadCount);
172173
JobLauncher launcher = ctx.getBean(JobLauncher.class);
173174
Job job = ctx.getBean(Job.class);
174175
result = launcher.run(job, params);

marklogic-data-hub/src/main/java/com/marklogic/spring/batch/hub/FlowWriter.java

Lines changed: 10 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,8 @@
22

33
import java.util.List;
44

5+
import com.fasterxml.jackson.core.JsonProcessingException;
6+
import com.fasterxml.jackson.databind.ObjectMapper;
57
import org.springframework.batch.item.ItemWriter;
68

79
import com.marklogic.client.DatabaseClient;
@@ -18,36 +20,28 @@ public class FlowWriter extends ResourceManager implements ItemWriter<String> {
1820

1921
private DatabaseClient client;
2022
private Flow flow;
23+
StringHandle handle;
24+
RequestParameters params = new RequestParameters();
2125

2226
public FlowWriter(DatabaseClient client, Flow flow) {
2327
super();
2428
this.flow = flow;
2529
this.client = client;
2630
this.client.init(NAME, this);
31+
handle = new StringHandle(flow.serialize(true));
32+
handle.setFormat(Format.XML);
2733
}
2834

2935
@Override
3036
public void write(List<? extends String> items) {
3137

32-
Transaction transaction = null;
3338
try {
34-
transaction = client.openTransaction();
35-
for (String id: items) {
36-
RequestParameters params = new RequestParameters();
37-
params.add("identifier", id);
38-
39-
StringHandle handle = new StringHandle(flow.serialize(true));
40-
handle.setFormat(Format.XML);
41-
this.getServices().post(params, handle, transaction);
42-
}
43-
transaction.commit();
39+
params.put("identifier", items.toArray(new String[items.size()]));
40+
this.getServices().post(params, handle);
4441
}
4542
catch(Exception e) {
46-
if (transaction != null) {
47-
transaction.rollback();
48-
}
4943
e.printStackTrace();
50-
throw e;
51-
}
44+
throw e;
45+
}
5246
}
5347
}

marklogic-data-hub/src/main/java/com/marklogic/spring/batch/hub/HubJobRunner.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,6 @@
1111
import org.springframework.batch.core.launch.JobLauncher;
1212
import org.springframework.context.ConfigurableApplicationContext;
1313
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
14-
import org.springframework.core.env.JOptCommandLinePropertySource;
1514

1615
import java.util.Arrays;
1716
import java.util.Date;

marklogic-data-hub/src/main/java/com/marklogic/spring/batch/hub/RunHarmonizeFlowConfig.java

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,8 +11,10 @@
1111
import org.springframework.batch.item.support.PassThroughItemProcessor;
1212
import org.springframework.beans.factory.annotation.Autowired;
1313
import org.springframework.beans.factory.annotation.Qualifier;
14+
import org.springframework.beans.factory.annotation.Value;
1415
import org.springframework.context.annotation.Bean;
1516
import org.springframework.context.annotation.Configuration;
17+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
1618

1719
import java.util.List;
1820

@@ -48,17 +50,19 @@ public void afterJob(JobExecution jobExecution) {
4850

4951
@Bean
5052
@JobScope
51-
protected Step step1() {
53+
protected Step step1(@Value("#{jobParameters['batchSize']}") int batchSize, @Value("#{jobParameters['threadCount']}") int threadCount) {
5254

5355
Collector c = flow.getCollector();
5456
if (c instanceof ServerCollector) {
5557
((ServerCollector)c).setClient(getDatabaseClient());
5658
}
5759

5860
ItemProcessor<String, String> ip = new PassThroughItemProcessor<>();
61+
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
62+
taskExecutor.setConcurrencyLimit(threadCount);
5963

6064
return stepBuilderFactory.get("step1")
61-
.<String, String>chunk(getChunkSize())
65+
.<String, String>chunk(batchSize)
6266
.reader(new CollectorReader(c))
6367
.processor(ip)
6468
.listener(new ItemWriteListener<String>() {
@@ -101,6 +105,7 @@ public void afterChunkError(ChunkContext context) {
101105

102106
}
103107
})
108+
.taskExecutor(taskExecutor)
104109
.build();
105110
}
106111
}

marklogic-data-hub/src/main/resources/ml-modules/root/com.marklogic.hub/lib/debug-lib.xqy

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -80,8 +80,9 @@ declare function debug:dump-env($name as xs:string?)
8080
if ($request-path = '/MarkLogic/rest-api/endpoints/resource-service-query.xqy') then
8181
let $params := fn:string-join(
8282
for $f in xdmp:get-request-field-names()[fn:starts-with(., "rs:")]
83+
let $value := xdmp:get-request-field($f)
8384
return
84-
$f || "=" || xdmp:get-request-field($f),
85+
$f || "=" || fn:string-join($value, ", "),
8586
"&amp;")
8687
return
8788
"/v1/resources/" || xdmp:get-request-field("name") || "?" || $params
@@ -108,9 +109,9 @@ declare function debug:dump-env($name as xs:string?)
108109
" " || $h || " => " || xdmp:get-request-header($h),
109110
"",
110111
" [Request Params]",
111-
for $p in xdmp:get-request-field-names()
112+
for $p in xdmp:get-request-field-names()[fn:not(fn:starts-with(., "rs:"))]
112113
return
113-
" " || $p || " => " || xdmp:get-request-field($p),
114+
" " || $p || " => " || fn:string-join(xdmp:get-request-field($p), ", "),
114115
let $body := xdmp:get-request-body()
115116
return
116117
if (fn:exists($body)) then

marklogic-data-hub/src/main/resources/ml-modules/root/com.marklogic.hub/lib/trace-lib.xqy

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -33,10 +33,15 @@ declare variable $FORMAT-JSON := "json";
3333

3434
declare %private variable $current-trace-settings := map:map();
3535

36-
declare %private variable $current-trace := map:new((
37-
map:entry("traceId", xdmp:random()),
38-
map:entry("created", fn:current-dateTime())
39-
));
36+
declare %private variable $current-trace := trace:new-trace();
37+
38+
declare function trace:new-trace()
39+
{
40+
map:new((
41+
map:entry("traceId", xdmp:random()),
42+
map:entry("created", fn:current-dateTime())
43+
))
44+
};
4045

4146
declare function trace:enable-tracing($enabled as xs:boolean)
4247
{
@@ -77,7 +82,7 @@ declare function trace:init-trace($format as xs:string)
7782

7883
declare function trace:write-trace()
7984
{
80-
if (trace:enabled() or trace:has-errors()) then
85+
if (trace:enabled() or trace:has-errors()) then (
8186
let $format := map:get($current-trace-settings, "data-format")
8287
let $trace :=
8388
if ($format eq $FORMAT-JSON) then
@@ -145,7 +150,9 @@ declare function trace:write-trace()
145150
map:new((
146151
map:entry("database", xdmp:database($config:TRACING-DATABASE)),
147152
map:entry("transactionMode", "update-auto-commit")
148-
)))
153+
))),
154+
xdmp:set($current-trace, trace:new-trace())
155+
)
149156
else ()
150157
};
151158

marklogic-data-hub/src/main/resources/ml-modules/services/flow.xqy

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -77,12 +77,12 @@ declare %rapi:transaction-mode("update") function post(
7777

7878
perf:log('/v1/resources/flow:post', function() {
7979
let $flow as element(hub:flow) := $input/hub:flow
80-
let $identifier := map:get($params, "identifier")
8180
let $options := map:new((
8281
map:entry("entity", $flow/hub:entity/fn:data()),
8382
map:entry("flow", $flow/hub:name/fn:data()),
8483
map:entry("flowType", $flow/hub:type/fn:data())
8584
))
85+
for $identifier in map:get($params, "identifier")
8686
let $_ := flow:run-flow($flow, $identifier, $options)
8787
return
8888
document { () }

marklogic-data-hub/src/test/java/com/marklogic/hub/EndToEndTestSjsJson.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,7 @@ public void runFlows() throws IOException, ParserConfigurationException, SAXExce
6666
stagingDocMgr.write("/input.json", new JacksonHandle(getJsonFromResource("e2e-test/staged.json")));
6767

6868
JobFinishedListener harmonizeFlowListener = new JobFinishedListener();
69-
fm.runFlow(harmonizeFlow, 10, harmonizeFlowListener);
69+
fm.runFlow(harmonizeFlow, 10, 1, harmonizeFlowListener);
7070
harmonizeFlowListener.waitForFinish();
7171
String expected = getResource("e2e-test/final.json");
7272
String actual = finalDocMgr.read("/input.json").next().getContent(new StringHandle()).get();

marklogic-data-hub/src/test/java/com/marklogic/hub/EndToEndTestSjsXml.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,7 @@ public void runFlows() throws IOException, ParserConfigurationException, SAXExce
7070
stagingDocMgr.write("/input.xml", new DOMHandle(getXmlFromResource("e2e-test/staged.xml")));
7171

7272
JobFinishedListener harmonizeFlowListener = new JobFinishedListener();
73-
fm.runFlow(harmonizeFlow, 10, harmonizeFlowListener);
73+
fm.runFlow(harmonizeFlow, 10, 1, harmonizeFlowListener);
7474
harmonizeFlowListener.waitForFinish();
7575
assertXMLEqual(getXmlFromResource("e2e-test/final.xml"), finalDocMgr.read("/input.xml").next().getContent(new DOMHandle()).get());
7676
}

marklogic-data-hub/src/test/java/com/marklogic/hub/EndToEndTestXqyJson.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,7 @@ public void runFlowwithTriplesNodeStar() throws IOException, ParserConfiguration
6565
stagingDocMgr.write("/input.json", new JacksonHandle(getJsonFromResource("e2e-test/staged.json")));
6666

6767
JobFinishedListener harmonizeFlowListener = new JobFinishedListener();
68-
fm.runFlow(harmonizeFlow, 10, harmonizeFlowListener);
68+
fm.runFlow(harmonizeFlow, 10, 1, harmonizeFlowListener);
6969
harmonizeFlowListener.waitForFinish();
7070
String expected = getResource("e2e-test/final.json");
7171
String actual = finalDocMgr.read("/input.json").next().getContent(new StringHandle()).get();
@@ -82,7 +82,7 @@ public void runFlowWithTriplesJsonArray() throws IOException, ParserConfiguratio
8282
stagingDocMgr.write("/input.json", new JacksonHandle(getJsonFromResource("e2e-test/staged.json")));
8383

8484
JobFinishedListener harmonizeFlowListener = new JobFinishedListener();
85-
fm.runFlow(harmonizeFlow, 10, harmonizeFlowListener);
85+
fm.runFlow(harmonizeFlow, 10, 1, harmonizeFlowListener);
8686
harmonizeFlowListener.waitForFinish();
8787
String expected = getResource("e2e-test/final.json");
8888
String actual = finalDocMgr.read("/input.json").next().getContent(new StringHandle()).get();

0 commit comments

Comments
 (0)