Skip to content

Commit b452860

Browse files
committed
MLE-26420 Some refactoring of WriteBatcher before incremental write PR
Just some cleanup before new functionality is added: fixes some warnings in WriteBatcherImpl and changes BatchWriter into a record.
1 parent 3e84b5c commit b452860

File tree

5 files changed

+87
-97
lines changed

5 files changed

+87
-97
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
group=com.marklogic
2-
version=8.0-SNAPSHOT
2+
version=8.1-SNAPSHOT
33
publishUrl=file:../marklogic-java/releases
44

55
okhttpVersion=5.3.2

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
*/
44
package com.marklogic.client.datamovement;
55

6-
import java.util.concurrent.TimeUnit;
7-
import java.util.stream.Stream;
8-
96
import com.marklogic.client.document.DocumentWriteOperation;
107
import com.marklogic.client.document.ServerTransform;
118
import com.marklogic.client.io.DocumentMetadataHandle;
129
import com.marklogic.client.io.marker.AbstractWriteHandle;
1310
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
1411

12+
import java.util.concurrent.TimeUnit;
13+
import java.util.stream.Stream;
14+
1515
/**
1616
* <p>To facilitate long-running write jobs, batches documents added by many
1717
* external threads and coordinates internal threads to send the batches
@@ -182,12 +182,7 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
182182
*
183183
* @param queryEvent the information about the batch that failed
184184
*/
185-
public void retry(WriteBatch queryEvent);
186-
187-
/*
188-
public WriteBatcher withTransactionSize(int transactionSize);
189-
public int getTransactionSize();
190-
*/
185+
void retry(WriteBatch queryEvent);
191186

192187
/**
193188
* Get the array of WriteBatchListener instances registered via
@@ -361,5 +356,5 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
361356
*
362357
* @param writeBatch the information about the batch that failed
363358
*/
364-
public void retryWithFailureListeners(WriteBatch writeBatch);
359+
void retryWithFailureListeners(WriteBatch writeBatch);
365360
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.marklogic.client.datamovement.impl;
55

66
import com.marklogic.client.document.DocumentWriteOperation;
7+
import com.marklogic.client.document.DocumentWriteSet;
78
import com.marklogic.client.document.XMLDocumentManager;
89
import com.marklogic.client.io.Format;
910
import org.slf4j.Logger;
@@ -12,70 +13,76 @@
1213
import java.io.Closeable;
1314
import java.util.function.Consumer;
1415

15-
class BatchWriter implements Runnable {
16+
record BatchWriter(BatchWriteSet batchWriteSet) implements Runnable {
1617

1718
private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
1819

19-
private final BatchWriteSet batchWriteSet;
20-
21-
BatchWriter(BatchWriteSet batchWriteSet) {
22-
if (batchWriteSet.getDocumentWriteSet().size() == 0) {
23-
throw new IllegalStateException("Attempt to write an empty batch");
24-
}
25-
this.batchWriteSet = batchWriteSet;
26-
}
27-
2820
@Override
2921
public void run() {
22+
if (batchWriteSet.getDocumentWriteSet() == null || batchWriteSet.getDocumentWriteSet().isEmpty()) {
23+
logger.debug("Unexpected empty batch {}, skipping", batchWriteSet.getBatchNumber());
24+
return;
25+
}
26+
3027
try {
31-
logger.trace("begin write batch {} to forest on host \"{}\"", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
32-
if (batchWriteSet.getTemporalCollection() == null) {
33-
batchWriteSet.getClient().newDocumentManager().write(
34-
batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null
35-
);
36-
} else {
37-
// to get access to the TemporalDocumentManager write overload we need to instantiate
38-
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
39-
// format, so we'll set the default content format to unknown
40-
XMLDocumentManager docMgr = batchWriteSet.getClient().newXMLDocumentManager();
41-
docMgr.setContentFormat(Format.UNKNOWN);
42-
docMgr.write(
43-
batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null, batchWriteSet.getTemporalCollection()
44-
);
45-
}
28+
logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
29+
30+
DocumentWriteSet documentWriteSet = batchWriteSet.getDocumentWriteSet();
31+
writeDocuments(documentWriteSet);
32+
33+
// NOTE(review): This looks like it belongs in a finally block, but closeAllHandles() can itself throw,
34+
// which would trigger onFailure() for a batch that was already written successfully — confirm whether that is intended.
4635
closeAllHandles();
47-
Runnable onSuccess = batchWriteSet.getOnSuccess();
48-
if (onSuccess != null) {
49-
onSuccess.run();
50-
}
36+
37+
onSuccess();
5138
} catch (Throwable t) {
52-
logger.trace("failed batch sent to forest on host \"{}\"", batchWriteSet.getClient().getHost());
53-
Consumer<Throwable> onFailure = batchWriteSet.getOnFailure();
54-
if (onFailure != null) {
55-
onFailure.accept(t);
56-
}
39+
onFailure(t);
40+
}
41+
}
42+
43+
private void writeDocuments(DocumentWriteSet documentWriteSet) {
44+
if (batchWriteSet.getTemporalCollection() == null) {
45+
batchWriteSet.getClient().newDocumentManager().write(documentWriteSet, batchWriteSet.getTransform(), null);
46+
} else {
47+
// to get access to the TemporalDocumentManager write overload we need to instantiate
48+
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
49+
// format, so we'll set the default content format to unknown
50+
XMLDocumentManager docMgr = batchWriteSet.getClient().newXMLDocumentManager();
51+
docMgr.setContentFormat(Format.UNKNOWN);
52+
docMgr.write(documentWriteSet, batchWriteSet.getTransform(), null, batchWriteSet.getTemporalCollection());
53+
}
54+
}
55+
56+
private void onSuccess() {
57+
Runnable onSuccess = batchWriteSet.getOnSuccess();
58+
if (onSuccess != null) {
59+
onSuccess.run();
60+
}
61+
}
62+
63+
private void onFailure(Throwable t) {
64+
logger.trace("Failed batch sent to forest on host \"{}\"", batchWriteSet.getClient().getHost());
65+
Consumer<Throwable> onFailure = batchWriteSet.getOnFailure();
66+
if (onFailure != null) {
67+
onFailure.accept(t);
5768
}
5869
}
5970

6071
private void closeAllHandles() throws Throwable {
6172
Throwable lastThrowable = null;
6273
for (DocumentWriteOperation doc : batchWriteSet.getDocumentWriteSet()) {
6374
try {
64-
if (doc.getContent() instanceof Closeable) {
65-
((Closeable) doc.getContent()).close();
75+
if (doc.getContent() instanceof Closeable closeable) {
76+
closeable.close();
6677
}
67-
if (doc.getMetadata() instanceof Closeable) {
68-
((Closeable) doc.getMetadata()).close();
78+
if (doc.getMetadata() instanceof Closeable closeable) {
79+
closeable.close();
6980
}
7081
} catch (Throwable t) {
71-
logger.error("error calling close()", t);
82+
logger.error("Error closing all handles in BatchWriter", t);
7283
lastThrowable = t;
7384
}
7485
}
7586
if (lastThrowable != null) throw lastThrowable;
7687
}
77-
78-
public BatchWriteSet getBatchWriteSet() {
79-
return batchWriteSet;
80-
}
8188
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java

Lines changed: 23 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,25 @@
33
*/
44
package com.marklogic.client.datamovement.impl;
55

6-
import java.util.*;
7-
import java.util.concurrent.BlockingQueue;
8-
import java.util.concurrent.ConcurrentHashMap;
9-
import java.util.concurrent.ConcurrentLinkedQueue;
10-
import java.util.concurrent.LinkedBlockingQueue;
11-
import java.util.concurrent.ThreadPoolExecutor;
12-
import java.util.concurrent.TimeUnit;
13-
import java.util.concurrent.atomic.AtomicLong;
14-
import java.util.stream.Stream;
15-
16-
import org.slf4j.Logger;
17-
import org.slf4j.LoggerFactory;
18-
196
import com.marklogic.client.DatabaseClient;
207
import com.marklogic.client.DatabaseClientFactory;
8+
import com.marklogic.client.datamovement.*;
219
import com.marklogic.client.document.DocumentWriteOperation;
22-
import com.marklogic.client.document.ServerTransform;
2310
import com.marklogic.client.document.DocumentWriteOperation.OperationType;
24-
import com.marklogic.client.io.DocumentMetadataHandle;
11+
import com.marklogic.client.document.ServerTransform;
2512
import com.marklogic.client.impl.DocumentWriteOperationImpl;
2613
import com.marklogic.client.impl.Utilities;
14+
import com.marklogic.client.io.DocumentMetadataHandle;
2715
import com.marklogic.client.io.marker.AbstractWriteHandle;
2816
import com.marklogic.client.io.marker.ContentHandle;
2917
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
3020

31-
import com.marklogic.client.datamovement.DataMovementException;
32-
import com.marklogic.client.datamovement.DataMovementManager;
33-
import com.marklogic.client.datamovement.Forest;
34-
import com.marklogic.client.datamovement.ForestConfiguration;
35-
import com.marklogic.client.datamovement.JobTicket;
36-
import com.marklogic.client.datamovement.WriteBatch;
37-
import com.marklogic.client.datamovement.WriteBatchListener;
38-
import com.marklogic.client.datamovement.WriteEvent;
39-
import com.marklogic.client.datamovement.WriteFailureListener;
40-
import com.marklogic.client.datamovement.WriteBatcher;
21+
import java.util.*;
22+
import java.util.concurrent.*;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import java.util.stream.Stream;
4125

4226
/**
4327
* The implementation of WriteBatcher.
@@ -254,19 +238,19 @@ public WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle
254238
}
255239

256240
private void requireInitialized() {
257-
if ( initialized == false ) {
241+
if (!initialized) {
258242
throw new IllegalStateException("This operation must be called after starting this job");
259243
}
260244
}
261245

262246
private void requireNotInitialized() {
263-
if ( initialized == true ) {
247+
if (initialized) {
264248
throw new IllegalStateException("Configuration cannot be changed after starting this job or calling add or addAs");
265249
}
266250
}
267251

268252
private void requireNotStopped() {
269-
if ( isStopped() == true ) throw new IllegalStateException("This instance has been stopped");
253+
if (isStopped()) throw new IllegalStateException("This instance has been stopped");
270254
}
271255

272256
private BatchWriteSet newBatchWriteSet() {
@@ -278,12 +262,8 @@ private BatchWriteSet newBatchWriteSet(long batchNum) {
278262
int hostToUse = (int) (batchNum % hostInfos.length);
279263
HostInfo host = hostInfos[hostToUse];
280264
BatchWriteSet batchWriteSet = new BatchWriteSet(this, host.client, getTransform(), getTemporalCollection(), batchNum);
281-
batchWriteSet.onSuccess( () -> {
282-
sendSuccessToListeners(batchWriteSet);
283-
});
284-
batchWriteSet.onFailure( (throwable) -> {
285-
sendThrowableToListeners(throwable, "Error writing batch: {}", batchWriteSet);
286-
});
265+
batchWriteSet.onSuccess( () -> sendSuccessToListeners(batchWriteSet));
266+
batchWriteSet.onFailure(throwable -> sendThrowableToListeners(throwable, batchWriteSet));
287267
return batchWriteSet;
288268
}
289269

@@ -311,7 +291,7 @@ public void retry(WriteBatch batch) {
311291
}
312292

313293
private void retry(WriteBatch batch, boolean callFailListeners) {
314-
if ( isStopped() == true ) {
294+
if (isStopped()) {
315295
logger.warn("Job is now stopped, aborting the retry");
316296
return;
317297
}
@@ -385,9 +365,9 @@ private void flush(boolean waitForCompletion) {
385365
}
386366
Iterator<DocumentWriteOperation> iter = docs.iterator();
387367
for ( int i=0; iter.hasNext(); i++ ) {
388-
if ( isStopped() == true ) {
368+
if (isStopped()) {
389369
logger.warn("Job is now stopped, preventing the flush of {} queued docs", docs.size() - i);
390-
if ( waitForCompletion == true ) awaitCompletion();
370+
if (waitForCompletion) awaitCompletion();
391371
return;
392372
}
393373
BatchWriteSet writeSet = newBatchWriteSet();
@@ -402,7 +382,7 @@ private void flush(boolean waitForCompletion) {
402382
threadPool.submit( new BatchWriter(writeSet) );
403383
}
404384

405-
if ( waitForCompletion == true ) awaitCompletion();
385+
if (waitForCompletion) awaitCompletion();
406386
}
407387

408388
private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
@@ -417,7 +397,7 @@ private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
417397
}
418398
}
419399

420-
private void sendThrowableToListeners(Throwable t, String message, BatchWriteSet batchWriteSet) {
400+
private void sendThrowableToListeners(Throwable t, BatchWriteSet batchWriteSet) {
421401
batchWriteSet.setItemsSoFar(itemsSoFar.get());
422402
WriteBatch batch = batchWriteSet.getBatchOfWriteEvents();
423403
for ( WriteFailureListener failureListener : failureListeners ) {
@@ -427,7 +407,7 @@ private void sendThrowableToListeners(Throwable t, String message, BatchWriteSet
427407
logger.error("Exception thrown by an onBatchFailure listener", t2);
428408
}
429409
}
430-
if ( message != null ) logger.warn(message, t.toString());
410+
logger.warn("Error writing batch: {}", t.toString());
431411
}
432412

433413
@Override
@@ -606,15 +586,15 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
606586
for ( Runnable task : tasks ) {
607587
if ( task instanceof BatchWriter ) {
608588
BatchWriter writerTask = (BatchWriter) task;
609-
if ( removedHostInfos.containsKey(writerTask.getBatchWriteSet().getClient().getHost()) ) {
589+
if ( removedHostInfos.containsKey(writerTask.batchWriteSet().getClient().getHost()) ) {
610590
// this batch was targeting a host that's no longer on the list
611591
// if we re-add these docs they'll now be in batches that target acceptable hosts
612-
BatchWriteSet writeSet = newBatchWriteSet(writerTask.getBatchWriteSet().getBatchNumber());
592+
BatchWriteSet writeSet = newBatchWriteSet(writerTask.batchWriteSet().getBatchNumber());
613593
writeSet.onFailure(throwable -> {
614594
if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
615595
else throw new DataMovementException("Failed to retry batch after failover", throwable);
616596
});
617-
for ( WriteEvent doc : writerTask.getBatchWriteSet().getBatchOfWriteEvents().getItems() ) {
597+
for ( WriteEvent doc : writerTask.batchWriteSet().getBatchOfWriteEvents().getItems() ) {
618598
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
619599
}
620600
BatchWriter retryWriterTask = new BatchWriter(writeSet);

marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.marklogic.client.DatabaseClient;
77
import com.marklogic.junit5.AbstractMarkLogicTest;
8+
import org.junit.jupiter.api.AfterEach;
89

910
/**
1011
* Intended to be the base class for all future client API tests, as it properly prepares the database by deleting
@@ -25,4 +26,11 @@ protected final String getJavascriptForDeletingDocumentsBeforeTestRuns() {
2526
.toArray().forEach(item => xdmp.documentDelete(item))
2627
""";
2728
}
29+
30+
@AfterEach
31+
void releaseClient() {
32+
if (Common.client != null) {
33+
Common.client.release();
34+
}
35+
}
2836
}

0 commit comments

Comments
 (0)