File tree Expand file tree Collapse file tree 1 file changed +26
-35
lines changed Expand file tree Collapse file tree 1 file changed +26
-35
lines changed Original file line number Diff line number Diff line change @@ -44,9 +44,12 @@ public <T> List<WatchResponse> chunkedPush(
4444) {
4545 List< WatchResponse> responses = new ArrayList<> ();
4646 List< T> records = new ArrayList<> ();
47+ Int count = 0;
4748
4849 for (T item : objects) {
49- if (records.size() == batchSize) {
50+ if (records.size() == batchSize || count == objects.length() - 1) {
51+ count++;
52+
5053 WatchResponse watch =
5154 this.push(
5255 indexName,
@@ -60,42 +63,30 @@ public <T> List<WatchResponse> chunkedPush(
6063 }
6164
6265 records.add(item);
63- }
64-
65- if (records.size() > 0) {
66- WatchResponse watch =
67- this.push(
68- indexName,
69- new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
70- waitForTasks,
71- referenceIndexName,
72- requestOptions
73- );
74- responses.add(watch);
75- }
7666
77- if (waitForTasks) {
78- responses.forEach(response -> {
79- TaskUtils.retryUntil(
80- () -> {
81- try {
82- return this.getEvent(response.getRunID(), response.getEventID());
83- } catch (AlgoliaApiException e) {
84- if (e.getStatusCode() == 404) {
85- return null;
67+ if (waitForTasks && (responses.size() % 50 == 0 || count == objects.length() - 1)) {
68+ responses.subList(Math.max(responses.size() - 50, 0), responses.size()).forEach(response -> {
69+ TaskUtils.retryUntil(
70+ () -> {
71+ try {
72+ return this.getEvent(response.getRunID(), response.getEventID());
73+ } catch (AlgoliaApiException e) {
74+ if (e.getStatusCode() == 404) {
75+ return null;
76+ }
77+
78+ throw e;
8679 }
87-
88- throw e;
89- }
90- },
91- (Event resp) -> {
92- return resp != null;
93- } ,
94- 50,
95- null
96- );
97- }
98- );
80+ },
81+ (Event resp) -> {
82+ return resp != null;
83+ } ,
84+ 50,
85+ null
86+ );
87+ }
88+ );
89+ }
9990 }
10091
10192 return responses;
You can’t perform that action at this time.
0 commit comments