Skip to content

Commit 564958e

Browse files
authored
Merge pull request #1551 from marklogic/feature/rowBatcher-refactor
Small cleanup of RowBatcherImpl
2 parents 6290640 + 33aa419 commit 564958e

File tree

1 file changed

+34
-27
lines changed

1 file changed

+34
-27
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/RowBatcherImpl.java

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import com.marklogic.client.io.marker.StructureReadHandle;
3232
import com.marklogic.client.row.RawPlanDefinition;
3333
import com.marklogic.client.row.RawQueryDSLPlan;
34-
import com.marklogic.client.row.RawSQLPlan;
3534
import com.marklogic.client.row.RowManager;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
@@ -57,23 +56,21 @@ class RowBatcherImpl<T> extends BatcherImpl implements RowBatcher<T> {
5756
private final AtomicLong failedBatches = new AtomicLong(0);
5857
private final AtomicInteger runningThreads = new AtomicInteger(0);
5958
private RowBatchFailureListener[] failureListeners;
60-
private RowBatchSuccessListener[] sucessListeners;
61-
62-
private String schemaName;
63-
private String viewName;
59+
private RowBatchSuccessListener[] successListeners;
6460

6561
private RawPlanDefinition pagedPlan;
6662
private long rowCount = 0;
6763

68-
private RowManager defaultRowManager;
69-
private ContentHandle<T> rowsHandle;
70-
private Class<T> rowsClass;
7164
private HostInfo[] hostInfos;
7265

7366
private boolean consistentSnapshot = false;
7467
private final AtomicLong serverTimestamp = new AtomicLong(-1);
7568

76-
RowBatcherImpl(DataMovementManagerImpl moveMgr, ContentHandle<T> rowsHandle) {
69+
private final ContentHandle<T> rowsHandle;
70+
private final Class<T> rowsClass;
71+
private final RowManager defaultRowManager;
72+
73+
RowBatcherImpl(DataMovementManagerImpl moveMgr, ContentHandle<T> rowsHandle) {
7774
super(moveMgr);
7875
if (rowsHandle == null)
7976
throw new IllegalArgumentException("Cannot create RowBatcher with null rows manager");
@@ -124,22 +121,32 @@ public RowBatcher<T> withBatchView(RawQueryDSLPlan viewPlan) {
124121
analyzePlan(viewPlan.getHandle());
125122
return this;
126123
}
127-
private void analyzePlan(AbstractWriteHandle initialPlan) {
124+
125+
/**
126+
* Calls the MarkLogic internal/viewinfo endpoint to obtain two critical items - the estimate of matching rows,
127+
* and a modified version of the user's plan that includes "lower bounds" and "upper bounds" parameters. The
128+
* estimate of matching rows allows for partitions to be defined based on the user-provided thread count.
129+
* The user's modified plan is then run with a lower/upper bounds row ID value based on the calculated partitions.
130+
*
131+
* @param userPlan
132+
*/
133+
private void analyzePlan(AbstractWriteHandle userPlan) {
128134
requireNotStarted("Must specify batch view before starting job");
129135

130136
DatabaseClientImpl client = (DatabaseClientImpl) getPrimaryClient();
131137
JsonNode viewInfo = client.getServices().postResource(
132-
null, "internal/viewinfo", null, null, initialPlan, new JacksonHandle()
138+
null, "internal/viewinfo", null, null, userPlan, new JacksonHandle()
133139
).get();
134-
// System.out.println(viewInfo.toPrettyString());
135140

136-
JsonNode schemaNode = viewInfo.get("schemaName");
137-
this.schemaName = (schemaNode != null) ? schemaNode.asText(null) : null;
138-
this.viewName = viewInfo.get("viewName").asText(null);
139141
this.rowCount = viewInfo.get("rowCount").asLong(0);
140142
this.pagedPlan = getRowManager().newRawPlanDefinition(new JacksonHandle(viewInfo.get("modifiedPlan")));
143+
144+
JsonNode schemaNode = viewInfo.get("schemaName");
141145
logger.info("plan analysis schema name: {}, view name: {}, row estimate: {}",
142-
this.schemaName, this.viewName, this.rowCount);
146+
(schemaNode != null) ? schemaNode.asText(null) : null,
147+
viewInfo.get("viewName").asText(null),
148+
this.rowCount
149+
);
143150
}
144151

145152
@Override
@@ -159,12 +166,12 @@ public RowBatcher<T> withThreadCount(int threadCount) {
159166
public RowBatcher<T> onSuccess(RowBatchSuccessListener listener) {
160167
requireNotStarted("Must set success listener before starting job");
161168
if (listener == null) {
162-
sucessListeners = null;
163-
} else if (sucessListeners == null || sucessListeners.length == 0) {
164-
sucessListeners = new RowBatchSuccessListener[]{listener};
169+
successListeners = null;
170+
} else if (successListeners == null || successListeners.length == 0) {
171+
successListeners = new RowBatchSuccessListener[]{listener};
165172
} else {
166-
sucessListeners = Arrays.copyOf(sucessListeners, sucessListeners.length + 1);
167-
sucessListeners[sucessListeners.length - 1] = listener;
173+
successListeners = Arrays.copyOf(successListeners, successListeners.length + 1);
174+
successListeners[successListeners.length - 1] = listener;
168175
}
169176
return this;
170177
}
@@ -207,7 +214,7 @@ public RowBatcher<T> withConsistentSnapshot() {
207214

208215
@Override
209216
public RowBatchSuccessListener[] getSuccessListeners() {
210-
return sucessListeners;
217+
return successListeners;
211218
}
212219
@Override
213220
public RowBatchFailureListener[] getFailureListeners() {
@@ -216,7 +223,7 @@ public RowBatchFailureListener[] getFailureListeners() {
216223
@Override
217224
public void setSuccessListeners(RowBatchSuccessListener... listeners) {
218225
requireNotStarted("Must set success listeners before starting job");
219-
this.sucessListeners = listeners;
226+
this.successListeners = listeners;
220227
}
221228
@Override
222229
public void setFailureListeners(RowBatchFailureListener... listeners) {
@@ -228,10 +235,10 @@ private void initRequestEvent(RowBatchEventImpl event) {
228235
event.withJobTicket(getJobTicket());
229236
}
230237
private void notifySuccess(RowBatchSuccessListener.RowBatchResponseEvent<T> event) {
231-
if (sucessListeners == null || sucessListeners.length == 0) return;
232-
for (RowBatchSuccessListener sucessListener: sucessListeners) {
238+
if (successListeners == null || successListeners.length == 0) return;
239+
for (RowBatchSuccessListener successListener: successListeners) {
233240
try {
234-
sucessListener.processEvent(event);
241+
successListener.processEvent(event);
235242
} catch(Throwable e) {
236243
logger.info("error in success listener: {}", e.toString());
237244
}
@@ -354,7 +361,7 @@ public synchronized void start(JobTicket ticket) {
354361
if (this.pagedPlan == null)
355362
throw new IllegalStateException("Plan must be supplied before starting the job");
356363

357-
if (sucessListeners == null || sucessListeners.length == 0)
364+
if (successListeners == null || successListeners.length == 0)
358365
throw new IllegalStateException("No listener for rows");
359366

360367
if (failureListeners == null || failureListeners.length == 0) {

0 commit comments

Comments
 (0)