33 */
44package com .marklogic .client .datamovement .impl ;
55
6- import java .util .*;
7- import java .util .concurrent .BlockingQueue ;
8- import java .util .concurrent .ConcurrentHashMap ;
9- import java .util .concurrent .ConcurrentLinkedQueue ;
10- import java .util .concurrent .LinkedBlockingQueue ;
11- import java .util .concurrent .ThreadPoolExecutor ;
12- import java .util .concurrent .TimeUnit ;
13- import java .util .concurrent .atomic .AtomicLong ;
14- import java .util .stream .Stream ;
15-
16- import org .slf4j .Logger ;
17- import org .slf4j .LoggerFactory ;
18-
196import com .marklogic .client .DatabaseClient ;
207import com .marklogic .client .DatabaseClientFactory ;
8+ import com .marklogic .client .datamovement .*;
219import com .marklogic .client .document .DocumentWriteOperation ;
22- import com .marklogic .client .document .ServerTransform ;
2310import com .marklogic .client .document .DocumentWriteOperation .OperationType ;
24- import com .marklogic .client .io . DocumentMetadataHandle ;
11+ import com .marklogic .client .document . ServerTransform ;
2512import com .marklogic .client .impl .DocumentWriteOperationImpl ;
2613import com .marklogic .client .impl .Utilities ;
14+ import com .marklogic .client .io .DocumentMetadataHandle ;
2715import com .marklogic .client .io .marker .AbstractWriteHandle ;
2816import com .marklogic .client .io .marker .ContentHandle ;
2917import com .marklogic .client .io .marker .DocumentMetadataWriteHandle ;
18+ import org .slf4j .Logger ;
19+ import org .slf4j .LoggerFactory ;
3020
31- import com .marklogic .client .datamovement .DataMovementException ;
32- import com .marklogic .client .datamovement .DataMovementManager ;
33- import com .marklogic .client .datamovement .Forest ;
34- import com .marklogic .client .datamovement .ForestConfiguration ;
35- import com .marklogic .client .datamovement .JobTicket ;
36- import com .marklogic .client .datamovement .WriteBatch ;
37- import com .marklogic .client .datamovement .WriteBatchListener ;
38- import com .marklogic .client .datamovement .WriteEvent ;
39- import com .marklogic .client .datamovement .WriteFailureListener ;
40- import com .marklogic .client .datamovement .WriteBatcher ;
21+ import java .util .*;
22+ import java .util .concurrent .*;
23+ import java .util .concurrent .atomic .AtomicLong ;
24+ import java .util .stream .Stream ;
4125
4226/**
4327 * The implementation of WriteBatcher.
@@ -254,19 +238,19 @@ public WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle
254238 }
255239
256240 private void requireInitialized () {
257- if ( initialized == false ) {
241+ if (! initialized ) {
258242 throw new IllegalStateException ("This operation must be called after starting this job" );
259243 }
260244 }
261245
262246 private void requireNotInitialized () {
263- if ( initialized == true ) {
247+ if (initialized ) {
264248 throw new IllegalStateException ("Configuration cannot be changed after starting this job or calling add or addAs" );
265249 }
266250 }
267251
268252 private void requireNotStopped () {
269- if ( isStopped () == true ) throw new IllegalStateException ("This instance has been stopped" );
253+ if (isStopped ()) throw new IllegalStateException ("This instance has been stopped" );
270254 }
271255
272256 private BatchWriteSet newBatchWriteSet () {
@@ -278,12 +262,8 @@ private BatchWriteSet newBatchWriteSet(long batchNum) {
278262 int hostToUse = (int ) (batchNum % hostInfos .length );
279263 HostInfo host = hostInfos [hostToUse ];
280264 BatchWriteSet batchWriteSet = new BatchWriteSet (this , host .client , getTransform (), getTemporalCollection (), batchNum );
281- batchWriteSet .onSuccess ( () -> {
282- sendSuccessToListeners (batchWriteSet );
283- });
284- batchWriteSet .onFailure ( (throwable ) -> {
285- sendThrowableToListeners (throwable , "Error writing batch: {}" , batchWriteSet );
286- });
265+ batchWriteSet .onSuccess ( () -> sendSuccessToListeners (batchWriteSet ));
266+ batchWriteSet .onFailure (throwable -> sendThrowableToListeners (throwable , batchWriteSet ));
287267 return batchWriteSet ;
288268 }
289269
@@ -311,7 +291,7 @@ public void retry(WriteBatch batch) {
311291 }
312292
313293 private void retry (WriteBatch batch , boolean callFailListeners ) {
314- if ( isStopped () == true ) {
294+ if (isStopped ()) {
315295 logger .warn ("Job is now stopped, aborting the retry" );
316296 return ;
317297 }
@@ -385,9 +365,9 @@ private void flush(boolean waitForCompletion) {
385365 }
386366 Iterator <DocumentWriteOperation > iter = docs .iterator ();
387367 for ( int i =0 ; iter .hasNext (); i ++ ) {
388- if ( isStopped () == true ) {
368+ if (isStopped ()) {
389369 logger .warn ("Job is now stopped, preventing the flush of {} queued docs" , docs .size () - i );
390- if ( waitForCompletion == true ) awaitCompletion ();
370+ if (waitForCompletion ) awaitCompletion ();
391371 return ;
392372 }
393373 BatchWriteSet writeSet = newBatchWriteSet ();
@@ -402,7 +382,7 @@ private void flush(boolean waitForCompletion) {
402382 threadPool .submit ( new BatchWriter (writeSet ) );
403383 }
404384
405- if ( waitForCompletion == true ) awaitCompletion ();
385+ if (waitForCompletion ) awaitCompletion ();
406386 }
407387
408388 private void sendSuccessToListeners (BatchWriteSet batchWriteSet ) {
@@ -417,7 +397,7 @@ private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
417397 }
418398 }
419399
420- private void sendThrowableToListeners (Throwable t , String message , BatchWriteSet batchWriteSet ) {
400+ private void sendThrowableToListeners (Throwable t , BatchWriteSet batchWriteSet ) {
421401 batchWriteSet .setItemsSoFar (itemsSoFar .get ());
422402 WriteBatch batch = batchWriteSet .getBatchOfWriteEvents ();
423403 for ( WriteFailureListener failureListener : failureListeners ) {
@@ -427,7 +407,7 @@ private void sendThrowableToListeners(Throwable t, String message, BatchWriteSet
427407 logger .error ("Exception thrown by an onBatchFailure listener" , t2 );
428408 }
429409 }
430- if ( message != null ) logger .warn (message , t .toString ());
410+ logger .warn ("Error writing batch: {}" , t .toString ());
431411 }
432412
433413 @ Override
@@ -606,15 +586,15 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
606586 for ( Runnable task : tasks ) {
607587 if ( task instanceof BatchWriter ) {
608588 BatchWriter writerTask = (BatchWriter ) task ;
609- if ( removedHostInfos .containsKey (writerTask .getBatchWriteSet ().getClient ().getHost ()) ) {
589+ if ( removedHostInfos .containsKey (writerTask .batchWriteSet ().getClient ().getHost ()) ) {
610590 // this batch was targeting a host that's no longer on the list
611591 // if we re-add these docs they'll now be in batches that target acceptable hosts
612- BatchWriteSet writeSet = newBatchWriteSet (writerTask .getBatchWriteSet ().getBatchNumber ());
592+ BatchWriteSet writeSet = newBatchWriteSet (writerTask .batchWriteSet ().getBatchNumber ());
613593 writeSet .onFailure (throwable -> {
614594 if ( throwable instanceof RuntimeException ) throw (RuntimeException ) throwable ;
615595 else throw new DataMovementException ("Failed to retry batch after failover" , throwable );
616596 });
617- for ( WriteEvent doc : writerTask .getBatchWriteSet ().getBatchOfWriteEvents ().getItems () ) {
597+ for ( WriteEvent doc : writerTask .batchWriteSet ().getBatchOfWriteEvents ().getItems () ) {
618598 writeSet .getDocumentWriteSet ().add (doc .getTargetUri (), doc .getMetadata (), doc .getContent ());
619599 }
620600 BatchWriter retryWriterTask = new BatchWriter (writeSet );
0 commit comments