Skip to content

Commit a5dc71f

Browse files
ryanjdew authored and SameeraPriyathamTadikonda committed
DHFPROD-5305: Provide privileges required for MLCP copy (#4176)
1 parent ca26948 commit a5dc71f

File tree

5 files changed

+130
-36
lines changed

5 files changed

+130
-36
lines changed

marklogic-data-hub/build.gradle

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ dependencies {
7272
testCompile 'org.easymock:easymock:3.4'
7373
testCompile 'ch.qos.logback:logback-classic:1.1.11'
7474
testCompile 'org.slf4j:log4j-over-slf4j:1.7.13'
75-
testCompile("com.marklogic:mlcp-util:0.3.0")
76-
testCompile("com.marklogic:mlcp:10.0.1") {
75+
testCompile("com.marklogic:mlcp:10.0.4.2") {
7776
exclude group: 'org.apache.avro', module: 'avro-tools'
7877
exclude group: 'org.apache.commons', module: 'commons-csv'
7978
}

marklogic-data-hub/src/main/java/com/marklogic/hub/util/MlcpRunner.java

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public class MlcpRunner extends ProcessRunner {
5050
private DatabaseClient databaseClient;
5151
private String database = null;
5252

53+
/**
 * Convenience constructor that delegates to the seven-argument constructor, passing
 * {@code null} for the MLCP path, the legacy flow, the database client and the status
 * listener.
 *
 * NOTE(review): presumably intended for MLCP invocations that are not tied to a legacy
 * flow (such as the "copy" command) - confirm against callers.
 *
 * @param mainClass   name of the main class handed to the seven-argument constructor
 * @param hubConfig   hub configuration handed to the seven-argument constructor
 * @param mlcpOptions MLCP options as a JSON node
 */
public MlcpRunner(String mainClass, HubConfig hubConfig, JsonNode mlcpOptions) {
54+
this(null, mainClass, hubConfig, null, null, mlcpOptions, null);
55+
}
56+
5357
public MlcpRunner(String mlcpPath, String mainClass, HubConfig hubConfig, LegacyFlow flow, DatabaseClient databaseClient, JsonNode mlcpOptions, LegacyFlowStatusListener statusListener) {
5458
super();
5559

@@ -73,26 +77,40 @@ public String getJobId() {
7377
public void run() {
7478
HubConfig hubConfig = getHubConfig();
7579

76-
Job job = Job.withFlow(flow)
77-
.withJobId(jobId);
78-
jobManager.saveJob(job);
80+
Job job = null;
81+
if (flow != null) {
82+
job = Job.withFlow(flow)
83+
.withJobId(jobId);
84+
jobManager.saveJob(job);
85+
}
7986

8087
try {
8188
MlcpBean bean = new ObjectMapper().readerFor(MlcpBean.class).readValue(mlcpOptions);
82-
bean.setHost(databaseClient.getHost());
83-
bean.setPort(databaseClient.getPort());
89+
if (databaseClient != null) {
90+
bean.setHost(databaseClient.getHost());
91+
bean.setPort(databaseClient.getPort());
92+
}
8493
if (database != null) {
8594
bean.setDatabase(database);
8695
}
8796

88-
// Assume that the HTTP credentials will work for mlcp
89-
bean.setUsername(hubConfig.getAppConfig().getAppServicesUsername());
90-
bean.setPassword(hubConfig.getAppConfig().getAppServicesPassword());
9197

92-
File file = new File(mlcpOptions.get("input_file_path").asText());
93-
String canonicalPath = file.getCanonicalPath();
94-
bean.setInput_file_path(canonicalPath);
95-
bean.setTransform_param("\"" + bean.getTransform_param() + ",job-id=" + jobId + "\"");
98+
if (!"copy".equals(bean.getCommand().toLowerCase())) {
99+
// Assume that the HTTP credentials will work for mlcp
100+
bean.setUsername(hubConfig.getAppConfig().getAppServicesUsername());
101+
bean.setPassword(hubConfig.getAppConfig().getAppServicesPassword());
102+
}
103+
104+
if (mlcpOptions.has("input_file_path")) {
105+
File file = new File(mlcpOptions.get("input_file_path").asText());
106+
String canonicalPath = file.getCanonicalPath();
107+
bean.setInput_file_path(canonicalPath);
108+
}
109+
110+
if (job != null) {
111+
bean.setTransform_param("\"" + bean.getTransform_param() + ",job-id=" + jobId + "\"");
112+
}
113+
96114
bean.setModules_root("/");
97115

98116
if (hubConfig.getIsHostLoadBalancer()) {
@@ -108,28 +126,30 @@ public void run() {
108126
}
109127

110128
} catch (Exception e) {
111-
job.withStatus(JobStatus.FAILED)
112-
.withEndTime(new Date());
113-
jobManager.saveJob(job);
129+
if (job != null) {
130+
job.withStatus(JobStatus.FAILED)
131+
.withEndTime(new Date());
132+
jobManager.saveJob(job);
133+
}
114134
throw new RuntimeException(e);
115135
} finally {
116-
JobStatus status;
117-
if (failedEvents.get() > 0 && successfulEvents.get() > 0) {
118-
status = JobStatus.FINISHED_WITH_ERRORS;
119-
}
120-
else if (failedEvents.get() == 0 && successfulEvents.get() > 0) {
121-
status = JobStatus.FINISHED;
122-
}
123-
else {
124-
status = JobStatus.FAILED;
125-
}
136+
if (job != null) {
137+
JobStatus status;
138+
if (failedEvents.get() > 0 && successfulEvents.get() > 0) {
139+
status = JobStatus.FINISHED_WITH_ERRORS;
140+
} else if (failedEvents.get() == 0 && successfulEvents.get() > 0) {
141+
status = JobStatus.FINISHED;
142+
} else {
143+
status = JobStatus.FAILED;
144+
}
126145

127-
// store the thing in MarkLogic
128-
job.withJobOutput(getProcessOutput())
129-
.withStatus(status)
130-
.setCounts(successfulEvents.get(), failedEvents.get(), 0, 0)
131-
.withEndTime(new Date());
132-
jobManager.saveJob(job);
146+
// store the thing in MarkLogic
147+
job.withJobOutput(getProcessOutput())
148+
.withStatus(status)
149+
.setCounts(successfulEvents.get(), failedEvents.get(), 0, 0)
150+
.withEndTime(new Date());
151+
jobManager.saveJob(job);
152+
}
133153
}
134154
}
135155

@@ -152,8 +172,8 @@ private String buildLoggerconfig() {
152172
" <logger name=\"com.marklogic.client.impl.DatabaseClientImpl\" level=\"WARN\"/>\n" +
153173
" <logger name=\"com.marklogic\" level=\"INFO\"/>\n" +
154174
" <logger name=\"com.marklogic.appdeployer\" level=\"INFO\"/>\n" +
155-
" <logger name=\"com.marklogic.hub\" level=\"INFO\"/>\n" +
156-
" <logger name=\"com.marklogic.contentpump\" level=\"INFO\"/>\n" +
175+
" <logger name=\"com.marklogic.hub\" level=\"DEBUG\"/>\n" +
176+
" <logger name=\"com.marklogic.contentpump\" level=\"DEBUG\"/>\n" +
157177
" <logger name=\"org.apache.catalina.webresources.Cache\" level=\"ERROR\"/>\n" +
158178
" <logger name=\"org.apache.hadoop.util.Shell\" level=\"OFF\"/>\n" +
159179
" <logger name=\"org.apache.hadoop.util.NativeCodeLoader\" level=\"ERROR\"/>\n" +
@@ -211,7 +231,8 @@ else if (!mlcpFile.canExecute()) {
211231
u.contains("guava") ||
212232
u.contains("apache") ||
213233
u.contains("commons") ||
214-
u.contains("hadoop"))
234+
u.contains("hadoop") ||
235+
u.contains("thoughtworks"))
215236
).collect(Collectors.joining(File.pathSeparator));
216237

217238
//logger.warn("Classpath filtered to: " + filteredClasspathEntries);

marklogic-data-hub/src/main/resources/hub-internal-config/security/roles/data-hub-operator.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@
127127
"privilege-name": "sem:sparql",
128128
"action": "http://marklogic.com/xdmp/privileges/sem-sparql",
129129
"kind": "execute"
130+
},
131+
{
132+
"privilege-name": "xdmp:with-namespaces",
133+
"action": "http://marklogic.com/xdmp/privileges/xdmp-with-namespaces",
134+
"kind": "execute"
130135
}
131136
]
132137
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.marklogic.hub.mlcp;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.fasterxml.jackson.databind.node.ObjectNode;
5+
import com.marklogic.client.io.DocumentMetadataHandle;
6+
import com.marklogic.hub.ApplicationConfig;
7+
import com.marklogic.hub.DatabaseKind;
8+
import com.marklogic.hub.HubTestBase;
9+
import com.marklogic.hub.impl.HubConfigImpl;
10+
import com.marklogic.hub.util.MlcpRunner;
11+
import org.apache.commons.lang3.StringUtils;
12+
import org.apache.commons.lang3.exception.ExceptionUtils;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.ExtendWith;
15+
import org.springframework.test.context.ContextConfiguration;
16+
import org.springframework.test.context.junit.jupiter.SpringExtension;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
22+
23+
@ExtendWith(SpringExtension.class)
24+
@ContextConfiguration(classes = ApplicationConfig.class)
25+
public class MlcpCopyTest extends HubTestBase {
26+
@Test
27+
public void testCopyAsOperator() {
28+
HubConfigImpl hubConfig = runAsFlowOperator();
29+
String username = hubConfig.getMlUsername();
30+
String password = hubConfig.getMlPassword();
31+
installStagingDoc("/test-doc.json", new DocumentMetadataHandle().withCollections("testMlcpCopy").withPermission(hubConfig.getFlowOperatorRoleName(), DocumentMetadataHandle.Capability.INSERT, DocumentMetadataHandle.Capability.UPDATE), "mlcp-test/test-doc.json");
32+
ObjectNode mlcpOptions = new ObjectMapper().createObjectNode();
33+
mlcpOptions.put("mode", "local");
34+
mlcpOptions.put("command", "copy");
35+
mlcpOptions.put("input_host", hubConfig.getHost());
36+
mlcpOptions.put("input_port", hubConfig.getPort(DatabaseKind.STAGING));
37+
mlcpOptions.put("input_database", hubConfig.getDbName(DatabaseKind.STAGING));
38+
mlcpOptions.put("input_username", username);
39+
mlcpOptions.put("input_password", password);
40+
mlcpOptions.put("output_host", hubConfig.getHost());
41+
mlcpOptions.put("output_port", hubConfig.getPort(DatabaseKind.FINAL));
42+
mlcpOptions.put("output_database", hubConfig.getDbName(DatabaseKind.FINAL));
43+
mlcpOptions.put("output_username", username);
44+
mlcpOptions.put("output_password", password);
45+
mlcpOptions.put("collection_filter", "testMlcpCopy");
46+
MlcpRunner mlcpRunner = new MlcpRunner("com.marklogic.hub.util.MlcpMain", hubConfig, mlcpOptions);
47+
final List<String> exceptionStackTraces = new ArrayList<>();
48+
mlcpRunner.setUncaughtExceptionHandler((th, ex) ->{
49+
logger.info("MLCP exception caught! " + ExceptionUtils.getMessage(ex));
50+
exceptionStackTraces.add(ExceptionUtils.getStackTrace(ex));
51+
});
52+
mlcpRunner.start();
53+
try {
54+
mlcpRunner.join();
55+
} catch (InterruptedException e) {
56+
throw new RuntimeException(e);
57+
}
58+
String processOutput = mlcpRunner.getProcessOutput();
59+
String message = "MLCP copy failed! Output: " + processOutput;
60+
if (exceptionStackTraces.size() > 0) {
61+
message += "\n" + StringUtils.join(exceptionStackTraces.toArray(), ',');
62+
}
63+
assertTrue(processOutput.contains("OUTPUT_RECORDS_COMMITTED: 1"), message);
64+
assertTrue(processOutput.contains("OUTPUT_RECORDS_FAILED: 0"), message);
65+
}
66+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"testing": "mlcp"
3+
}

0 commit comments

Comments
 (0)