Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit af07a23

Browse files
author
Rob Rudin
committed
#44 Figured out how to use Spring's support for waiting for tasks to finish
1 parent aaac9eb commit af07a23

File tree

6 files changed

+56
-39
lines changed

6 files changed

+56
-39
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@
77
/.project
88
.idea
99
*.iml
10+
out

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ dependencies {
1717
compile 'com.marklogic:java-client-api:3.0.5'
1818
compile 'com.marklogic:marklogic-xcc:8.0.5'
1919
compile 'org.jdom:jdom2:2.0.5'
20-
compile 'org.springframework:spring-context:4.1.5.RELEASE'
20+
compile 'org.springframework:spring-context:4.3.5.RELEASE'
2121

2222
testCompile 'junit:junit:4+'
23-
testCompile 'org.springframework:spring-test:4.1.5.RELEASE'
23+
testCompile 'org.springframework:spring-test:4.3.5.RELEASE'
2424

2525
// Used for testing loading modules from the classpath
2626
testRuntime files("lib/modules.jar")

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
group=com.marklogic
22
javadocsDir=../gh-pages-marklogic-java/javadocs
3-
version=2.11.0
3+
version=2.12.DEV

src/main/java/com/marklogic/client/batch/BatchWriterSupport.java

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.springframework.core.task.AsyncTaskExecutor;
55
import org.springframework.core.task.SyncTaskExecutor;
66
import org.springframework.core.task.TaskExecutor;
7+
import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport;
78
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
89

910
import java.util.ArrayList;
@@ -13,27 +14,44 @@
1314

1415
/**
1516
* Support class for BatchWriter implementations that uses Spring's TaskExecutor interface for parallelizing writes to
16-
* MarkLogic.
17+
* MarkLogic. Allows for setting a TaskExecutor instance, and if one is not set, a default one will be created based
18+
* on the threadCount attribute. That attribute is ignored if a TaskExecutor is set.
1719
*/
1820
public abstract class BatchWriterSupport extends LoggingObject implements BatchWriter {
1921

2022
private TaskExecutor taskExecutor;
2123
private int threadCount = 16;
2224

23-
/**
24-
* Seems necessary to keep track of each Future instance so that we can properly wait for each one to finish.
25-
* Spring's TaskExecutor library doesn't seem to provide a better way of doing this.
26-
*/
27-
private List<Future<?>> futures = new ArrayList<>();
28-
2925
@Override
3026
public void initialize() {
27+
if (taskExecutor == null) {
28+
initializeDefaultTaskExecutor();
29+
}
30+
}
31+
32+
@Override
33+
public void waitForCompletion() {
34+
if (taskExecutor instanceof ExecutorConfigurationSupport) {
35+
if (logger.isInfoEnabled()) {
36+
logger.info("Calling shutdown on thread pool");
37+
}
38+
((ExecutorConfigurationSupport) taskExecutor).shutdown();
39+
if (logger.isInfoEnabled()) {
40+
logger.info("Thread pool finished shutdown");
41+
}
42+
}
43+
}
44+
45+
protected void initializeDefaultTaskExecutor() {
3146
if (threadCount > 1) {
3247
if (logger.isInfoEnabled()) {
3348
logger.info("Initializing thread pool with a count of " + threadCount);
3449
}
3550
ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor();
3651
tpte.setCorePoolSize(threadCount);
52+
// By default, wait for tasks to finish, and wait up to an hour
53+
tpte.setWaitForTasksToCompleteOnShutdown(true);
54+
tpte.setAwaitTerminationSeconds(60 * 60);
3755
tpte.afterPropertiesSet();
3856
this.taskExecutor = tpte;
3957
} else {
@@ -44,33 +62,8 @@ public void initialize() {
4462
}
4563
}
4664

47-
@Override
48-
public void waitForCompletion() {
49-
int size = futures.size();
50-
if (logger.isDebugEnabled()) {
51-
logger.debug("Waiting for threads to finish document processing; futures count: " + size);
52-
}
53-
54-
for (int i = 0; i < size; i++) {
55-
Future<?> f = futures.get(i);
56-
if (f.isDone()) {
57-
continue;
58-
}
59-
try {
60-
// Wait up to 1 hour for a write to ML to finish (should never happen)
61-
f.get(1, TimeUnit.HOURS);
62-
} catch (Exception ex) {
63-
logger.warn("Unable to wait for last set of documents to be processed: " + ex.getMessage(), ex);
64-
}
65-
}
66-
}
67-
68-
protected void execute(Runnable runnable) {
69-
if (taskExecutor instanceof AsyncTaskExecutor) {
70-
futures.add(((AsyncTaskExecutor) taskExecutor).submit(runnable));
71-
} else {
72-
taskExecutor.execute(runnable);
73-
}
65+
protected TaskExecutor getTaskExecutor() {
66+
return taskExecutor;
7467
}
7568

7669
public void setTaskExecutor(TaskExecutor taskExecutor) {

src/main/java/com/marklogic/client/batch/RestBatchWriter.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,16 @@
77

88
import java.util.List;
99

10+
/**
11+
* REST API-based implementation, using the Java Client API. By default, this will call release() on each of the
12+
* DatabaseClient objects that are passed in. Be sure to disable this if you want to keep using those DatabaseClient
13+
* objects.
14+
*/
1015
public class RestBatchWriter extends BatchWriterSupport {
1116

1217
private List<DatabaseClient> databaseClients;
1318
private int clientIndex = 0;
19+
private boolean releaseDatabaseClients = true;
1420

1521
public RestBatchWriter(List<DatabaseClient> databaseClients) {
1622
this.databaseClients = databaseClients;
@@ -24,7 +30,7 @@ public void write(final List<? extends DocumentWriteOperation> items) {
2430
final DatabaseClient client = databaseClients.get(clientIndex);
2531
clientIndex++;
2632

27-
execute(new Runnable() {
33+
getTaskExecutor().execute(new Runnable() {
2834
@Override
2935
public void run() {
3036
GenericDocumentManager mgr = client.newDocumentManager();
@@ -43,4 +49,21 @@ public void run() {
4349
}
4450
});
4551
}
52+
53+
@Override
54+
public void waitForCompletion() {
55+
super.waitForCompletion();
56+
57+
if (databaseClients != null && releaseDatabaseClients) {
58+
logger.info("Releasing DatabaseClient instances...");
59+
for (DatabaseClient client : databaseClients) {
60+
client.release();
61+
}
62+
logger.info("Finished releasing DatabaseClient instances");
63+
}
64+
}
65+
66+
public void setReleaseDatabaseClients(boolean releaseDatabaseClients) {
67+
this.releaseDatabaseClients = releaseDatabaseClients;
68+
}
4669
}

src/main/java/com/marklogic/client/batch/XccBatchWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void write(final List<? extends DocumentWriteOperation> items) {
3535
final ContentSource contentSource = contentSources.get(contentSourceIndex);
3636
contentSourceIndex++;
3737

38-
execute(new Runnable() {
38+
getTaskExecutor().execute(new Runnable() {
3939
@Override
4040
public void run() {
4141
Session session = contentSource.newSession();

0 commit comments

Comments
 (0)