Skip to content

Commit c3189f4

Browse files
ryanjdewMarkLogic Builder
authored andcommitted
DHFPROD-7979: Allow ingest to write to DB other than STAGING or FINAL
1 parent 9dfb6a7 commit c3189f4

File tree

4 files changed

+77
-1
lines changed

4 files changed

+77
-1
lines changed

marklogic-data-hub-api/src/main/java/com/marklogic/hub/HubClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ static HubClient withHubClientConfig(HubClientConfig hubClientConfig) {
3232

3333
DatabaseClient getStagingClient();
3434

35+
DatabaseClient getStagingClient(String databaseName);
36+
3537
DatabaseClient getFinalClient();
3638

3739
DatabaseClient getJobsClient();

marklogic-data-hub-api/src/main/java/com/marklogic/hub/impl/HubClientImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
public class HubClientImpl implements HubClient {
2727

28+
private final HubClientConfig hubClientConfig;
2829
private String username;
2930
private DatabaseClient stagingClient;
3031
private DatabaseClient finalClient;
@@ -34,6 +35,7 @@ public class HubClientImpl implements HubClient {
3435
private ManageClient manageClient;
3536

3637
public HubClientImpl(HubClientConfig hubClientConfig) {
38+
this.hubClientConfig = hubClientConfig;
3739
username = hubClientConfig.getUsername();
3840
stagingClient = hubClientConfig.newStagingClient(null);
3941
finalClient = hubClientConfig.newFinalClient(null);
@@ -58,6 +60,11 @@ public DatabaseClient getStagingClient() {
5860
return stagingClient;
5961
}
6062

63+
@Override
64+
public DatabaseClient getStagingClient(String databaseName) {
65+
return hubClientConfig.newStagingClient(databaseName);
66+
}
67+
6168
@Override
6269
public DatabaseClient getFinalClient() {
6370
return finalClient;

marklogic-data-hub/src/main/java/com/marklogic/hub/step/impl/WriteStepRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,9 +447,10 @@ private RunStepResponse runIngester(RunStepResponse runStepResponse, Collection<
447447

448448
Vector<String> errorMessages = new Vector<>();
449449

450+
// Optimize for Hub databases associated with Hub app servers, but allow other values
450451
dataMovementManager = destinationDatabase.equals(hubClient.getDbName(DatabaseKind.FINAL)) ?
451452
hubClient.getFinalClient().newDataMovementManager() :
452-
hubClient.getStagingClient().newDataMovementManager();
453+
hubClient.getStagingClient(destinationDatabase).newDataMovementManager();
453454

454455
HashMap<String, JobTicket> ticketWrapper = new HashMap<>();
455456

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.marklogic.hub.flow;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.JsonNode;
5+
import com.marklogic.client.io.JacksonHandle;
6+
import com.marklogic.hub.AbstractHubCoreTest;
7+
import com.marklogic.hub.DatabaseKind;
8+
import org.junit.jupiter.api.Test;
9+
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertTrue;
15+
16+
public class IngestToJobsTest extends AbstractHubCoreTest {
17+
final String docUri = "/customers/customer1.json";
18+
/**
19+
* This is a test cover the scenario of ingesting to a database that isn't a Data Hub content DB (e.g., STAGING or FINAL).
20+
* We use JOBS, since it is a database we know must exist.
21+
*/
22+
@Test
23+
void ingestToJobs() throws JsonProcessingException {
24+
final String flowName = "ingestToFinal";
25+
26+
removeJobsDocument();
27+
installReferenceModelProject();
28+
29+
copyArtifactsToJobs();
30+
31+
makeInputFilePathsAbsoluteInFlow(flowName);
32+
FlowInputs flowInputs = new FlowInputs(flowName).withSteps("1");
33+
// Using runtime options, change the target DB to JOBS
34+
Map<String,Object> options = new HashMap<>();
35+
flowInputs.withOption("targetDatabase", getHubConfig().getDbName(DatabaseKind.JOB));
36+
RunFlowResponse runFlowResponse = runFlow(flowInputs);
37+
assertEquals("finished", runFlowResponse.getJobStatus());
38+
JsonNode rawDoc = getHubClient().getJobsClient().newJSONDocumentManager().read(docUri, new JacksonHandle()).get();
39+
assertEquals("1", rawDoc.get("envelope").get("instance").get("CustomerID").asText(),
40+
"Verifying that the single doc was ingested into the jobs database");
41+
}
42+
43+
private void copyArtifactsToJobs() {
44+
retryIfNecessary(() -> {
45+
getClientByName(getHubClient().getDbName(DatabaseKind.JOB)).newServerEval().addVariable("source-db", getHubClient().getDbName(DatabaseKind.FINAL)).xquery(
46+
"declare variable $source-db external;\n" +
47+
"for $collection in ('http://marklogic.com/data-hub/flow','http://marklogic.com/data-hub/step-definition')" +
48+
"let $artifacts := xdmp:invoke-function(function() {\n" +
49+
" map:new(\n" +
50+
" fn:collection($collection) ! (map:entry(xdmp:node-uri(.), .))\n" +
51+
" )\n" +
52+
" }, map:entry('database', xdmp:database($source-db)))\n" +
53+
"for $uri in map:keys($artifacts)\n" +
54+
"return xdmp:document-insert($uri, map:get($artifacts, $uri), map:entry('collections', $collection))"
55+
).eval();
56+
});
57+
}
58+
59+
private void removeJobsDocument() {
60+
try {
61+
getClientByName(DatabaseKind.getName(DatabaseKind.JOB)).newDocumentManager().delete(docUri);
62+
} catch (Exception e) {
63+
// ignoring exception, since the document may not exist
64+
}
65+
}
66+
}

0 commit comments

Comments
 (0)