Skip to content

Commit 723cb3d

Browse files
committed
MLE-14416 Added info logging for start/stop in DMSDK batchers.
Intent is to give us better visibility in the NiFi connector in particular as to when a WriteBatcher stops. Added a few methods to avoid direct access of the two AtomicBoolean variables.
1 parent 58ac3e6 commit 723cb3d

File tree

5 files changed

+39
-18
lines changed

5 files changed

+39
-18
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatcherImpl.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717

1818
import com.marklogic.client.DatabaseClient;
1919
import com.marklogic.client.datamovement.*;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022

2123
import java.util.*;
2224
import java.util.concurrent.atomic.AtomicBoolean;
2325

2426
public abstract class BatcherImpl implements Batcher {
27+
28+
private final Logger logger = LoggerFactory.getLogger(getClass());
29+
2530
private String jobName = "unnamed";
2631
private String jobId = null;
2732
private int batchSize = 100;
@@ -31,6 +36,7 @@ public abstract class BatcherImpl implements Batcher {
3136
private JobTicket jobTicket;
3237
private Calendar jobStartTime;
3338
private Calendar jobEndTime;
39+
3440
private final AtomicBoolean stopped = new AtomicBoolean(false);
3541
private final AtomicBoolean started = new AtomicBoolean(false);
3642

@@ -136,19 +142,32 @@ void setJobEndTime() {
136142
this.jobEndTime = Calendar.getInstance();
137143
}
138144

139-
AtomicBoolean getStarted() {
140-
return this.started;
141-
}
142145
@Override
143146
public boolean isStarted() {
144147
return started.get();
145148
}
149+
146150
@Override
147151
public boolean isStopped() {
148152
return stopped.get();
149153
}
150-
AtomicBoolean getStopped() {
151-
return this.stopped;
154+
155+
final void setStartedToTrue() {
156+
logger.info("Setting 'started' to true.");
157+
this.started.set(true);
158+
}
159+
160+
final void setStoppedToTrue() {
161+
logger.info("Setting 'stopped' to true.");
162+
this.stopped.set(true);
163+
}
164+
165+
final boolean isStoppedTrue() {
166+
// This method is necessary as calling "isStopped()" results in different behavior in QueryBatcherImpl, where
167+
// that method has been overridden to inspect the thread pool status instead. It's not clear why that was done,
168+
// so this preserves the existing behavior where the value of `stopped` is check in multiple places (it would seem
169+
// that in all of those places, calling "isStopped()" would be preferable).
170+
return this.stopped.get() == true;
152171
}
153172

154173
protected DataMovementManagerImpl getMoveMgr() {

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,14 @@ public JobReport getJobReport(JobTicket ticket) {
7878
@Override
7979
public void stopJob(JobTicket ticket) {
8080
if ( ticket == null ) throw new IllegalArgumentException("ticket must not be null");
81+
logger.info("Stopping {} job with ID: {}", ticket.getJobType(), ticket.getJobId());
8182
service.stopJob(ticket, activeJobs);
8283
}
8384

8485
@Override
8586
public void stopJob(Batcher batcher) {
8687
if ( batcher == null ) throw new IllegalArgumentException("batcher must not be null");
88+
logger.info("Stopping batcher; job name: {}; job ID: {}", batcher.getJobName(), batcher.getJobId());
8789
service.stopJob(batcher, activeJobs);
8890
}
8991

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ public synchronized void start(JobTicket ticket) {
445445
urisReadyListener.initializeListener(this);
446446
}
447447
super.setJobStartTime();
448-
super.getStarted().set(true);
448+
setStartedToTrue();
449449
if(this.maxBatches < Long.MAX_VALUE) {
450450
setMaxUris(getMaxBatches());
451451
}
@@ -720,7 +720,7 @@ private class QueryTask implements Runnable {
720720

721721
public void run() {
722722
// don't proceed if this job is stopped (because dataMovementManager.stopJob was called)
723-
if (batcher.getStopped().get() == true) {
723+
if (batcher.isStoppedTrue()) {
724724
logger.warn("Cancelling task to query forest '{}' forestBatchNum {} with start {} after the job is stopped",
725725
forest.getForestName(), forestBatchNum, start);
726726
return;
@@ -906,7 +906,7 @@ private void processDocs(QueryBatchImpl batch) {
906906
}
907907

908908
private void launchNextTask() {
909-
if (batcher.getStopped().get() == true ) {
909+
if (batcher.isStoppedTrue()) {
910910
// we're stopping, so don't do anything more
911911
return;
912912
}
@@ -1059,7 +1059,7 @@ private void startIterating() {
10591059

10601060
@Override
10611061
public void stop() {
1062-
super.getStopped().set(true);
1062+
setStoppedToTrue();
10631063
if ( threadPool != null ) threadPool.shutdownNow();
10641064
super.setJobEndTime();
10651065
if ( query != null ) {
@@ -1102,7 +1102,7 @@ private void closeAllListeners() {
11021102
}
11031103

11041104
protected void finalize() {
1105-
if (this.getStopped().get() == false ) {
1105+
if (!isStoppedTrue()) {
11061106
logger.warn("QueryBatcher instance \"{}\" was never cleanly stopped. You should call dataMovementManager.stopJob.",
11071107
getJobName());
11081108
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/RowBatcherImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,14 +350,14 @@ private void requireStarted(String msg) {
350350

351351
@Override
352352
public void stop() {
353-
if (super.getStopped().get()) return;
354-
super.getStopped().set(true);
353+
if (isStoppedTrue()) return;
354+
setStoppedToTrue();
355355
if (threadPool != null) threadPool.shutdownNow();
356356
super.setJobEndTime();
357357
}
358358
private void orderlyStop() {
359-
if (super.getStopped().get()) return;
360-
super.getStopped().set(true);
359+
if (isStoppedTrue()) return;
360+
setStoppedToTrue();
361361
if (threadPool != null) threadPool.shutdown();
362362
super.setJobEndTime();
363363
}
@@ -409,7 +409,7 @@ public synchronized void start(JobTicket ticket) {
409409

410410
super.setJobTicket(ticket);
411411
super.setJobStartTime();
412-
super.getStarted().set(true);
412+
setStartedToTrue();
413413

414414
for (int i=0; i<super.getThreadCount(); i++) {
415415
ContentHandle<T> threadHandle = rowsHandle.newHandle();
@@ -528,7 +528,7 @@ private boolean readRows(RowBatchCallable<T> callable) {
528528
private boolean shouldRequestBatch(RowBatchFailureEventImpl requestEvent, int batchRetries) {
529529
if (batchRetries == 0) return true; // first request
530530
if (requestEvent == null) return false; // request succeeded
531-
if (super.getStopped().get()) return false; // stopped
531+
if (isStoppedTrue()) return false; // stopped
532532
// whether to retry request
533533
return (requestEvent.getDisposition() == RowBatchFailureListener.BatchFailureDisposition.RETRY &&
534534
batchRetries < requestEvent.getMaxRetries());

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public void initialize() {
190190
logger.debug("batchSize={}", getBatchSize());
191191
}
192192
super.setJobStartTime();
193-
super.getStarted().set(true);
193+
setStartedToTrue();
194194
}
195195
}
196196

@@ -459,7 +459,7 @@ public void start(JobTicket ticket) {
459459
@Override
460460
public void stop() {
461461
super.setJobEndTime();
462-
super.getStopped().set(true);
462+
setStoppedToTrue();
463463
if ( threadPool != null ) threadPool.shutdownNow();
464464
closeAllListeners();
465465
}

0 commit comments

Comments
 (0)