Skip to content

Commit fd8817a

Browse files
(cherry picked from commit 256c44c)
1 parent 19e53c3 commit fd8817a

File tree

2 files changed

+141
-107
lines changed

2 files changed

+141
-107
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 93 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
7878
private final AtomicBoolean started = new AtomicBoolean(false);
7979
private final Object lock = new Object();
8080
private final Map<Forest,List<QueryTask>> blackListedTasks = new HashMap<>();
81+
private boolean isSingleThreaded = false;
8182
private JobTicket jobTicket;
8283
private Thread runJobCompletionListeners;
8384

@@ -350,6 +351,12 @@ private synchronized void initialize() {
350351
// now we've set the threadCount
351352
threadCountSet = true;
352353
}
354+
// If we are iterating and the thread count is set to 1, we have a single thread acting as both
355+
// consumer and producer of the ThreadPoolExecutor queue. Hence, we produce till the maximum and start
356+
// consuming and produce again. Since the thread count is 1, there is no worry about thread utilization.
357+
if(getThreadCount() == 1) {
358+
isSingleThreaded = true;
359+
}
353360
logger.info("Starting job batchSize={}, threadCount={}, onUrisReady listeners={}, failure listeners={}",
354361
getBatchSize(), getThreadCount(), urisReadyListeners.size(), failureListeners.size());
355362
threadPool = new QueryThreadPoolExecutor(getThreadCount(), this);
@@ -713,6 +720,91 @@ private void shutdownIfAllForestsAreDone() {
713720
threadPool.shutdown();
714721
}
715722

723+
private class IteratorTask implements Runnable {
724+
725+
private QueryBatcher batcher;
726+
727+
IteratorTask(QueryBatcher batcher) {
728+
this.batcher = batcher;
729+
}
730+
731+
@Override
732+
public void run() {
733+
try {
734+
List<String> uriQueue = new ArrayList<>(getBatchSize());
735+
while (iterator.hasNext()) {
736+
uriQueue.add(iterator.next());
737+
// if we've hit batchSize or the end of the iterator
738+
if (uriQueue.size() == getBatchSize() || !iterator.hasNext()) {
739+
final List<String> uris = uriQueue;
740+
uriQueue = new ArrayList<>(getBatchSize());
741+
Runnable processBatch = new Runnable() {
742+
public void run() {
743+
QueryBatchImpl batch = new QueryBatchImpl()
744+
.withBatcher(batcher)
745+
.withTimestamp(Calendar.getInstance())
746+
.withJobTicket(getJobTicket());
747+
try {
748+
long currentBatchNumber = batchNumber.incrementAndGet();
749+
// round-robin from client 0 to (clientList.size() - 1);
750+
List<DatabaseClient> currentClientList = clientList.get();
751+
int clientIndex = (int) (currentBatchNumber % currentClientList.size());
752+
DatabaseClient client = currentClientList.get(clientIndex);
753+
batch = batch.withJobBatchNumber(currentBatchNumber)
754+
.withClient(client)
755+
.withJobResultsSoFar(resultsSoFar.addAndGet(uris.size()))
756+
.withItems(uris.toArray(new String[uris.size()]));
757+
logger.trace("batch size={}, jobBatchNumber={}, jobResultsSoFar={}", uris.size(),
758+
batch.getJobBatchNumber(), batch.getJobResultsSoFar());
759+
for (QueryBatchListener listener : urisReadyListeners) {
760+
try {
761+
listener.processEvent(batch);
762+
} catch (Throwable e) {
763+
logger.error("Exception thrown by an onUrisReady listener", e);
764+
}
765+
}
766+
} catch (Throwable t) {
767+
batch = batch.withItems(uris.toArray(new String[uris.size()]));
768+
for (QueryFailureListener listener : failureListeners) {
769+
try {
770+
listener.processFailure(new QueryBatchException(batch, t));
771+
} catch (Throwable e) {
772+
logger.error("Exception thrown by an onQueryFailure listener", e);
773+
}
774+
}
775+
logger.warn("Error iterating to queue uris: {}", t.toString());
776+
}
777+
}
778+
};
779+
threadPool.execute(processBatch);
780+
// If the queue is almost full, stop producing and add a task to continue later
781+
if (isSingleThreaded && threadPool.getQueue().remainingCapacity() <= 2 && iterator.hasNext()) {
782+
threadPool.execute(new IteratorTask(batcher));
783+
return;
784+
}
785+
}
786+
}
787+
} catch (Throwable t) {
788+
for (QueryFailureListener listener : failureListeners) {
789+
QueryBatchImpl batch = new QueryBatchImpl()
790+
.withItems(new String[0])
791+
.withClient(clientList.get().get(0))
792+
.withBatcher(batcher)
793+
.withTimestamp(Calendar.getInstance())
794+
.withJobResultsSoFar(0);
795+
796+
try {
797+
listener.processFailure(new QueryBatchException(batch, t));
798+
} catch (Throwable e) {
799+
logger.error("Exception thrown by an onQueryFailure listener", e);
800+
}
801+
}
802+
logger.warn("Error iterating to queue uris: {}", t.toString());
803+
}
804+
runJobCompletionListeners.start();
805+
threadPool.shutdown();
806+
}
807+
}
716808

717809
/* startIterating launches in a separate thread (actually a task handled by
718810
* threadPool) and just loops through the Iterator<String>, batching uris of
@@ -729,86 +821,7 @@ private void shutdownIfAllForestsAreDone() {
729821
* their listeners handled, they should use try-catch and handle them.
730822
*/
731823
private void startIterating() {
732-
final QueryBatcher batcher = this;
733-
Runnable queueUris = new Runnable() {
734-
public void run() {
735-
try {
736-
final AtomicLong batchNumber = new AtomicLong(0);
737-
final AtomicLong resultsSoFar = new AtomicLong(0);
738-
List<String> uriQueue = new ArrayList<>(getBatchSize());
739-
while ( iterator.hasNext() ) {
740-
try {
741-
uriQueue.add(iterator.next());
742-
// if we've hit batchSize or the end of the iterator
743-
if ( uriQueue.size() == getBatchSize() || ! iterator.hasNext() ) {
744-
final List<String> uris = uriQueue;
745-
uriQueue = new ArrayList<>(getBatchSize());
746-
Runnable processBatch = new Runnable() {
747-
public void run() {
748-
long currentBatchNumber = batchNumber.incrementAndGet();
749-
// round-robin from client 0 to (clientList.size() - 1);
750-
List<DatabaseClient> currentClientList = clientList.get();
751-
int clientIndex = (int) (currentBatchNumber % currentClientList.size());
752-
DatabaseClient client = currentClientList.get(clientIndex);
753-
QueryBatchImpl batch = new QueryBatchImpl()
754-
.withClient(client)
755-
.withBatcher(batcher)
756-
.withTimestamp(Calendar.getInstance())
757-
.withJobTicket(getJobTicket())
758-
.withJobBatchNumber(currentBatchNumber)
759-
.withJobResultsSoFar(resultsSoFar.addAndGet(uris.size()));
760-
batch = batch.withItems(uris.toArray(new String[uris.size()]));
761-
logger.trace("batch size={}, jobBatchNumber={}, jobResultsSoFar={}", uris.size(),
762-
batch.getJobBatchNumber(), batch.getJobResultsSoFar());
763-
for (QueryBatchListener listener : urisReadyListeners) {
764-
try {
765-
listener.processEvent(batch);
766-
} catch (Throwable e) {
767-
logger.error("Exception thrown by an onUrisReady listener", e);
768-
}
769-
}
770-
}
771-
};
772-
threadPool.execute(processBatch);
773-
}
774-
} catch (Throwable t) {
775-
QueryBatchImpl batch = new QueryBatchImpl()
776-
.withItems(new String[0])
777-
.withClient(clientList.get().get(0))
778-
.withBatcher(batcher)
779-
.withTimestamp(Calendar.getInstance())
780-
.withJobResultsSoFar(0);
781-
for ( QueryFailureListener listener : failureListeners ) {
782-
try {
783-
listener.processFailure(new QueryBatchException(batch, t));
784-
} catch (Throwable e) {
785-
logger.error("Exception thrown by an onQueryFailure listener", e);
786-
}
787-
}
788-
logger.warn("Error iterating to queue uris: {}", t.toString());
789-
}
790-
}
791-
} catch (Throwable t) {
792-
for ( QueryFailureListener listener : failureListeners ) {
793-
try {
794-
QueryBatchImpl batch = new QueryBatchImpl()
795-
.withItems(new String[0])
796-
.withClient(clientList.get().get(0))
797-
.withBatcher(batcher)
798-
.withTimestamp(Calendar.getInstance())
799-
.withJobResultsSoFar(0);
800-
listener.processFailure(new QueryBatchException(batch, t));
801-
} catch (Throwable e) {
802-
logger.error("Exception thrown by an onQueryFailure listener", e);
803-
}
804-
}
805-
logger.warn("Error iterating to queue uris", t.toString());
806-
}
807-
runJobCompletionListeners.start();
808-
threadPool.shutdown();
809-
}
810-
};
811-
threadPool.execute(queueUris);
824+
threadPool.execute(new IteratorTask(this));
812825
}
813826

814827
public void stop() {

marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherTest.java

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,22 @@ public void testRawValueQuery() throws Exception {
209209
runQueryBatcher(moveMgr.newQueryBatcher(query), query, matchesByForest, 17, 99);
210210
}
211211

212+
@Test
213+
public void testIterator() throws Exception {
214+
Map<String, String[]> matchesByForest = new HashMap<>();
215+
matchesByForest.put("java-unittest-1", new String[] {uri1, uri3, uri4});
216+
matchesByForest.put("java-unittest-2", new String[] {uri5});
217+
matchesByForest.put("java-unittest-3", new String[] {uri2});
218+
String[] uris = new String[] {uri1, uri2, uri3, uri4, uri5};
219+
List<String> uriList = Arrays.asList(uris);
220+
runQueryBatcher(moveMgr.newQueryBatcher(uriList.iterator()), null, matchesByForest, 1, 1, false);
221+
runQueryBatcher(moveMgr.newQueryBatcher(uriList.iterator()), null, matchesByForest, 2, 2, false);
222+
runQueryBatcher(moveMgr.newQueryBatcher(uriList.iterator()), null, matchesByForest, 2, 3, false);
223+
runQueryBatcher(moveMgr.newQueryBatcher(uriList.iterator()), null, matchesByForest, 2, 10, false);
224+
runQueryBatcher(moveMgr.newQueryBatcher(uriList.iterator()), null, matchesByForest, 10, 1, false);
225+
runQueryBatcher(moveMgr.newQueryBatcher(uriList.iterator()), null, matchesByForest, 18, 33, false);
226+
}
227+
212228
@Test
213229
public void testRawCombinedQuery() throws Exception {
214230
StringHandle structuredQuery = new StringHandle(
@@ -233,9 +249,7 @@ public void testRawCombinedQuery() throws Exception {
233249
}
234250

235251
public void runQueryBatcher(QueryBatcher queryBatcher, QueryDefinition query, Map<String,String[]> matchesByForest,
236-
int batchSize, int threadCount)
237-
throws Exception
238-
{
252+
int batchSize, int threadCount, boolean queryBatcherChecks) throws Exception {
239253
String queryBatcherJobId = "QueryBatcherJobId";
240254
String queryBatcherJobName = "QueryBatcherJobName";
241255
int numExpected = 0;
@@ -258,13 +272,15 @@ public void runQueryBatcher(QueryBatcher queryBatcher, QueryDefinition query, Ma
258272
batch -> {
259273
successfulBatchCount.incrementAndGet();
260274
totalResults.addAndGet(batch.getItems().length);
261-
String forestName = batch.getForest().getForestName();
262-
// atomically gets the set unless it's missing in which case it creates it
263-
Set<String> matches = results.computeIfAbsent(forestName, k->ConcurrentHashMap.<String>newKeySet());
264-
for ( String uri : batch.getItems() ) {
265-
matches.add(uri);
275+
if(queryBatcherChecks) {
276+
String forestName = batch.getForest().getForestName();
277+
// atomically gets the set unless it's missing in which case it creates it
278+
Set<String> matches = results.computeIfAbsent(forestName, k->ConcurrentHashMap.<String>newKeySet());
279+
for ( String uri : batch.getItems() ) {
280+
matches.add(uri);
281+
}
282+
batchDatabaseName.set(batch.getForest().getDatabaseName());
266283
}
267-
batchDatabaseName.set(batch.getForest().getDatabaseName());
268284
batchTicket.set(batch.getJobTicket());
269285
batchTimestamp.set(batch.getTimestamp());
270286
}
@@ -276,8 +292,8 @@ public void runQueryBatcher(QueryBatcher queryBatcher, QueryDefinition query, Ma
276292
failures.append("ERROR:[" + throwable + "]\n");
277293
}
278294
)
279-
.withJobId(queryBatcherJobId)
280-
.withJobName(queryBatcherJobName);
295+
.withJobId(queryBatcherJobId)
296+
.withJobName(queryBatcherJobName);
281297

282298
assertEquals(batchSize, queryBatcher.getBatchSize());
283299
assertEquals(threadCount, queryBatcher.getThreadCount());
@@ -309,8 +325,6 @@ public void runQueryBatcher(QueryBatcher queryBatcher, QueryDefinition query, Ma
309325
fail(failures.toString());
310326
}
311327

312-
assertEquals("java-unittest", batchDatabaseName.get());
313-
314328
// make sure we got the right number of results
315329
assertEquals(numExpected, totalResults.get());
316330

@@ -327,27 +341,34 @@ public void runQueryBatcher(QueryBatcher queryBatcher, QueryDefinition query, Ma
327341
assertEquals("Job Report has incorrect failure events counts", failureBatchCount.get(), report.getFailureEventsCount());
328342
//assertEquals("Job Report has incorrect job completion information", true, report.isJobComplete());
329343

330-
// make sure we get the same number of results via search for the same query
331-
SearchHandle searchResults = client.newQueryManager().search(query, new SearchHandle());
332-
assertEquals(numExpected, searchResults.getTotalResults());
333-
334-
// if there are only the three expected forests, make sure we got the expected results per forest
335-
if ( queryBatcher.getForestConfig().listForests().length == 3 ) {
336-
for ( String forest : matchesByForest.keySet() ) {
337-
String[] expected = matchesByForest.get(forest);
338-
for ( String uri : expected ) {
339-
if ( results.get(forest) == null || ! results.get(forest).contains(uri) ) {
340-
for ( String resultsForest : results.keySet() ) {
341-
logger.error("Results found for forest {}: {}, expected {}", resultsForest, results.get(resultsForest),
342-
Arrays.asList(matchesByForest.get(resultsForest)));
344+
if(queryBatcherChecks) {
345+
assertEquals("java-unittest", batchDatabaseName.get());
346+
// make sure we get the same number of results via search for the same query
347+
SearchHandle searchResults = client.newQueryManager().search(query, new SearchHandle());
348+
assertEquals(numExpected, searchResults.getTotalResults());
349+
// if there are only the three expected forests, make sure we got the expected results per forest
350+
if ( queryBatcher.getForestConfig().listForests().length == 3 ) {
351+
for ( String forest : matchesByForest.keySet() ) {
352+
String[] expected = matchesByForest.get(forest);
353+
for ( String uri : expected ) {
354+
if ( results.get(forest) == null || ! results.get(forest).contains(uri) ) {
355+
for ( String resultsForest : results.keySet() ) {
356+
logger.error("Results found for forest {}: {}, expected {}", resultsForest, results.get(resultsForest),
357+
Arrays.asList(matchesByForest.get(resultsForest)));
358+
}
359+
fail("Missing uri=[" + uri + "] from forest=[" + forest + "]");
343360
}
344-
fail("Missing uri=[" + uri + "] from forest=[" + forest + "]");
345361
}
346362
}
347363
}
348364
}
349365
}
350366

367+
public void runQueryBatcher(QueryBatcher queryBatcher, QueryDefinition query, Map<String,String[]> matchesByForest,
368+
int batchSize, int threadCount) throws Exception {
369+
runQueryBatcher(queryBatcher, query, matchesByForest, batchSize, threadCount, true);
370+
}
371+
351372
@Test
352373
public void testMatchOneAndThrowException() {
353374
StructuredQueryDefinition query = new StructuredQueryBuilder().document(uri1);

0 commit comments

Comments
 (0)