Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit 9f81d59

Browse files
committed
Can now configure all parts of a QueryBatcher
Added testing of QBT properties
1 parent 1efe41a commit 9f81d59

File tree

2 files changed

+82
-34
lines changed

2 files changed

+82
-34
lines changed

src/main/java/com/marklogic/client/ext/datamovement/QueryBatcherTemplate.java

Lines changed: 70 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package com.marklogic.client.ext.datamovement;
22

33
import com.marklogic.client.DatabaseClient;
4-
import com.marklogic.client.datamovement.DataMovementManager;
5-
import com.marklogic.client.datamovement.JobTicket;
6-
import com.marklogic.client.datamovement.QueryBatchListener;
7-
import com.marklogic.client.datamovement.QueryBatcher;
4+
import com.marklogic.client.datamovement.*;
85
import com.marklogic.client.ext.helper.LoggingObject;
96
import com.marklogic.client.query.RawCombinedQueryDefinition;
107
import com.marklogic.client.query.RawStructuredQueryDefinition;
@@ -26,6 +23,10 @@ public class QueryBatcherTemplate extends LoggingObject {
2623
private boolean applyConsistentSnapshot = true;
2724
private boolean awaitCompletion = true;
2825
private boolean stopJob = true;
26+
private String jobName;
27+
private ForestConfiguration forestConfig;
28+
private QueryFailureListener[] queryFailureListeners;
29+
private QueryBatchListener[] urisReadyListeners;
2930

3031
public QueryBatcherTemplate(DatabaseClient databaseClient) {
3132
this.databaseClient = databaseClient;
@@ -35,107 +36,111 @@ public QueryBatcherTemplate(DatabaseClient databaseClient) {
3536
/**
3637
* Apply the given listener on batches of documents from the given set of collection URIs.
3738
*
38-
* @param listener
39+
* @param urisReadyListener
3940
* @param collectionUris
4041
* @return
4142
*/
42-
public QueryBatcherJobTicket applyOnCollections(QueryBatchListener listener, String... collectionUris) {
43-
return apply(listener, new CollectionsQueryBatcherBuilder(collectionUris));
43+
public QueryBatcherJobTicket applyOnCollections(QueryBatchListener urisReadyListener, String... collectionUris) {
44+
return apply(urisReadyListener, new CollectionsQueryBatcherBuilder(collectionUris));
4445
}
4546

4647
/**
4748
* Apply the given listener on batches of documents from the given set of document URIs.
4849
*
49-
* @param listener
50+
* @param urisReadyListener
5051
* @param documentUris
5152
* @return
5253
*/
53-
public QueryBatcherJobTicket applyOnDocuments(QueryBatchListener listener, String... documentUris) {
54-
return apply(listener, new UrisQueryBatcherBuilder(documentUris));
54+
public QueryBatcherJobTicket applyOnDocuments(QueryBatchListener urisReadyListener, String... documentUris) {
55+
return apply(urisReadyListener, new UrisQueryBatcherBuilder(documentUris));
5556
}
5657

5758
/**
5859
* Apply the given listener on batches of documents with URIs matching the given URI pattern.
5960
*
60-
* @param listener
61+
* @param urisReadyListener
6162
* @param uriPattern
6263
* @return
6364
*/
64-
public QueryBatcherJobTicket applyOnUriPattern(QueryBatchListener listener, String uriPattern) {
65-
return apply(listener, new UriPatternQueryBatcherBuilder(uriPattern));
65+
public QueryBatcherJobTicket applyOnUriPattern(QueryBatchListener urisReadyListener, String uriPattern) {
66+
return apply(urisReadyListener, new UriPatternQueryBatcherBuilder(uriPattern));
6667
}
6768

6869
/**
6970
* Apply the given listener on batches on documents matching the given structured query.
7071
*
71-
* @param listener
72+
* @param urisReadyListener
7273
* @param queryDefinition
7374
* @return
7475
*/
75-
public QueryBatcherJobTicket applyOnStructuredQuery(QueryBatchListener listener, StructuredQueryDefinition queryDefinition) {
76-
return apply(listener, dataMovementManager.newQueryBatcher(queryDefinition));
76+
public QueryBatcherJobTicket applyOnStructuredQuery(QueryBatchListener urisReadyListener, StructuredQueryDefinition queryDefinition) {
77+
return apply(urisReadyListener, dataMovementManager.newQueryBatcher(queryDefinition));
7778
}
7879

7980
/**
8081
* Apply the given listener on batches on documents matching the given raw structured query.
8182
*
82-
* @param listener
83+
* @param urisReadyListener
8384
* @param queryDefinition
8485
* @return
8586
*/
86-
public QueryBatcherJobTicket applyOnRawStructuredQuery(QueryBatchListener listener, RawStructuredQueryDefinition queryDefinition) {
87-
return apply(listener, dataMovementManager.newQueryBatcher(queryDefinition));
87+
public QueryBatcherJobTicket applyOnRawStructuredQuery(QueryBatchListener urisReadyListener, RawStructuredQueryDefinition queryDefinition) {
88+
return apply(urisReadyListener, dataMovementManager.newQueryBatcher(queryDefinition));
8889
}
8990

9091
/**
9192
* Apply the given listener on batches on documents matching the given string query.
9293
*
93-
* @param listener
94+
* @param urisReadyListener
9495
* @param queryDefinition
9596
* @return
9697
*/
97-
public QueryBatcherJobTicket applyOnStringQuery(QueryBatchListener listener, StringQueryDefinition queryDefinition) {
98-
return apply(listener, dataMovementManager.newQueryBatcher(queryDefinition));
98+
public QueryBatcherJobTicket applyOnStringQuery(QueryBatchListener urisReadyListener, StringQueryDefinition queryDefinition) {
99+
return apply(urisReadyListener, dataMovementManager.newQueryBatcher(queryDefinition));
99100
}
100101

101102
/**
102103
* Apply the given listener on batches on documents matching the given raw combined query.
103104
*
104-
* @param listener
105+
* @param urisReadyListener
105106
* @param queryDefinition
106107
* @return
107108
*/
108-
public QueryBatcherJobTicket applyOnRawCombinedQuery(QueryBatchListener listener, RawCombinedQueryDefinition queryDefinition) {
109-
return apply(listener, dataMovementManager.newQueryBatcher(queryDefinition));
109+
public QueryBatcherJobTicket applyOnRawCombinedQuery(QueryBatchListener urisReadyListener, RawCombinedQueryDefinition queryDefinition) {
110+
return apply(urisReadyListener, dataMovementManager.newQueryBatcher(queryDefinition));
110111
}
111112

112113
/**
113114
* Apply the given listener on batches on documents matching the URIs from the given iterator.
114115
*
115-
* @param listener
116+
* @param urisReadyListener
116117
* @param uriIterator
117118
* @return
118119
*/
119-
public QueryBatcherJobTicket applyOnIterator(QueryBatchListener listener, Iterator<String> uriIterator) {
120-
return apply(listener, dataMovementManager.newQueryBatcher(uriIterator));
120+
public QueryBatcherJobTicket applyOnIterator(QueryBatchListener urisReadyListener, Iterator<String> uriIterator) {
121+
return apply(urisReadyListener, dataMovementManager.newQueryBatcher(uriIterator));
121122
}
122123

123124
/**
124125
* Apply the given listener on batches of documents returning by the QueryBatcher that's constructed by the
125126
* given QueryBatcherBuilder.
126127
*
127-
* @param listener
128+
* @param urisReadyListener
128129
* @param queryBatcherBuilder
129130
* @return
130131
*/
131-
public QueryBatcherJobTicket apply(QueryBatchListener listener, QueryBatcherBuilder queryBatcherBuilder) {
132-
return apply(listener, queryBatcherBuilder.buildQueryBatcher(databaseClient, dataMovementManager));
132+
public QueryBatcherJobTicket apply(QueryBatchListener urisReadyListener, QueryBatcherBuilder queryBatcherBuilder) {
133+
return apply(urisReadyListener, queryBatcherBuilder.buildQueryBatcher(databaseClient, dataMovementManager));
133134
}
134135

135136
/**
136137
* Apply the given listener with the given QueryBatcher. The QueryBatcher should have been constructed via the
137138
* DatabaseClient that was used to instantiate this class.
138139
* <p>
140+
* The given listener can be null. In such a scenario, it's expected that listeners have been defined on this class
141+
* via the setQueryBatchListeners method.
142+
* </p>
143+
* <p>
139144
* Notes on how the job is run:
140145
* <ol>
141146
* <li>If awaitCompletion is set to true (the default), then awaitCompletion() is invoked on the QueryBatcher.</li>
@@ -147,11 +152,11 @@ public QueryBatcherJobTicket apply(QueryBatchListener listener, QueryBatcherBuil
147152
* likely be stopped with URIs that have not be processed yet.</li>
148153
* </ol>
149154
*
150-
* @param listener
155+
* @param urisReadyListener
151156
* @param queryBatcher
152157
* @return
153158
*/
154-
public QueryBatcherJobTicket apply(QueryBatchListener listener, QueryBatcher queryBatcher) {
159+
public QueryBatcherJobTicket apply(QueryBatchListener urisReadyListener, QueryBatcher queryBatcher) {
155160
if (threadCount > 0) {
156161
queryBatcher.withThreadCount(threadCount);
157162
}
@@ -161,8 +166,23 @@ public QueryBatcherJobTicket apply(QueryBatchListener listener, QueryBatcher que
161166
if (applyConsistentSnapshot) {
162167
queryBatcher.withConsistentSnapshot();
163168
}
169+
if (jobName != null) {
170+
queryBatcher.withJobName(jobName);
171+
}
172+
if (forestConfig != null) {
173+
queryBatcher.withForestConfig(forestConfig);
174+
}
175+
if (urisReadyListeners != null) {
176+
queryBatcher.setUrisReadyListeners(urisReadyListeners);
177+
}
178+
if (queryFailureListeners != null) {
179+
queryBatcher.setQueryFailureListeners(queryFailureListeners);
180+
}
181+
182+
if (urisReadyListener != null) {
183+
queryBatcher.onUrisReady(urisReadyListener);
184+
}
164185

165-
queryBatcher.onUrisReady(listener);
166186
JobTicket jobTicket = dataMovementManager.startJob(queryBatcher);
167187

168188
if (awaitCompletion) {
@@ -233,4 +253,20 @@ public DataMovementManager getDataMovementManager() {
233253
public DatabaseClient getDatabaseClient() {
234254
return databaseClient;
235255
}
256+
257+
public void setJobName(String jobName) {
258+
this.jobName = jobName;
259+
}
260+
261+
public void setForestConfig(ForestConfiguration forestConfig) {
262+
this.forestConfig = forestConfig;
263+
}
264+
265+
public void setQueryFailureListeners(QueryFailureListener... queryFailureListeners) {
266+
this.queryFailureListeners = queryFailureListeners;
267+
}
268+
269+
public void setUrisReadyListeners(QueryBatchListener... urisReadyListeners) {
270+
this.urisReadyListeners = urisReadyListeners;
271+
}
236272
}

src/test/java/com/marklogic/client/ext/datamovement/listener/ManageCollectionsTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.marklogic.client.ext.datamovement.listener;
22

3+
import com.marklogic.client.datamovement.QueryBatch;
4+
import com.marklogic.client.datamovement.QueryBatchListener;
35
import com.marklogic.client.ext.AbstractIntegrationTest;
46
import com.marklogic.client.ext.batch.RestBatchWriter;
57
import com.marklogic.client.ext.batch.SimpleDocumentWriteOperation;
@@ -21,6 +23,16 @@ public void setThenAddThenRemove() {
2123
String secondUri = "dmsdk-test-2.xml";
2224

2325
QueryBatcherTemplate qbt = new QueryBatcherTemplate(newClient("Documents"));
26+
qbt.setJobName("manage-collections-test");
27+
qbt.setBatchSize(1);
28+
qbt.setThreadCount(2);
29+
30+
qbt.setUrisReadyListeners(new QueryBatchListener() {
31+
@Override
32+
public void processEvent(QueryBatch batch) {
33+
System.out.println("Testing, job batch number: " + batch.getJobBatchNumber() + "; " + batch.getJobTicket().getJobId());
34+
}
35+
});
2436

2537
// Clear out the test documents
2638
qbt.applyOnDocuments(new DeleteListener(), firstUri, secondUri);

0 commit comments

Comments
 (0)