Skip to content

Commit e920dff

Browse files
committed
fixed #488
1 parent 8eb121a commit e920dff

File tree

14 files changed

+339
-58
lines changed

14 files changed

+339
-58
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/HubConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ public DatabaseClient newStagingClient() {
281281
return DatabaseClientFactory.newClient(
282282
host,
283283
stagingPort,
284+
stagingDbName,
284285
username,
285286
password,
286287
DatabaseClientFactory.Authentication.valueOf(stagingAuthMethod.toUpperCase()));
@@ -308,6 +309,7 @@ public DatabaseClient newFinalClient() {
308309
return DatabaseClientFactory.newClient(
309310
host,
310311
finalPort,
312+
finalDbName,
311313
username,
312314
password,
313315
DatabaseClientFactory.Authentication.valueOf(finalAuthMethod.toUpperCase()));
@@ -321,6 +323,7 @@ public DatabaseClient newJobDbClient() {
321323
return DatabaseClientFactory.newClient(
322324
host,
323325
jobPort,
326+
jobDbName,
324327
username,
325328
password,
326329
DatabaseClientFactory.Authentication.valueOf(jobAuthMethod.toUpperCase()));
@@ -348,6 +351,7 @@ public DatabaseClient newTraceDbClient() {
348351
return DatabaseClientFactory.newClient(
349352
host,
350353
tracePort,
354+
traceDbName,
351355
username,
352356
password,
353357
DatabaseClientFactory.Authentication.valueOf(stagingAuthMethod.toUpperCase()));

marklogic-data-hub/src/main/java/com/marklogic/hub/collector/ServerCollector.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.fasterxml.jackson.databind.ObjectMapper;
1919
import com.marklogic.client.DatabaseClient;
2020
import com.marklogic.hub.HubConfig;
21-
import com.marklogic.hub.HubDatabase;
2221
import com.marklogic.hub.plugin.PluginType;
2322
import com.marklogic.xcc.*;
2423
import org.slf4j.Logger;
@@ -33,7 +32,6 @@ public class ServerCollector extends AbstractCollector {
3332
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
3433
private DatabaseClient client = null;
3534
private HubConfig hubConfig = null;
36-
private HubDatabase sourceDb = null;
3735

3836
private String module;
3937

@@ -43,7 +41,6 @@ public ServerCollector(PluginType type, String module) {
4341
}
4442

4543
public void setHubConfig(HubConfig config) { this.hubConfig = config; }
46-
public void setHubDatabase(HubDatabase sourceDb) { this.sourceDb = sourceDb; }
4744

4845
public DatabaseClient getClient() {
4946
return this.client;
@@ -68,15 +65,8 @@ public void serialize(XMLStreamWriter serializer) throws XMLStreamException {
6865

6966
@Override
7067
public DiskQueue<String> run(String jobId, int threadCount, Map<String, Object> options) {
71-
int port = hubConfig.stagingPort;
72-
String dbName = hubConfig.stagingDbName;
73-
if (sourceDb.equals(HubDatabase.FINAL)) {
74-
port = hubConfig.finalPort;
75-
dbName = hubConfig.finalDbName;
76-
}
77-
7868
try {
79-
ContentSource cs = ContentSourceFactory.newContentSource(hubConfig.host, port, hubConfig.getUsername(), hubConfig.getPassword(), dbName);
69+
ContentSource cs = ContentSourceFactory.newContentSource(client.getHost(), client.getPort(), hubConfig.getUsername(), hubConfig.getPassword(), client.getDatabase());
8070
Session activeSession = cs.newSession();
8171
RequestOptions requestOptions = new RequestOptions();
8272
requestOptions.setCacheResult(false);

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/FlowRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.marklogic.hub.flow;
22

3+
import com.marklogic.client.DatabaseClient;
34
import com.marklogic.client.datamovement.JobTicket;
45
import com.marklogic.hub.HubDatabase;
56

@@ -11,8 +12,8 @@ public interface FlowRunner {
1112
FlowRunner withFlow(Flow flow);
1213
FlowRunner withBatchSize(int batchSize);
1314
FlowRunner withThreadCount(int threadCount);
14-
FlowRunner withSourceDatabase(HubDatabase sourceDatabase);
15-
FlowRunner withDestinationDatabase(HubDatabase destinationDatabase);
15+
FlowRunner withSourceClient(DatabaseClient sourceClient);
16+
FlowRunner withDestinationDatabase(String destinationDatabase);
1617
FlowRunner withOptions(Map<String, Object> options);
1718

1819
FlowRunner onItemComplete(FlowItemCompleteListener listener);

marklogic-data-hub/src/main/java/com/marklogic/hub/flow/impl/FlowRunnerImpl.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public class FlowRunnerImpl implements FlowRunner {
3232
private Flow flow;
3333
private int batchSize = DEFAULT_BATCH_SIZE;
3434
private int threadCount = DEFAULT_THREAD_COUNT;
35-
private HubDatabase sourceDatabase = HubDatabase.STAGING;
36-
private HubDatabase destinationDatabase = HubDatabase.FINAL;
35+
private DatabaseClient sourceClient;
36+
private String destinationDatabase;
3737
private Map<String, Object> options;
3838
private int previousPercentComplete;
3939

@@ -47,6 +47,8 @@ public class FlowRunnerImpl implements FlowRunner {
4747

4848
public FlowRunnerImpl(HubConfig hubConfig) {
4949
this.hubConfig = hubConfig;
50+
this.sourceClient = hubConfig.newStagingClient();
51+
this.destinationDatabase = hubConfig.finalDbName;
5052
}
5153

5254
@Override
@@ -68,13 +70,13 @@ public FlowRunner withThreadCount(int threadCount) {
6870
}
6971

7072
@Override
71-
public FlowRunner withSourceDatabase(HubDatabase sourceDatabase) {
72-
this.sourceDatabase = sourceDatabase;
73+
public FlowRunner withSourceClient(DatabaseClient sourceClient) {
74+
this.sourceClient = sourceClient;
7375
return this;
7476
}
7577

7678
@Override
77-
public FlowRunner withDestinationDatabase(HubDatabase destinationDatabase) {
79+
public FlowRunner withDestinationDatabase(String destinationDatabase) {
7880
this.destinationDatabase = destinationDatabase;
7981
return this;
8082
}
@@ -122,17 +124,6 @@ public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedExcep
122124
runningThread.join(unit.convert(timeout, TimeUnit.MILLISECONDS));
123125
}
124126

125-
private DatabaseClient getSourceClient() {
126-
DatabaseClient srcClient;
127-
if (sourceDatabase.equals(HubDatabase.STAGING)) {
128-
srcClient = hubConfig.newStagingClient();
129-
}
130-
else {
131-
srcClient = hubConfig.newFinalClient();
132-
}
133-
return srcClient;
134-
}
135-
136127
@Override
137128
public JobTicket run() {
138129
String jobId = UUID.randomUUID().toString();
@@ -142,13 +133,10 @@ public JobTicket run() {
142133
.withJobId(jobId);
143134
jobManager.saveJob(job);
144135

145-
DatabaseClient srcClient = getSourceClient();
146-
147136
Collector c = flow.getCollector();
148137
if (c instanceof ServerCollector) {
149138
((ServerCollector)c).setHubConfig(hubConfig);
150-
((ServerCollector)c).setHubDatabase(sourceDatabase);
151-
((ServerCollector)c).setClient(srcClient);
139+
((ServerCollector)c).setClient(sourceClient);
152140
}
153141

154142
AtomicLong successfulEvents = new AtomicLong(0);
@@ -176,7 +164,7 @@ public JobTicket run() {
176164

177165
ArrayList<String> errorMessages = new ArrayList<>();
178166

179-
DataMovementManager dataMovementManager = srcClient.newDataMovementManager();
167+
DataMovementManager dataMovementManager = sourceClient.newDataMovementManager();
180168

181169
double batchCount = Math.ceil((double)uris.size() / (double)batchSize);
182170

@@ -185,7 +173,7 @@ public JobTicket run() {
185173
.withThreadCount(threadCount)
186174
.onUrisReady((QueryBatch batch) -> {
187175
try {
188-
FlowResource flowRunner = new FlowResource(batch.getClient(), batch.getClient().getDatabase(), flow);
176+
FlowResource flowRunner = new FlowResource(batch.getClient(), destinationDatabase, flow);
189177
RunFlowResponse response = flowRunner.run(jobId, batch.getItems(), options);
190178
failedEvents.addAndGet(response.errorCount);
191179
successfulEvents.addAndGet(response.totalCount - response.errorCount);

marklogic-data-hub/src/test/java/com/marklogic/hub/FlowManagerTest.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ public static void setup() throws IOException {
5252

5353
installHub();
5454

55-
clearDatabases(new String[]{HubConfig.DEFAULT_STAGING_NAME, HubConfig.DEFAULT_FINAL_NAME});
55+
clearDatabases(new String[]{HubConfig.DEFAULT_MODULES_DB_NAME, HubConfig.DEFAULT_STAGING_NAME, HubConfig.DEFAULT_FINAL_NAME});
56+
57+
getDataHub().installHubModules();
58+
59+
enableDebugging();
5660

57-
DocumentMetadataHandle meta = new DocumentMetadataHandle();
58-
meta.getCollections().add("tester");
59-
installStagingDoc("/employee1.xml", meta, getResource("flow-manager-test/input/employee1.xml"));
60-
installStagingDoc("/employee2.xml", meta, getResource("flow-manager-test/input/employee2.xml"));
6161
runInModules("xdmp:directory-create(\"/entities/test/harmonize/my-test-flow1/collector/\"),"
6262
+ "xdmp:directory-create(\"/entities/test/harmonize/my-test-flow1/headers/\"),"
6363
+ "xdmp:directory-create(\"/entities/test/harmonize/my-test-flow1/triples/\"),"
@@ -77,10 +77,20 @@ public static void setup() throws IOException {
7777
installModules(modules);
7878
}
7979

80-
@After
81-
public void afterEach() {
82-
finalDocMgr.delete("/employee1.xml");
83-
finalDocMgr.delete("/employee2.xml");
80+
private static void addStagingDocs() throws IOException {
81+
clearDatabases(new String[]{HubConfig.DEFAULT_STAGING_NAME, HubConfig.DEFAULT_FINAL_NAME});
82+
DocumentMetadataHandle meta = new DocumentMetadataHandle();
83+
meta.getCollections().add("tester");
84+
installStagingDoc("/employee1.xml", meta, getResource("flow-manager-test/input/employee1.xml"));
85+
installStagingDoc("/employee2.xml", meta, getResource("flow-manager-test/input/employee2.xml"));
86+
}
87+
88+
private static void addFinalDocs() throws IOException {
89+
clearDatabases(new String[]{HubConfig.DEFAULT_STAGING_NAME, HubConfig.DEFAULT_FINAL_NAME});
90+
DocumentMetadataHandle meta = new DocumentMetadataHandle();
91+
meta.getCollections().add("tester");
92+
installFinalDoc("/employee1.xml", meta, getResource("flow-manager-test/input/employee1.xml"));
93+
installFinalDoc("/employee2.xml", meta, getResource("flow-manager-test/input/employee2.xml"));
8494
}
8595

8696
@Test
@@ -208,6 +218,7 @@ public void getTestFlow() {
208218

209219
@Test
210220
public void testRunFlow() throws SAXException, IOException, ParserConfigurationException, XMLStreamException {
221+
addStagingDocs();
211222
installModule("/entities/test/harmonize/my-test-flow1/my-test-flow1.xml", "flow-manager-test/my-test-flow1/my-test-flow1.xml");
212223
assertEquals(2, getStagingDocCount());
213224
assertEquals(0, getFinalDocCount());
@@ -225,8 +236,32 @@ public void testRunFlow() throws SAXException, IOException, ParserConfigurationE
225236
assertXMLEqual(getXmlFromResource("flow-manager-test/harmonized/harmonized2.xml"), finalDocMgr.read("/employee2.xml").next().getContent(new DOMHandle()).get());
226237
}
227238

239+
@Test
240+
public void testRunFlowWithBackwards() throws SAXException, IOException, ParserConfigurationException, XMLStreamException {
241+
addFinalDocs();
242+
243+
installModule("/entities/test/harmonize/my-test-flow1/my-test-flow1.xml", "flow-manager-test/my-test-flow1/my-test-flow1.xml");
244+
assertEquals(0, getStagingDocCount());
245+
assertEquals(2, getFinalDocCount());
246+
FlowManager fm = new FlowManager(getHubConfig());
247+
SimpleFlow flow1 = (SimpleFlow)fm.getFlow("test", "my-test-flow1");
248+
FlowRunner flowRunner = fm.newFlowRunner()
249+
.withFlow(flow1)
250+
.withBatchSize(10)
251+
.withThreadCount(1)
252+
.withSourceClient(getHubConfig().newFinalClient())
253+
.withDestinationDatabase(HubConfig.DEFAULT_STAGING_NAME);
254+
flowRunner.run();
255+
flowRunner.awaitCompletion();
256+
assertEquals(2, getStagingDocCount());
257+
assertEquals(2, getFinalDocCount());
258+
assertXMLEqual(getXmlFromResource("flow-manager-test/harmonized/harmonized1.xml"), stagingDocMgr.read("/employee1.xml").next().getContent(new DOMHandle()).get() );
259+
assertXMLEqual(getXmlFromResource("flow-manager-test/harmonized/harmonized2.xml"), stagingDocMgr.read("/employee2.xml").next().getContent(new DOMHandle()).get());
260+
}
261+
228262
@Test
229263
public void testRunFlowWithHeader() throws SAXException, IOException, ParserConfigurationException, XMLStreamException {
264+
addStagingDocs();
230265
HashMap<String, String> modules = new HashMap<>();
231266
modules.put("/entities/test/harmonize/my-test-flow-with-header/collector/collector.xqy", "flow-manager-test/my-test-flow-with-header/collector/collector.xqy");
232267
modules.put("/entities/test/harmonize/my-test-flow-with-header/headers/headers.xqy", "flow-manager-test/my-test-flow-with-header/headers/headers.xqy");
@@ -252,6 +287,7 @@ public void testRunFlowWithHeader() throws SAXException, IOException, ParserConf
252287

253288
@Test
254289
public void testRunFlowWithAll() throws SAXException, IOException, ParserConfigurationException, XMLStreamException {
290+
addStagingDocs();
255291
HashMap<String, String> modules = new HashMap<>();
256292
modules.put("/entities/test/harmonize/my-test-flow-with-all/my-test-flow-with-all.xml", "flow-manager-test/my-test-flow-with-all/my-test-flow-with-all.xml");
257293
modules.put("/entities/test/harmonize/my-test-flow-with-all/collector/collector.xqy", "flow-manager-test/my-test-flow-with-all/collector/collector.xqy");

ml-data-hub-plugin/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ dependencies {
3535

3636
testCompile localGroovy()
3737
testCompile gradleTestKit()
38+
testCompile 'xmlunit:xmlunit:1.3'
3839
testCompile('org.spockframework:spock-core:1.1-groovy-2.4-rc-2') {
3940
exclude module: 'groovy-all'
4041
}

ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/RunFlowTask.groovy

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.marklogic.gradle.task
22

3+
import com.marklogic.client.DatabaseClient
34
import com.marklogic.gradle.exception.EntityNameRequiredException
45
import com.marklogic.gradle.exception.FlowNameRequiredException
56
import com.marklogic.gradle.exception.FlowNotFoundException
@@ -30,16 +31,17 @@ class RunFlowTask extends HubTask {
3031
public Integer threadCount
3132

3233
@Input
33-
public HubDatabase sourceDB
34+
public String sourceDB
3435

3536
@Input
36-
public HubDatabase destDB
37+
public String destDB
3738

3839
@Input
3940
public Boolean showOptions
4041

4142
@TaskAction
4243
void runFlow() {
44+
println("running a flow!!")
4345
if (entityName == null) {
4446
entityName = project.hasProperty("entityName") ? project.property("entityName") : null
4547
}
@@ -64,13 +66,20 @@ class RunFlowTask extends HubTask {
6466
threadCount = project.hasProperty("threadCount") ?
6567
Integer.parseInt(project.property("threadCount")) : 4
6668
}
69+
70+
DatabaseClient sourceClient = null
6771
if (sourceDB == null) {
68-
sourceDB = project.hasProperty("sourceDB") ?
69-
HubDatabase.getHubDatabase(project.property("sourceDB")) : HubDatabase.STAGING
72+
if (project.hasProperty("sourceDB")) {
73+
sourceClient = hubConfig.newStagingClient()
74+
sourceClient.database = project.property("sourceDB")
75+
}
76+
else {
77+
sourceClient = hubConfig.newStagingClient()
78+
}
7079
}
7180
if (destDB == null) {
7281
destDB = project.hasProperty("destDB") ?
73-
HubDatabase.getHubDatabase(project.property("destDB")) : HubDatabase.FINAL
82+
project.property("destDB") : hubConfig.finalDbName
7483
}
7584
if (showOptions == null) {
7685
showOptions = project.hasProperty("showOptions") ?
@@ -96,7 +105,7 @@ class RunFlowTask extends HubTask {
96105
println("Running Flow: [" + entityName + ":" + flowName + "]" +
97106
"\n\twith batch size: " + batchSize +
98107
"\n\twith thread count: " + threadCount +
99-
"\n\twith Source DB: " + sourceDB.toString() +
108+
"\n\twith Source DB: " + sourceClient.database +
100109
"\n\twith Destination DB: " + destDB.toString())
101110

102111
if (showOptions) {
@@ -111,7 +120,7 @@ class RunFlowTask extends HubTask {
111120
.withOptions(options)
112121
.withBatchSize(batchSize)
113122
.withThreadCount(threadCount)
114-
.withSourceDatabase(sourceDB)
123+
.withSourceClient(sourceClient)
115124
.withDestinationDatabase(destDB)
116125
flowRunner.run()
117126
flowRunner.awaitCompletion()

0 commit comments

Comments
 (0)