Skip to content

Commit 320bf8e

Browse files
committed
Implement simple pre-fetching for index enumerators
Signed-off-by: Simeon Widdis <[email protected]>
1 parent 2ab5002 commit 320bf8e

File tree

1 file changed

+34
-5
lines changed

1 file changed

+34
-5
lines changed

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import java.util.Collections;
99
import java.util.Iterator;
1010
import java.util.List;
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.ExecutionException;
1113
import lombok.EqualsAndHashCode;
1214
import lombok.ToString;
1315
import org.apache.calcite.linq4j.Enumerator;
@@ -53,6 +55,9 @@ public class OpenSearchIndexEnumerator implements Enumerator<Object> {
5355

5456
private ExprValue current;
5557

58+
private CompletableFuture<OpenSearchResponse> nextBatchFuture;
59+
private boolean isLastBatch = false;
60+
5661
public OpenSearchIndexEnumerator(
5762
OpenSearchClient client,
5863
List<String> fields,
@@ -69,14 +74,29 @@ public OpenSearchIndexEnumerator(
6974
if (!this.monitor.isHealthy()) {
7075
throw new NonFallbackCalciteException("insufficient resources to run the query, quit.");
7176
}
77+
this.nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request));
7278
}
7379

7480
private void fetchNextBatch() {
75-
OpenSearchResponse response = client.search(request);
76-
if (!response.isEmpty()) {
77-
iterator = response.iterator();
78-
} else if (iterator == null) {
79-
iterator = Collections.emptyIterator();
81+
try {
82+
// Get results from the pre-fetched batch
83+
OpenSearchResponse response = nextBatchFuture.get();
84+
85+
if (!response.isEmpty()) {
86+
iterator = response.iterator();
87+
88+
// If we haven't hit the end, start pre-fetching next batch
89+
if (!isLastBatch) {
90+
nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request));
91+
}
92+
} else {
93+
if (iterator == null) {
94+
iterator = Collections.emptyIterator();
95+
}
96+
isLastBatch = true;
97+
}
98+
} catch (InterruptedException | ExecutionException e) {
99+
throw new NonFallbackCalciteException("Error fetching batch: " + e.getMessage());
80100
}
81101
}
82102

@@ -98,6 +118,7 @@ private Object resolveForCalcite(ExprValue value, String rawPath) {
98118
@Override
99119
public boolean moveNext() {
100120
if (queryCount >= maxResponseSize) {
121+
isLastBatch = true;
101122
return false;
102123
}
103124

@@ -114,17 +135,21 @@ public boolean moveNext() {
114135
queryCount++;
115136
return true;
116137
} else {
138+
isLastBatch = true;
117139
return false;
118140
}
119141
}
120142

121143
@Override
122144
public void reset() {
145+
isLastBatch = false;
146+
nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request));
123147
OpenSearchResponse response = client.search(request);
124148
if (!response.isEmpty()) {
125149
iterator = response.iterator();
126150
} else {
127151
iterator = Collections.emptyIterator();
152+
isLastBatch = true;
128153
}
129154
queryCount = 0;
130155
}
@@ -133,6 +158,10 @@ public void reset() {
133158
public void close() {
134159
iterator = Collections.emptyIterator();
135160
queryCount = 0;
161+
isLastBatch = true;
162+
if (nextBatchFuture != null) {
163+
nextBatchFuture.cancel(true);
164+
}
136165
client.cleanup(request);
137166
}
138167
}

0 commit comments

Comments
 (0)