33import com .marklogic .client .DatabaseClient ;
44import com .marklogic .client .datamovement .*;
55import com .marklogic .client .ext .datamovement .*;
6+ import com .marklogic .client .ext .datamovement .listener .SimpleBatchLoggingListener ;
67
78import java .util .ArrayList ;
89import java .util .Arrays ;
910import java .util .List ;
11+ import java .util .Properties ;
12+ import java .util .function .Consumer ;
1013
1114/**
1215 * Provides basic plumbing for implementing QueryBatcherJob.
1316 */
14- public abstract class AbstractQueryBatcherJob extends BatcherConfig implements QueryBatcherJob {
17+ public abstract class AbstractQueryBatcherJob extends BatcherConfig implements QueryBatcherJob , ConfigurableJob {
18+
19+ private List <JobProperty > jobProperties = new ArrayList <>();
1520
1621 private List <QueryBatchListener > urisReadyListeners ;
1722 private List <QueryFailureListener > queryFailureListeners ;
1823
19- private boolean applyConsistentSnapshot = false ;
24+ private boolean consistentSnapshot = false ;
2025 private boolean awaitCompletion = true ;
2126 private boolean stopJobAfterCompletion = true ;
2227
@@ -30,12 +35,17 @@ public abstract class AbstractQueryBatcherJob extends BatcherConfig implements Q
3035 private String [] whereCollections ;
3136 private String whereUriPattern ;
3237 private String whereUrisQuery ;
38+ private boolean requireWhereProperty = true ;
3339
3440 /**
3541 * @return a description of the job that is useful for logging purposes.
3642 */
3743 protected abstract String getJobDescription ();
3844
45+ protected AbstractQueryBatcherJob () {
46+ addQueryBatcherJobProperties ();
47+ }
48+
3949 @ Override
4050 public QueryBatcherJobTicket run (DatabaseClient databaseClient ) {
4151 DataMovementManager dmm = this .dataMovementManager != null ? this .dataMovementManager : databaseClient .newDataMovementManager ();
@@ -65,6 +75,77 @@ public QueryBatcherJobTicket run(DatabaseClient databaseClient) {
6575 return new QueryBatcherJobTicket (dmm , queryBatcher , jobTicket );
6676 }
6777
78+ @ Override
79+ public List <String > configureJob (Properties props ) {
80+ List <String > messages = new ArrayList <>();
81+
82+ for (JobProperty jobProperty : this .jobProperties ) {
83+ String name = jobProperty .getPropertyName ();
84+ String value = props .getProperty (name );
85+ if (value != null && value .trim ().length () > 0 ) {
86+ jobProperty .getPropertyValueConsumer ().accept (value );
87+ } else if (jobProperty .isRequired ()) {
88+ messages .add ("The property '" + name + "' is required" );
89+ }
90+ }
91+
92+ if (requireWhereProperty && !isWherePropertySet () && queryBatcherBuilder == null ) {
93+ messages .add ("At least one 'where' property must be set for selecting records to process" );
94+ }
95+
96+ return messages ;
97+ }
98+
99+ @ Override
100+ public List <JobProperty > getJobProperties () {
101+ return jobProperties ;
102+ }
103+
104+ protected void addQueryBatcherJobProperties () {
105+ addJobProperty ("batchSize" , "Number of records to process at once; defaults to " + DEFAULT_BATCH_SIZE ,
106+ value -> setBatchSize (Integer .parseInt (value )));
107+
108+ addJobProperty ("consistentSnapshot" , "Whether or not to apply a consistent snapshot to the query for records; defaults to false" ,
109+ value -> setConsistentSnapshot (Boolean .parseBoolean (value )));
110+
111+ addJobProperty ("jobName" , "Optional name for the Data Movement job" , value -> setJobName (value ));
112+
113+ addJobProperty ("logBatches" , "Log each batch to stdout as it's processed" ,
114+ value -> addUrisReadyListener (new SimpleBatchLoggingListener ()));
115+
116+ addJobProperty ("logBatchesWithLogger" , "Log each batch as it's processed at the info-level using SLF4J" ,
117+ value -> addUrisReadyListener (new SimpleBatchLoggingListener (true )));
118+
119+ addJobProperty ("threadCount" , "Number of threads to process records with; default to " + DEFAULT_THREAD_COUNT ,
120+ value -> setThreadCount (Integer .parseInt (value )));
121+
122+ addWhereJobProperties ();
123+ }
124+
125+ protected void addWhereJobProperties () {
126+ addJobProperty ("whereCollections" , "Comma-delimited list of collections for selecting records to process" ,
127+ value -> setWhereCollections (value .split ("," )));
128+
129+ addJobProperty ("whereUriPattern" , "URI pattern for selecting records to process" ,
130+ value -> setWhereUriPattern (value ));
131+
132+ addJobProperty ("whereUris" , "Comma-delimited list of URIs for selecting records to process" ,
133+ value -> setWhereUris (value .split ("," )));
134+
135+ addJobProperty ("whereUrisQuery" , "CTS URIs query for selecting records to process" ,
136+ value -> setWhereUrisQuery (value ));
137+ }
138+
139+ protected void addJobProperty (String name , String description , Consumer <String > propertyValueConsumer ) {
140+ jobProperties .add (new SimpleJobProperty (name , description , propertyValueConsumer ));
141+ }
142+
143+ protected void addRequiredJobProperty (String name , String description , Consumer <String > propertyValueConsumer ) {
144+ SimpleJobProperty prop = new SimpleJobProperty (name , description , propertyValueConsumer );
145+ prop .setRequired (true );
146+ jobProperties .add (prop );
147+ }
148+
68149 /**
69150 * Can be overridden by the subclass to prepare the QueryBatcher before the job is started.
70151 *
@@ -73,7 +154,7 @@ public QueryBatcherJobTicket run(DatabaseClient databaseClient) {
73154 protected void prepareQueryBatcher (QueryBatcher queryBatcher ) {
74155 super .prepareBatcher (queryBatcher );
75156
76- if (applyConsistentSnapshot ) {
157+ if (consistentSnapshot ) {
77158 queryBatcher .withConsistentSnapshot ();
78159 }
79160
@@ -91,30 +172,26 @@ protected void prepareQueryBatcher(QueryBatcher queryBatcher) {
91172 }
92173
93174 /**
94- * For subclasses to use to construct a QueryBatcherBuilder based on the "where" properties that have been set.
95- *
96175 * @return
97176 */
98177 protected QueryBatcherBuilder newQueryBatcherBuilder () {
99178 if (queryBatcherBuilder != null ) {
100179 return queryBatcherBuilder ;
101180 }
102181
103- if (isWherePropertySet ()) {
104- if (whereUris != null && whereUris .length > 0 ) {
105- return new DocumentUrisQueryBatcherBuilder (whereUris );
106- }
107- if (whereCollections != null ) {
108- return new CollectionsQueryBatcherBuilder (whereCollections );
109- }
110- if (whereUriPattern != null ) {
111- return new UriPatternQueryBatcherBuilder (whereUriPattern );
112- }
113- if (whereUrisQuery != null ) {
114- return new UrisQueryQueryBatcherBuilder (whereUrisQuery );
115- }
182+ if (whereUris != null && whereUris .length > 0 ) {
183+ return new DocumentUrisQueryBatcherBuilder (whereUris );
184+ }
185+ if (whereCollections != null ) {
186+ return new CollectionsQueryBatcherBuilder (whereCollections );
187+ }
188+ if (whereUriPattern != null ) {
189+ return new UriPatternQueryBatcherBuilder (whereUriPattern );
190+ }
191+ if (whereUrisQuery != null ) {
192+ return new UrisQueryQueryBatcherBuilder (whereUrisQuery );
116193 }
117- throw new IllegalArgumentException ( "No 'where' property has been set, unable to construct a QueryBatcherBuilder" ) ;
194+ return null ;
118195 }
119196
120197 /**
@@ -126,20 +203,20 @@ protected String getQueryDescription() {
126203 return "with custom query" ;
127204 }
128205
129- if (isWherePropertySet ()) {
130- if (whereUris != null && whereUris .length > 0 ) {
131- return "with URIs " + Arrays .asList (whereUris );
132- } else if (whereCollections != null && whereCollections .length > 0 ) {
133- return "in collections " + Arrays .asList (this .whereCollections );
134- } else if (whereUriPattern != null ) {
135- return "matching URI pattern [" + whereUriPattern + "]" ;
136- } else if (whereUrisQuery != null ) {
137- return "matching URIs query [" + whereUrisQuery + "]" ;
138- }
206+ if (whereUris != null && whereUris .length > 0 ) {
207+ return "with URIs " + Arrays .asList (whereUris );
208+ } else if (whereCollections != null && whereCollections .length > 0 ) {
209+ return "in collections " + Arrays .asList (this .whereCollections );
210+ } else if (whereUriPattern != null ) {
211+ return "matching URI pattern [" + whereUriPattern + "]" ;
212+ } else if (whereUrisQuery != null ) {
213+ return "matching URIs query [" + whereUrisQuery + "]" ;
139214 }
140- throw new IllegalArgumentException ("No 'where' property has been set, unable to construct a description of the query" );
215+
216+ return null ;
141217 }
142218
219+
143220 protected boolean isWherePropertySet () {
144221 return
145222 (whereUris != null && whereUris .length > 0 )
@@ -217,12 +294,12 @@ public AbstractQueryBatcherJob setQueryFailureListeners(List<QueryFailureListene
217294 return this ;
218295 }
219296
220- public boolean isApplyConsistentSnapshot () {
221- return applyConsistentSnapshot ;
297+ public boolean isConsistentSnapshot () {
298+ return consistentSnapshot ;
222299 }
223300
224- public AbstractQueryBatcherJob setApplyConsistentSnapshot (boolean applyConsistentSnapshot ) {
225- this .applyConsistentSnapshot = applyConsistentSnapshot ;
301+ public AbstractQueryBatcherJob setConsistentSnapshot (boolean consistentSnapshot ) {
302+ this .consistentSnapshot = consistentSnapshot ;
226303 return this ;
227304 }
228305
@@ -253,4 +330,8 @@ public AbstractQueryBatcherJob setQueryBatcherBuilder(QueryBatcherBuilder queryB
253330 this .queryBatcherBuilder = queryBatcherBuilder ;
254331 return this ;
255332 }
333+
334+ public void setRequireWhereProperty (boolean requireWhereProperty ) {
335+ this .requireWhereProperty = requireWhereProperty ;
336+ }
256337}
0 commit comments