Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit b0d59cf

Browse files
author
Rob Rudin
committed
#51 Added WriteListener for catching failures
1 parent 7eea938 commit b0d59cf

File tree

5 files changed

+122
-9
lines changed

5 files changed

+122
-9
lines changed

src/main/java/com/marklogic/client/batch/BatchWriterSupport.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package com.marklogic.client.batch;
22

3+
import com.marklogic.client.document.DocumentWriteOperation;
34
import com.marklogic.client.helper.LoggingObject;
5+
import org.springframework.core.task.AsyncListenableTaskExecutor;
46
import org.springframework.core.task.AsyncTaskExecutor;
57
import org.springframework.core.task.SyncTaskExecutor;
68
import org.springframework.core.task.TaskExecutor;
79
import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport;
810
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
11+
import org.springframework.util.concurrent.ListenableFuture;
12+
import org.springframework.util.concurrent.ListenableFutureCallback;
913

1014
import java.util.ArrayList;
1115
import java.util.List;
@@ -21,6 +25,7 @@ public abstract class BatchWriterSupport extends LoggingObject implements BatchW
2125

2226
private TaskExecutor taskExecutor;
2327
private int threadCount = 16;
28+
private WriteListener writeListener;
2429

2530
@Override
2631
public void initialize() {
@@ -64,6 +69,31 @@ protected void initializeDefaultTaskExecutor() {
6469
}
6570
}
6671

72+
/**
73+
* Will use the WriteListener if the TaskExecutor is an instance of AsyncListenableTaskExecutor. The WriteListener
74+
* will then be used to listen for failures.
75+
*
76+
* @param runnable
77+
* @param items
78+
*/
79+
protected void executeRunnable(Runnable runnable, final List<? extends DocumentWriteOperation> items) {
80+
if (writeListener != null && taskExecutor instanceof AsyncListenableTaskExecutor) {
81+
AsyncListenableTaskExecutor asyncListenableTaskExecutor = (AsyncListenableTaskExecutor)taskExecutor;
82+
ListenableFuture<?> future = asyncListenableTaskExecutor.submitListenable(runnable);
83+
future.addCallback(new ListenableFutureCallback<Object>() {
84+
@Override
85+
public void onFailure(Throwable ex) {
86+
writeListener.onWriteFailure(ex, items);
87+
}
88+
@Override
89+
public void onSuccess(Object result) {
90+
}
91+
});
92+
} else {
93+
taskExecutor.execute(runnable);
94+
}
95+
}
96+
6797
protected TaskExecutor getTaskExecutor() {
6898
return taskExecutor;
6999
}
@@ -75,4 +105,12 @@ public void setTaskExecutor(TaskExecutor taskExecutor) {
75105
public void setThreadCount(int threadCount) {
76106
this.threadCount = threadCount;
77107
}
108+
109+
protected WriteListener getWriteListener() {
110+
return writeListener;
111+
}
112+
113+
public void setWriteListener(WriteListener writeListener) {
114+
this.writeListener = writeListener;
115+
}
78116
}

src/main/java/com/marklogic/client/batch/RestBatchWriter.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.marklogic.client.document.DocumentWriteSet;
77
import com.marklogic.client.document.ServerTransform;
88

9+
import java.util.Arrays;
910
import java.util.List;
1011

1112
/**
@@ -20,19 +21,32 @@ public class RestBatchWriter extends BatchWriterSupport {
2021
private boolean releaseDatabaseClients = true;
2122
private ServerTransform serverTransform;
2223

24+
public RestBatchWriter(DatabaseClient databaseClient) {
25+
this(Arrays.asList(databaseClient));
26+
}
27+
2328
public RestBatchWriter(List<DatabaseClient> databaseClients) {
2429
this.databaseClients = databaseClients;
2530
}
2631

2732
@Override
28-
public void write(final List<? extends DocumentWriteOperation> items) {
33+
public void write(List<? extends DocumentWriteOperation> items) {
34+
DatabaseClient client = determineDatabaseClientToUse();
35+
Runnable runnable = buildRunnable(client, items);
36+
executeRunnable(runnable, items);
37+
}
38+
39+
protected DatabaseClient determineDatabaseClientToUse() {
2940
if (clientIndex >= databaseClients.size()) {
3041
clientIndex = 0;
3142
}
32-
final DatabaseClient client = databaseClients.get(clientIndex);
43+
DatabaseClient client = databaseClients.get(clientIndex);
3344
clientIndex++;
45+
return client;
46+
}
3447

35-
getTaskExecutor().execute(new Runnable() {
48+
protected Runnable buildRunnable(final DatabaseClient client, final List<? extends DocumentWriteOperation> items) {
49+
return new Runnable() {
3650
@Override
3751
public void run() {
3852
DocumentManager<?, ?> mgr = buildDocumentManager(client);
@@ -53,7 +67,7 @@ public void run() {
5367
logger.info("Wrote " + count + " documents to MarkLogic");
5468
}
5569
}
56-
});
70+
};
5771
}
5872

5973
/**
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.marklogic.client.batch;
2+
3+
import com.marklogic.client.document.DocumentWriteOperation;
4+
5+
import java.util.List;
6+
7+
/**
8+
* Callback interface for when a list of DocumentWriteOperation instances cannot be written to MarkLogic.
9+
*/
10+
public interface WriteListener {
11+
12+
void onWriteFailure(Throwable ex, List<? extends DocumentWriteOperation> items);
13+
}

src/main/java/com/marklogic/client/batch/XccBatchWriter.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.marklogic.xcc.Session;
99
import com.marklogic.xcc.exceptions.RequestException;
1010

11-
import java.util.ArrayList;
1211
import java.util.List;
1312

1413
/**
@@ -28,14 +27,22 @@ public XccBatchWriter(List<ContentSource> contentSources) {
2827

2928
@Override
3029
public void write(final List<? extends DocumentWriteOperation> items) {
30+
ContentSource contentSource = determineContentSourceToUse();
31+
Runnable runnable = buildRunnable(contentSource, items);
32+
executeRunnable(runnable, items);
33+
}
34+
35+
protected ContentSource determineContentSourceToUse() {
3136
if (contentSourceIndex >= contentSources.size()) {
3237
contentSourceIndex = 0;
3338
}
34-
35-
final ContentSource contentSource = contentSources.get(contentSourceIndex);
39+
ContentSource contentSource = contentSources.get(contentSourceIndex);
3640
contentSourceIndex++;
41+
return contentSource;
42+
}
3743

38-
getTaskExecutor().execute(new Runnable() {
44+
protected Runnable buildRunnable(final ContentSource contentSource, final List<? extends DocumentWriteOperation> items) {
45+
return new Runnable() {
3946
@Override
4047
public void run() {
4148
Session session = contentSource.newSession();
@@ -58,7 +65,7 @@ public void run() {
5865
session.close();
5966
}
6067
}
61-
});
68+
};
6269
}
6370

6471
public void setDocumentWriteOperationAdapter(DocumentWriteOperationAdapter documentWriteOperationAdapter) {
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.marklogic.client.batch;
2+
3+
import com.marklogic.client.AbstractIntegrationTest;
4+
import com.marklogic.client.FailedRequestException;
5+
import com.marklogic.client.document.DocumentWriteOperation;
6+
import com.marklogic.client.impl.DocumentWriteOperationImpl;
7+
import com.marklogic.client.io.StringHandle;
8+
import org.junit.Test;
9+
10+
import java.util.Arrays;
11+
import java.util.List;
12+
13+
public class RestBatchWriterTest extends AbstractIntegrationTest {
14+
15+
@Test
16+
public void failureTest() {
17+
RestBatchWriter writer = new RestBatchWriter(newClient("Documents"));
18+
TestWriteListener testWriteListener = new TestWriteListener();
19+
writer.setWriteListener(testWriteListener);
20+
21+
DocumentWriteOperation op = new DocumentWriteOperationImpl(DocumentWriteOperation.OperationType.DOCUMENT_WRITE,
22+
"/test.xml", null, new StringHandle("<hello>world</hello>asdf"));
23+
24+
writer.initialize();
25+
writer.write(Arrays.asList(op));
26+
writer.waitForCompletion();
27+
28+
Throwable caughtError = testWriteListener.caughtError;
29+
assertNotNull("An error should have been thrown due to the invalid XML", caughtError);
30+
assertTrue(caughtError instanceof FailedRequestException);
31+
}
32+
}
33+
34+
class TestWriteListener implements WriteListener {
35+
public Throwable caughtError;
36+
37+
@Override
38+
public void onWriteFailure(Throwable ex, List<? extends DocumentWriteOperation> items) {
39+
this.caughtError = ex;
40+
}
41+
}

0 commit comments

Comments
 (0)