File tree Expand file tree Collapse file tree 1 file changed +17
-14
lines changed Expand file tree Collapse file tree 1 file changed +17
-14
lines changed Original file line number Diff line number Diff line change @@ -44,6 +44,7 @@ public <T> List<WatchResponse> chunkedPush(
4444) {
4545 List< WatchResponse> responses = new ArrayList<> ();
4646 List< T> records = new ArrayList<> ();
47+ int offset = 0;
4748 int waitBatchSize = batchSize / 10;
4849 if (waitBatchSize < 1) {
4950 waitBatchSize = batchSize;
@@ -53,23 +54,24 @@ public <T> List<WatchResponse> chunkedPush(
5354 T current = it.next();
5455
5556 while (true) {
57+ records.add(current);
58+
5659 if (records.size() == batchSize || ! it.hasNext()) {
57- WatchResponse watch =
58- this.push(
59- indexName,
60- new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
61- waitForTasks,
62- referenceIndexName,
63- requestOptions
64- );
60+ WatchResponse watch = this.push(
61+ indexName,
62+ new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
63+ waitForTasks,
64+ referenceIndexName,
65+ requestOptions
66+ );
6567 responses.add(watch);
6668 records.clear();
6769 }
6870
69- records.add(current);
70-
71- if (waitForTasks && (responses.size() % waitBatchSize == 0 || !it.hasNext ())) {
72- responses.subList(Math.max(responses.size() - waitBatchSize, 0), responses.size()) .forEach(response -> {
71+ if (waitForTasks && responses.size() > 0 && (responses.size() % waitBatchSize == 0 || !it.hasNext())) {
72+ responses
73+ .subList(offset, Math.min(offset + waitBatchSize, responses.size ()))
74+ .forEach(response -> {
7375 TaskUtils.retryUntil(
7476 () -> {
7577 try {
@@ -88,8 +90,9 @@ public <T> List<WatchResponse> chunkedPush(
8890 50,
8991 null
9092 );
91- }
92- );
93+ });
94+
95+ offset += waitBatchSize;
9396 }
9497
9598 if (!it.hasNext()) {
You can’t perform that action at this time.
0 commit comments