Skip to content

Commit 1c652a2

Browse files
committed
Merge pull request #204 from maeisabelle/164-LoadDataImprovements
164 - Load data improvements
2 parents bdf26c3 + eb78e1d commit 1c652a2

File tree

13 files changed

+781
-287
lines changed

13 files changed

+781
-287
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import javax.xml.stream.XMLStreamException;
2323

24+
import org.codehaus.jettison.json.JSONException;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627
import org.springframework.batch.core.Job;
@@ -239,7 +240,7 @@ public void runInputFlow(Flow flow, HubConfig config) {
239240
mlcp.addSourceDirectory(config.modulesPath, sourceOptions);
240241
mlcp.loadContent();
241242
}
242-
catch (IOException e) {
243+
catch (IOException | JSONException e) {
243244
LOGGER.error(
244245
"Error encountered while trying to run flow: "
245246
+ flow.getEntityName() + " > " + flow.getName(), e);

marklogic-data-hub/src/main/java/com/marklogic/hub/Mlcp.java

Lines changed: 82 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,31 @@
1818
import java.io.File;
1919
import java.io.IOException;
2020
import java.util.ArrayList;
21+
import java.util.Iterator;
2122
import java.util.List;
2223

24+
import org.codehaus.jettison.json.JSONArray;
25+
import org.codehaus.jettison.json.JSONException;
26+
import org.codehaus.jettison.json.JSONObject;
2327
import org.slf4j.Logger;
2428
import org.slf4j.LoggerFactory;
2529

2630
import com.marklogic.client.io.Format;
2731

2832
public class Mlcp {
29-
private static final Logger LOGGER = LoggerFactory.getLogger(Mlcp.class);
30-
31-
private final static String DEFAULT_HADOOP_HOME_DIR = "./hadoop/";
33+
34+
public static final String DOCUMENT_TYPE_KEY = "-document_type";
35+
public static final String INPUT_FILE_PATH_KEY = "-input_file_path";
36+
public static final String INPUT_FILE_TYPE_KEY = "-input_file_type";
37+
public static final String OUTPUT_URI_REPLACE_KEY = "-output_uri_replace";
38+
public static final String MODE_KEY = "-mode";
39+
public static final String HOST_KEY = "-host";
40+
public static final String PORT_KEY = "-port";
41+
public static final String USERNAME_KEY = "-username";
42+
public static final String PASSWORD_KEY = "-password";
43+
44+
private static final Logger LOGGER = LoggerFactory.getLogger(Mlcp.class);
45+
private static final String DEFAULT_HADOOP_HOME_DIR = "./hadoop/";
3246

3347
private List<MlcpSource> sources = new ArrayList<>();
3448

@@ -54,26 +68,10 @@ public void addSourceDirectory(String directoryPath, SourceOptions options) {
5468
sources.add(source);
5569
}
5670

57-
public void loadContent() throws IOException {
71+
public void loadContent() throws IOException, JSONException {
5872
for (MlcpSource source : sources) {
5973
try {
60-
List<String> arguments = new ArrayList<>();
61-
62-
arguments.add("import");
63-
arguments.add("-mode");
64-
arguments.add("local");
65-
arguments.add("-host");
66-
arguments.add(host);
67-
arguments.add("-port");
68-
arguments.add(Integer.toString(port));
69-
arguments.add("-username");
70-
arguments.add(user);
71-
arguments.add("-password");
72-
arguments.add(password);
73-
74-
// add arguments related to the source
75-
List<String> sourceArguments = source.getMlcpArguments();
76-
arguments.addAll(sourceArguments);
74+
List<String> arguments = getMlcpOptions(source);
7775

7876
LOGGER.info(arguments.toString());
7977
DataHubContentPump contentPump = new DataHubContentPump(arguments);
@@ -93,7 +91,7 @@ protected void setHadoopHomeDir() throws IOException {
9391
System.setProperty("hadoop.home.dir", new File(home).getCanonicalPath());
9492
}
9593

96-
private static class MlcpSource {
94+
public static class MlcpSource {
9795
private String sourcePath;
9896
private SourceOptions sourceOptions;
9997

@@ -106,68 +104,48 @@ public String getSourcePath() {
106104
return sourcePath;
107105
}
108106

109-
public List<String> getMlcpArguments() throws IOException {
107+
public List<String> getMlcpArguments() throws IOException, JSONException {
110108
File file = new File(sourcePath);
111109
String canonicalPath = file.getCanonicalPath();
112110

113111
List<String> arguments = new ArrayList<>();
114-
arguments.add("-generate_uri");
115-
arguments.add("true");
116-
117-
arguments.add("-input_file_path");
112+
113+
arguments.add(INPUT_FILE_PATH_KEY);
118114
arguments.add(canonicalPath);
119-
arguments.add("-input_file_type");
120-
if (sourceOptions.getInputFileType() == null) {
121-
arguments.add("documents");
122-
} else {
123-
arguments.add(sourceOptions.getInputFileType());
124-
}
125-
126-
if (sourceOptions.getInputFilePattern() != null) {
127-
arguments.add("-input_file_pattern");
128-
arguments.add(sourceOptions.getInputFilePattern());
129-
}
130-
131-
String collections = this.getOutputCollections();
132-
arguments.add("-output_collections");
133-
arguments.add("\"" + collections + "\"");
134-
135-
if (sourceOptions.getInputCompressed()) {
136-
arguments.add("-input_compressed");
115+
116+
arguments.add(OUTPUT_URI_REPLACE_KEY);
117+
arguments.add("\""+canonicalPath+",''\"");
118+
119+
arguments.add(INPUT_FILE_TYPE_KEY);
120+
arguments.add(sourceOptions.getInputFileType());
121+
122+
addOtherArguments(arguments, sourceOptions.getOtherOptions());
123+
124+
//add document type only if it does not exist in the list
125+
if(!arguments.contains(DOCUMENT_TYPE_KEY)) {
126+
arguments.add(DOCUMENT_TYPE_KEY);
127+
arguments.add(sourceOptions.getDataFormat());
137128
}
138-
139-
// by default, cut the source directory path to make URIs shorter
140-
String uriReplace = canonicalPath + ",''";
141-
uriReplace = uriReplace.replaceAll("\\\\", "/");
142-
143-
arguments.add("-output_uri_replace");
144-
arguments.add("\"" + uriReplace + "\"");
145-
146-
arguments.add("-document_type");
147-
arguments.add(sourceOptions.getDataFormat());
148-
149-
arguments.add("-transform_module");
150-
arguments.add("/com.marklogic.hub/mlcp-flow-transform.xqy");
151-
arguments.add("-transform_namespace");
152-
arguments.add("http://marklogic.com/data-hub/mlcp-flow-transform");
153-
arguments.add("-transform_param");
154-
arguments.add("\"" + sourceOptions.getTransformParams() + "\"");
129+
155130
return arguments;
156131
}
157132

158-
private String getOutputCollections() {
159-
StringBuilder collectionsBuilder = new StringBuilder();
160-
collectionsBuilder.append(sourceOptions.getEntityName());
161-
collectionsBuilder.append(",");
162-
collectionsBuilder.append(sourceOptions.getFlowName());
163-
collectionsBuilder.append(",");
164-
collectionsBuilder.append(sourceOptions.getFlowType());
165-
if (sourceOptions.getCollection() != null) {
166-
collectionsBuilder.append(",");
167-
collectionsBuilder.append(sourceOptions.getCollection());
168-
}
169-
return collectionsBuilder.toString();
170-
}
133+
private void addOtherArguments(List<String> arguments,
134+
String otherOptions) throws JSONException {
135+
JSONArray jsonArray = new JSONArray(otherOptions);
136+
for (int i = 0; i < jsonArray.length(); i++) {
137+
JSONObject jsonObject = jsonArray.getJSONObject(i);
138+
@SuppressWarnings("rawtypes")
139+
Iterator keysIterator = jsonObject.keys();
140+
while(keysIterator.hasNext()) {
141+
String key = (String)keysIterator.next();
142+
arguments.add(key);
143+
arguments.add(jsonObject.getString(key));
144+
}
145+
146+
}
147+
148+
}
171149
}
172150

173151
public static class SourceOptions {
@@ -176,9 +154,7 @@ public static class SourceOptions {
176154
private String flowType;
177155
private String dataFormat = "json";
178156
private String inputFileType;
179-
private String inputFilePattern;
180-
private String collection;
181-
private boolean inputCompressed = false;
157+
private String otherOptions;
182158

183159
public SourceOptions(String entityName, String flowName, String flowType, Format dataFormat) {
184160
this.entityName = entityName;
@@ -215,35 +191,34 @@ public String getInputFileType() {
215191
public void setInputFileType(String inputFileType) {
216192
this.inputFileType = inputFileType;
217193
}
194+
195+
public String getOtherOptions() {
196+
return otherOptions;
197+
}
218198

219-
public String getInputFilePattern() {
220-
return inputFilePattern;
221-
}
222-
223-
public void setInputFilePattern(String inputFilePattern) {
224-
this.inputFilePattern = inputFilePattern;
225-
}
226-
227-
public String getCollection() {
228-
return collection;
229-
}
230-
231-
public void setCollection(String collection) {
232-
this.collection = collection;
233-
}
234-
235-
public void setInputCompressed(boolean inputCompressed) {
236-
this.inputCompressed = inputCompressed;
237-
}
238-
239-
public boolean getInputCompressed() {
240-
return this.inputCompressed;
241-
}
242-
243-
protected String getTransformParams() {
244-
return String.format(
245-
"<params><entity-name>%s</entity-name><flow-name>%s</flow-name><flow-type>%s</flow-type></params>",
246-
entityName, flowName, flowType);
247-
}
199+
public void setOtherOptions(String otherOptions) {
200+
this.otherOptions = otherOptions;
201+
}
248202
}
203+
204+
public List<String> getMlcpOptions(MlcpSource source) throws IOException, JSONException {
205+
List<String> mlcpOptions = new ArrayList<>();
206+
207+
mlcpOptions.add("import");
208+
mlcpOptions.add(MODE_KEY);
209+
mlcpOptions.add("local");
210+
mlcpOptions.add(HOST_KEY);
211+
mlcpOptions.add(host);
212+
mlcpOptions.add(PORT_KEY);
213+
mlcpOptions.add(Integer.toString(port));
214+
mlcpOptions.add(USERNAME_KEY);
215+
mlcpOptions.add(user);
216+
mlcpOptions.add(PASSWORD_KEY);
217+
mlcpOptions.add(password);
218+
219+
List<String> sourceArguments = source.getMlcpArguments();
220+
mlcpOptions.addAll(sourceArguments);
221+
222+
return mlcpOptions;
223+
}
249224
}

quick-start/src/main/java/com/marklogic/hub/config/EnvironmentConfiguration.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.marklogic.hub.config;
22

3+
import java.io.BufferedWriter;
34
import java.io.File;
45
import java.io.FileInputStream;
56
import java.io.FileNotFoundException;
67
import java.io.FileOutputStream;
8+
import java.io.FileWriter;
79
import java.io.IOException;
810
import java.io.InputStream;
911
import java.io.OutputStream;
@@ -15,7 +17,8 @@
1517
import org.springframework.core.env.Environment;
1618
import org.springframework.stereotype.Component;
1719

18-
import com.marklogic.hub.DataHub;
20+
import com.google.common.base.Charsets;
21+
import com.google.common.io.Files;
1922
import com.marklogic.hub.HubConfig;
2023

2124
/***
@@ -31,7 +34,6 @@ public class EnvironmentConfiguration {
3134
.getLogger(EnvironmentConfiguration.class);
3235

3336
private static final String ENVIRONMENT_PROPERTIES_FILENAME = "environment.properties";
34-
private static final String FLOW_PROPERTIES_FILENAME = "flow.properties";
3537
private static final String DEFAULT_SUFFIX = ".default";
3638
private static final String SERVER_PORT = "server.port";
3739
private static final String ML_HOST = "mlHost";
@@ -43,12 +45,12 @@ public class EnvironmentConfiguration {
4345
private static final String ML_AUTH = "mlAuth";
4446
private static final String USER_PLUGIN_DIR = "userPluginDir";
4547
private static final String ASSET_INSTALL_TIME_FILE = "assetInstallTimeFile";
48+
private static final String MLCP_OPTIONS_DIR = "mlcp-options";
4649

4750
@Autowired
4851
private Environment environment;
4952

5053
private Properties environmentProperties = new Properties();
51-
private Properties flowProperties = new Properties();
5254

5355
public String getServerPort() {
5456
return this.environment.getProperty(SERVER_PORT);
@@ -205,7 +207,6 @@ public void setAssetInstallTimeFilePath(String assetInstallTimeFilePath) {
205207

206208
public void loadConfigurationFromFiles() {
207209
loadConfigurationFromFile(environmentProperties, ENVIRONMENT_PROPERTIES_FILENAME);
208-
loadConfigurationFromFile(environmentProperties, FLOW_PROPERTIES_FILENAME);
209210
}
210211

211212
public void loadConfigurationFromFile(Properties configProperties, String fileName) {
@@ -245,13 +246,25 @@ private void saveConfigurationToFile(Properties configProperties, String fileNam
245246
}
246247
}
247248

248-
public void saveOrUpdateFlowInputPath(String entityName, String flowName, String inputPath) {
249-
this.flowProperties.setProperty(entityName + "-" + flowName, inputPath);
250-
saveConfigurationToFile(flowProperties, FLOW_PROPERTIES_FILENAME);
249+
public void saveOrUpdateFlowMlcpOptionsToFile(String entityName, String flowName, String mlcpOptionsFileContent) throws IOException {
250+
String filePath = getMlcpOptionsFilePath(entityName, flowName);
251+
FileWriter fw = new FileWriter(filePath);
252+
BufferedWriter bw = new BufferedWriter(fw);
253+
bw.write(mlcpOptionsFileContent);
254+
bw.close();
251255
}
252256

253-
public String getFlowInputPath(String entityName, String flowName) {
254-
return this.flowProperties.getProperty(entityName + "-" + flowName);
257+
private String getMlcpOptionsFilePath(String entityName, String flowName) {
258+
return "." + File.separator + MLCP_OPTIONS_DIR + File.separator + entityName + "-" + flowName + ".txt";
259+
}
260+
261+
public String getFlowMlcpOptionsFromFile(String entityName, String flowName) throws IOException {
262+
String filePath = getMlcpOptionsFilePath(entityName, flowName);
263+
File file = new File(filePath);
264+
if(file.exists()) {
265+
return Files.toString(file, Charsets.UTF_8);
266+
}
267+
return null;
255268
}
256269

257270
public HubConfig getHubConfig() {

0 commit comments

Comments
 (0)