1818import java .io .File ;
1919import java .io .IOException ;
2020import java .util .ArrayList ;
21+ import java .util .Iterator ;
2122import java .util .List ;
2223
24+ import org .codehaus .jettison .json .JSONArray ;
25+ import org .codehaus .jettison .json .JSONException ;
26+ import org .codehaus .jettison .json .JSONObject ;
2327import org .slf4j .Logger ;
2428import org .slf4j .LoggerFactory ;
2529
2630import com .marklogic .client .io .Format ;
2731
2832public class Mlcp {
29- private static final Logger LOGGER = LoggerFactory .getLogger (Mlcp .class );
30-
31- private final static String DEFAULT_HADOOP_HOME_DIR = "./hadoop/" ;
33+
34+ public static final String DOCUMENT_TYPE_KEY = "-document_type" ;
35+ public static final String INPUT_FILE_PATH_KEY = "-input_file_path" ;
36+ public static final String INPUT_FILE_TYPE_KEY = "-input_file_type" ;
37+ public static final String OUTPUT_URI_REPLACE_KEY = "-output_uri_replace" ;
38+ public static final String MODE_KEY = "-mode" ;
39+ public static final String HOST_KEY = "-host" ;
40+ public static final String PORT_KEY = "-port" ;
41+ public static final String USERNAME_KEY = "-username" ;
42+ public static final String PASSWORD_KEY = "-password" ;
43+
44+ private static final Logger LOGGER = LoggerFactory .getLogger (Mlcp .class );
45+ private static final String DEFAULT_HADOOP_HOME_DIR = "./hadoop/" ;
3246
3347 private List <MlcpSource > sources = new ArrayList <>();
3448
@@ -54,26 +68,10 @@ public void addSourceDirectory(String directoryPath, SourceOptions options) {
5468 sources .add (source );
5569 }
5670
57- public void loadContent () throws IOException {
71+ public void loadContent () throws IOException , JSONException {
5872 for (MlcpSource source : sources ) {
5973 try {
60- List <String > arguments = new ArrayList <>();
61-
62- arguments .add ("import" );
63- arguments .add ("-mode" );
64- arguments .add ("local" );
65- arguments .add ("-host" );
66- arguments .add (host );
67- arguments .add ("-port" );
68- arguments .add (Integer .toString (port ));
69- arguments .add ("-username" );
70- arguments .add (user );
71- arguments .add ("-password" );
72- arguments .add (password );
73-
74- // add arguments related to the source
75- List <String > sourceArguments = source .getMlcpArguments ();
76- arguments .addAll (sourceArguments );
74+ List <String > arguments = getMlcpOptions (source );
7775
7876 LOGGER .info (arguments .toString ());
7977 DataHubContentPump contentPump = new DataHubContentPump (arguments );
@@ -93,7 +91,7 @@ protected void setHadoopHomeDir() throws IOException {
9391 System .setProperty ("hadoop.home.dir" , new File (home ).getCanonicalPath ());
9492 }
9593
96- private static class MlcpSource {
94+ public static class MlcpSource {
9795 private String sourcePath ;
9896 private SourceOptions sourceOptions ;
9997
@@ -106,68 +104,48 @@ public String getSourcePath() {
106104 return sourcePath ;
107105 }
108106
109- public List <String > getMlcpArguments () throws IOException {
107+ public List <String > getMlcpArguments () throws IOException , JSONException {
110108 File file = new File (sourcePath );
111109 String canonicalPath = file .getCanonicalPath ();
112110
113111 List <String > arguments = new ArrayList <>();
114- arguments .add ("-generate_uri" );
115- arguments .add ("true" );
116-
117- arguments .add ("-input_file_path" );
112+
113+ arguments .add (INPUT_FILE_PATH_KEY );
118114 arguments .add (canonicalPath );
119- arguments .add ("-input_file_type" );
120- if (sourceOptions .getInputFileType () == null ) {
121- arguments .add ("documents" );
122- } else {
123- arguments .add (sourceOptions .getInputFileType ());
124- }
125-
126- if (sourceOptions .getInputFilePattern () != null ) {
127- arguments .add ("-input_file_pattern" );
128- arguments .add (sourceOptions .getInputFilePattern ());
129- }
130-
131- String collections = this .getOutputCollections ();
132- arguments .add ("-output_collections" );
133- arguments .add ("\" " + collections + "\" " );
134-
135- if (sourceOptions .getInputCompressed ()) {
136- arguments .add ("-input_compressed" );
115+
116+ arguments .add (OUTPUT_URI_REPLACE_KEY );
117+ arguments .add ("\" " +canonicalPath +",''\" " );
118+
119+ arguments .add (INPUT_FILE_TYPE_KEY );
120+ arguments .add (sourceOptions .getInputFileType ());
121+
122+ addOtherArguments (arguments , sourceOptions .getOtherOptions ());
123+
124+ //add document type only if it does not exist in the list
125+ if (!arguments .contains (DOCUMENT_TYPE_KEY )) {
126+ arguments .add (DOCUMENT_TYPE_KEY );
127+ arguments .add (sourceOptions .getDataFormat ());
137128 }
138-
139- // by default, cut the source directory path to make URIs shorter
140- String uriReplace = canonicalPath + ",''" ;
141- uriReplace = uriReplace .replaceAll ("\\ \\ " , "/" );
142-
143- arguments .add ("-output_uri_replace" );
144- arguments .add ("\" " + uriReplace + "\" " );
145-
146- arguments .add ("-document_type" );
147- arguments .add (sourceOptions .getDataFormat ());
148-
149- arguments .add ("-transform_module" );
150- arguments .add ("/com.marklogic.hub/mlcp-flow-transform.xqy" );
151- arguments .add ("-transform_namespace" );
152- arguments .add ("http://marklogic.com/data-hub/mlcp-flow-transform" );
153- arguments .add ("-transform_param" );
154- arguments .add ("\" " + sourceOptions .getTransformParams () + "\" " );
129+
155130 return arguments ;
156131 }
157132
158- private String getOutputCollections () {
159- StringBuilder collectionsBuilder = new StringBuilder ();
160- collectionsBuilder .append (sourceOptions .getEntityName ());
161- collectionsBuilder .append ("," );
162- collectionsBuilder .append (sourceOptions .getFlowName ());
163- collectionsBuilder .append ("," );
164- collectionsBuilder .append (sourceOptions .getFlowType ());
165- if (sourceOptions .getCollection () != null ) {
166- collectionsBuilder .append ("," );
167- collectionsBuilder .append (sourceOptions .getCollection ());
168- }
169- return collectionsBuilder .toString ();
170- }
133+ private void addOtherArguments (List <String > arguments ,
134+ String otherOptions ) throws JSONException {
135+ JSONArray jsonArray = new JSONArray (otherOptions );
136+ for (int i = 0 ; i < jsonArray .length (); i ++) {
137+ JSONObject jsonObject = jsonArray .getJSONObject (i );
138+ @ SuppressWarnings ("rawtypes" )
139+ Iterator keysIterator = jsonObject .keys ();
140+ while (keysIterator .hasNext ()) {
141+ String key = (String )keysIterator .next ();
142+ arguments .add (key );
143+ arguments .add (jsonObject .getString (key ));
144+ }
145+
146+ }
147+
148+ }
171149 }
172150
173151 public static class SourceOptions {
@@ -176,9 +154,7 @@ public static class SourceOptions {
176154 private String flowType ;
177155 private String dataFormat = "json" ;
178156 private String inputFileType ;
179- private String inputFilePattern ;
180- private String collection ;
181- private boolean inputCompressed = false ;
157+ private String otherOptions ;
182158
183159 public SourceOptions (String entityName , String flowName , String flowType , Format dataFormat ) {
184160 this .entityName = entityName ;
@@ -215,35 +191,34 @@ public String getInputFileType() {
215191 public void setInputFileType (String inputFileType ) {
216192 this .inputFileType = inputFileType ;
217193 }
194+
195+ public String getOtherOptions () {
196+ return otherOptions ;
197+ }
218198
219- public String getInputFilePattern () {
220- return inputFilePattern ;
221- }
222-
223- public void setInputFilePattern (String inputFilePattern ) {
224- this .inputFilePattern = inputFilePattern ;
225- }
226-
227- public String getCollection () {
228- return collection ;
229- }
230-
231- public void setCollection (String collection ) {
232- this .collection = collection ;
233- }
234-
235- public void setInputCompressed (boolean inputCompressed ) {
236- this .inputCompressed = inputCompressed ;
237- }
238-
239- public boolean getInputCompressed () {
240- return this .inputCompressed ;
241- }
242-
243- protected String getTransformParams () {
244- return String .format (
245- "<params><entity-name>%s</entity-name><flow-name>%s</flow-name><flow-type>%s</flow-type></params>" ,
246- entityName , flowName , flowType );
247- }
199+ public void setOtherOptions (String otherOptions ) {
200+ this .otherOptions = otherOptions ;
201+ }
248202 }
203+
204+ public List <String > getMlcpOptions (MlcpSource source ) throws IOException , JSONException {
205+ List <String > mlcpOptions = new ArrayList <>();
206+
207+ mlcpOptions .add ("import" );
208+ mlcpOptions .add (MODE_KEY );
209+ mlcpOptions .add ("local" );
210+ mlcpOptions .add (HOST_KEY );
211+ mlcpOptions .add (host );
212+ mlcpOptions .add (PORT_KEY );
213+ mlcpOptions .add (Integer .toString (port ));
214+ mlcpOptions .add (USERNAME_KEY );
215+ mlcpOptions .add (user );
216+ mlcpOptions .add (PASSWORD_KEY );
217+ mlcpOptions .add (password );
218+
219+ List <String > sourceArguments = source .getMlcpArguments ();
220+ mlcpOptions .addAll (sourceArguments );
221+
222+ return mlcpOptions ;
223+ }
249224}
0 commit comments