Skip to content

Commit e18e938

Browse files
author
ehennum
committed
#1279 size the task queue based on the number of forests
1 parent 70bd20d commit e18e938

File tree

1 file changed

+8
-7
lines changed

1 file changed

+8
-7
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -469,9 +469,9 @@ public synchronized void start(JobTicket ticket) {
469469
}
470470

471471
private synchronized void initialize() {
472+
Forest[] forests = getForestConfig().listForests();
472473
if ( threadCountSet == false ) {
473474
if ( query != null ) {
474-
Forest[] forests = getForestConfig().listForests();
475475
logger.warn("threadCount not set--defaulting to number of forests ({})", forests.length);
476476
withThreadCount(forests.length * (docToUriBatchRatio - threadThrottleFactor));
477477
} else {
@@ -488,11 +488,11 @@ private synchronized void initialize() {
488488
if(getThreadCount() == 1) {
489489
isSingleThreaded = true;
490490
}
491-
logger.info("Starting job docBatchSize={}, docToUriBatchRatio={}, threadThrottleFactor= {}, threadCount={}, " +
492-
"onUrisReady listeners={}, failure listeners={}",
493-
getBatchSize(), getDocToUriBatchRatio(), getThreadThrottleFactor(), getThreadCount(),
491+
logger.info("Starting job forest length={}, docBatchSize={}, docToUriBatchRatio={}, threadThrottleFactor= {}, " +
492+
"threadCount={}, onUrisReady listeners={}, failure listeners={}",
493+
forests.length, getBatchSize(), getDocToUriBatchRatio(), getThreadThrottleFactor(), getThreadCount(),
494494
urisReadyListeners.size(), failureListeners.size());
495-
threadPool = new QueryThreadPoolExecutor(getThreadCount(), getDocToUriBatchRatio(), this);
495+
threadPool = new QueryThreadPoolExecutor(getThreadCount(), forests.length, getDocToUriBatchRatio(), this);
496496
}
497497

498498
/* When withForestConfig is called before the job starts, it just provides
@@ -1141,9 +1141,10 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
11411141
private class QueryThreadPoolExecutor extends ThreadPoolExecutor {
11421142
private Object objectToNotifyFrom;
11431143

1144-
QueryThreadPoolExecutor(int threadCount, int docToUriBatchRatio, Object objectToNotifyFrom) {
1144+
QueryThreadPoolExecutor(int threadCount, int forestsLength, int docToUriBatchRatio, Object objectToNotifyFrom) {
11451145
super(threadCount, threadCount, 0, TimeUnit.MILLISECONDS,
1146-
new LinkedBlockingQueue<Runnable>(threadCount * (docToUriBatchRatio + 2)), new BlockingRunsPolicy());
1146+
new LinkedBlockingQueue<>((forestsLength * docToUriBatchRatio * 2) + threadCount),
1147+
new BlockingRunsPolicy());
11471148
this.objectToNotifyFrom = objectToNotifyFrom;
11481149
}
11491150

0 commit comments

Comments
 (0)