Skip to content

Commit 70bd20d

Browse files
author
ehennum
committed
#1279 size the task queue for uri collectors and document processors
1 parent 6f89e26 commit 70bd20d

File tree

2 files changed

+13
-8
lines changed
  • marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/datamovement/functionaltests
  • marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl

2 files changed

+13
-8
lines changed

marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/datamovement/functionaltests/TestSplitters.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,6 @@ public void testWriteOpsMultipleThreads() throws KeyManagementException, NoSuchA
476476
try {
477477
tempJsonFile1 = File.createTempFile("TestWriteOps1", ".json");
478478
BufferedWriter bwJson1 = new BufferedWriter(new FileWriter(tempJsonFile1));
479-
DocumentManager docMgrJson1 = client.newJSONDocumentManager();
480479
dmManager = client.newDataMovementManager();
481480
WriteBatcher wbatcher1 = dmManager.newWriteBatcher();
482481

@@ -530,15 +529,22 @@ public void testWriteOpsMultipleThreads() throws KeyManagementException, NoSuchA
530529
t1.join();
531530
t2.join();
532531

533-
Thread.sleep(1000);
532+
wbatcher1.awaitCompletion();
533+
534534
// Verify docs count
535535
QueryBatcher queryBatcherdMgr = dmManager.newQueryBatcher(
536536
new StructuredQueryBuilder().collection(collectionName))
537537
.withBatchSize(20)
538-
//.withThreadCount(1)
538+
.withThreadCount(1)
539539
.onUrisReady((batch) -> {
540540
cnt1.addAndGet(batch.getItems().length);
541-
});
541+
})
542+
.onQueryFailure(failure -> {
543+
System.out.println("query failed");
544+
failure.printStackTrace(System.out);
545+
dmManager.stopJob(failure.getBatcher());
546+
});
547+
542548
dmManager.startJob(queryBatcherdMgr);
543549
queryBatcherdMgr.awaitCompletion();
544550
assertEquals(2 * nDocs, cnt1.get());
@@ -565,7 +571,6 @@ public void testWriteOpsMultipleThreads() throws KeyManagementException, NoSuchA
565571
jsonfs1.close();
566572
if (jsonfs2 != null)
567573
jsonfs2.close();
568-
docMgrJson1 = null;
569574
} catch (Exception ex) {
570575
System.out.println("Exceptions thrown testSplitterWithMultipleThreads " + ex.getMessage());
571576
} finally {

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ private synchronized void initialize() {
492492
"onUrisReady listeners={}, failure listeners={}",
493493
getBatchSize(), getDocToUriBatchRatio(), getThreadThrottleFactor(), getThreadCount(),
494494
urisReadyListeners.size(), failureListeners.size());
495-
threadPool = new QueryThreadPoolExecutor(getThreadCount(), this);
495+
threadPool = new QueryThreadPoolExecutor(getThreadCount(), getDocToUriBatchRatio(), this);
496496
}
497497

498498
/* When withForestConfig is called before the job starts, it just provides
@@ -1141,9 +1141,9 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
11411141
private class QueryThreadPoolExecutor extends ThreadPoolExecutor {
11421142
private Object objectToNotifyFrom;
11431143

1144-
QueryThreadPoolExecutor(int threadCount, Object objectToNotifyFrom) {
1144+
QueryThreadPoolExecutor(int threadCount, int docToUriBatchRatio, Object objectToNotifyFrom) {
11451145
super(threadCount, threadCount, 0, TimeUnit.MILLISECONDS,
1146-
new LinkedBlockingQueue<Runnable>(threadCount * 25), new BlockingRunsPolicy());
1146+
new LinkedBlockingQueue<Runnable>(threadCount * (docToUriBatchRatio + 2)), new BlockingRunsPolicy());
11471147
this.objectToNotifyFrom = objectToNotifyFrom;
11481148
}
11491149

0 commit comments

Comments
 (0)