Skip to content

Commit 6290640

Browse files
authored
Merge pull request #1550 from marklogic/feature/366-threadCount
DEVEXP-366 Can now adjust thread count on QueryBatcher
2 parents 61dc176 + 9a11188 commit 6290640

File tree

5 files changed

+159
-13
lines changed

5 files changed

+159
-13
lines changed

marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.junit.jupiter.api.AfterAll;
1818
import org.junit.jupiter.api.Assertions;
1919
import org.junit.jupiter.api.BeforeAll;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022
import org.xml.sax.SAXException;
2123

2224
import javax.xml.parsers.ParserConfigurationException;
@@ -34,6 +36,7 @@
3436
public abstract class AbstractFunctionalTest extends BasicJavaClientREST {
3537

3638
protected final static String DB_NAME = "java-functest";
39+
protected final Logger logger = LoggerFactory.getLogger(getClass());
3740

3841
protected final static ObjectMapper objectMapper = new ObjectMapper();
3942

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package com.marklogic.client.fastfunctest.datamovement;
2+
3+
import com.marklogic.client.datamovement.DataMovementManager;
4+
import com.marklogic.client.datamovement.QueryBatcher;
5+
import com.marklogic.client.fastfunctest.AbstractFunctionalTest;
6+
import org.junit.jupiter.api.Test;
7+
8+
import java.util.ArrayList;
9+
import java.util.Collections;
10+
import java.util.HashSet;
11+
import java.util.List;
12+
import java.util.Set;
13+
import java.util.concurrent.atomic.AtomicInteger;
14+
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.junit.jupiter.api.Assertions.assertThrows;
17+
18+
public class AdjustQueryBatcherThreadCountTest extends AbstractFunctionalTest {
19+
20+
@Test
21+
void increaseThreadCount() {
22+
List<String> uris = writeJsonDocs(20);
23+
Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
24+
AtomicInteger uriCount = new AtomicInteger();
25+
26+
DataMovementManager dmm = client.newDataMovementManager();
27+
QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
28+
.withThreadCount(1)
29+
.withBatchSize(1)
30+
.onUrisReady(batch -> {
31+
waitFor(50);
32+
threadNames.add(Thread.currentThread().getName());
33+
uriCount.addAndGet(batch.getItems().length);
34+
});
35+
36+
dmm.startJob(qb);
37+
waitFor(100);
38+
qb.withThreadCount(3);
39+
qb.awaitCompletion();
40+
dmm.stopJob(qb);
41+
42+
assertEquals(20, uriCount.get());
43+
assertEquals(3, threadNames.size(), "3 threads should have processed all the batches, as the thread count " +
44+
"was increased from 1 to 3 100ms into the job.");
45+
}
46+
47+
@Test
48+
void reduceThreadCount() {
49+
List<String> uris = writeJsonDocs(20);
50+
List<String> threadNames = Collections.synchronizedList(new ArrayList<>());
51+
52+
DataMovementManager dmm = client.newDataMovementManager();
53+
QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
54+
.withThreadCount(4)
55+
.withBatchSize(1)
56+
.onUrisReady(batch -> {
57+
waitFor(50);
58+
threadNames.add(Thread.currentThread().getName());
59+
});
60+
61+
dmm.startJob(qb);
62+
waitFor(100);
63+
qb.withThreadCount(2);
64+
qb.awaitCompletion();
65+
dmm.stopJob(qb);
66+
67+
assertEquals(20, threadNames.size(), "With 20 docs and a batch size of 1, the onUrisReady listener should " +
68+
"have been called 20 times and thus captured 20 names.");
69+
70+
Set<String> lastEightThreadNames = new HashSet<>(threadNames.subList(12, 19));
71+
assertEquals(2, lastEightThreadNames.size(), "Since the thread count was reduced from 4 to 2 100ms into the " +
72+
"job, then we can assume that 8 batches of size 1 were processed by 4 threads during those first 100ms, " +
73+
"as there's a 50ms pause in the onUrisReady listener. The thread count would have been reduced to 2. In " +
74+
"theory, the last 12 batches should have only been processed by those 2 threads. But just to be safe, " +
75+
"we verify that the last 8 batches were only processed by 2 threads in case it took the thread pool a " +
76+
"little bit of time to switch from 4 to 2 threads.");
77+
}
78+
79+
@Test
80+
void setThreadCountToOneAndThenHigher() {
81+
List<String> uris = writeJsonDocs(20);
82+
AtomicInteger uriCount = new AtomicInteger();
83+
84+
DataMovementManager dmm = client.newDataMovementManager();
85+
QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
86+
.withThreadCount(4)
87+
.withBatchSize(1)
88+
.onUrisReady(batch -> {
89+
waitFor(50);
90+
uriCount.addAndGet(batch.getItems().length);
91+
});
92+
93+
dmm.startJob(qb);
94+
waitFor(100);
95+
qb.withThreadCount(1);
96+
qb.withThreadCount(8);
97+
qb.awaitCompletion();
98+
dmm.stopJob(qb);
99+
100+
assertEquals(20, uriCount.get(), "The purpose of this test is to verify that if the thread count is set to 1, " +
101+
"the thread pool doesn't stop or throw an error. It may pause execution (for reasons that aren't known), " +
102+
"but testing has shown that increasing it to a value greater than 1 will cause execution to resume if it " +
103+
"was in fact paused.");
104+
}
105+
106+
@Test
107+
void setThreadCountToZero() {
108+
List<String> uris = writeJsonDocs(20);
109+
AtomicInteger uriCount = new AtomicInteger();
110+
111+
DataMovementManager dmm = client.newDataMovementManager();
112+
QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
113+
.withThreadCount(4)
114+
.withBatchSize(1)
115+
.onUrisReady(batch -> {
116+
waitFor(50);
117+
uriCount.addAndGet(batch.getItems().length);
118+
});
119+
120+
dmm.startJob(qb);
121+
waitFor(100);
122+
123+
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> qb.withThreadCount(0));
124+
assertEquals("threadCount must be 1 or greater", ex.getMessage());
125+
assertEquals(4, qb.getThreadCount(), "The thread count should not have been adjusted since the input was invalid");
126+
127+
qb.awaitCompletion();
128+
dmm.stopJob(qb);
129+
130+
assertEquals(20, uriCount.get(), "All 20 URIs should still have been retrieved");
131+
}
132+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public interface Batcher {
7474
* that batches will be processed sequentially because the calling thread
7575
* will sometimes also process batches.</p>
7676
*
77-
* <p>This method cannot be called after the job has started.</p>
77+
* <p>Unless otherwise noted by a subclass, this method cannot be called after the job has started.</p>
7878
*
7979
* @param threadCount the number of threads to use in this Batcher
8080
*

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public interface QueryBatcher extends Batcher {
142142
QueryBatcher onQueryFailure(QueryFailureListener listener);
143143

144144
/**
145-
* <p>Add a listener to run when the Query job is completed i.e. when all the
145+
* <p>Add a listener to run when the Query job is completed i.e. when all the
146146
* document URIs are retrieved and the associated listeners are completed</p>
147147
*
148148
* @param listener the code to run when the Query job is completed
@@ -333,6 +333,12 @@ public interface QueryBatcher extends Batcher {
333333
* threads used for processing the queued batches (running processEvent on
334334
* the listeners regiested with onUrisReady).
335335
*
336+
* As of the 6.2.0 release, this can now be adjusted after the batcher has been started. The underlying Java
337+
* {@code ThreadPoolExecutor} will have both its core and max pool sizes set to the given thread count. Use caution
338+
* when reducing this to a value of 1 while the batcher is running; in some cases, the underlying
339+
* {@code ThreadPoolExecutor} may halt execution of any tasks. Execution can be resumed by increasing the thread count
340+
* to a value of 2 or higher.
341+
*
336342
* @return this instance for method chaining
337343
*/
338344
@Override
@@ -391,19 +397,19 @@ public interface QueryBatcher extends Batcher {
391397
* Retry in the same thread to query a batch that failed. If it fails again,
392398
* all the failure listeners associated with the batcher using onQueryFailure
393399
* method would be processed.
394-
*
400+
*
395401
* Note : Use this method with caution as there is a possibility of infinite
396402
* loops. If a batch fails and one of the failure listeners calls this method
397403
* to retry with failure listeners and if the batch again fails, this would go
398404
* on as an infinite loop until the batch succeeds.
399-
*
405+
*
400406
* @param queryEvent the information about the batch that failed
401407
*/
402408
void retryWithFailureListeners(QueryEvent queryEvent);
403-
409+
404410
/**
405411
* Sets the limit for the maximum number of batches that can be collected.
406-
*
412+
*
407413
* @param maxBatches is the value of the limit.
408414
*/
409415
void setMaxBatches(long maxBatches);
@@ -412,10 +418,10 @@ public interface QueryBatcher extends Batcher {
412418
* Caps the query at the current batch.
413419
*/
414420
void setMaxBatches();
415-
421+
416422
/**
417-
* Returns the maximum number of Batches for the current job.
418-
*
423+
* Returns the maximum number of Batches for the current job.
424+
*
419425
* @return the maximum number of Batches that can be collected.
420426
*/
421427
long getMaxBatches();

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,12 +364,17 @@ public int getMaxDocToUriBatchRatio() {
364364

365365
@Override
366366
public QueryBatcher withThreadCount(int threadCount) {
367-
requireNotStarted();
368-
if ( getThreadCount() <= 0 ) {
367+
if (threadCount <= 0 ) {
369368
throw new IllegalArgumentException("threadCount must be 1 or greater");
370369
}
371-
threadCountSet = true;
372-
super.withThreadCount(threadCount);
370+
if (threadPool != null) {
371+
logger.info("Adjusting thread pool size from {} to {}", getThreadCount(), threadCount);
372+
threadPool.setCorePoolSize(threadCount);
373+
threadPool.setMaximumPoolSize(threadCount);
374+
} else {
375+
threadCountSet = true;
376+
}
377+
super.withThreadCount(threadCount);
373378
return this;
374379
}
375380

0 commit comments

Comments
 (0)