Skip to content

Commit 9e646a9

Browse files
committed
Fix early termination in LuceneSourceOperator
1 parent 2bda4c1 commit 9e646a9

File tree

2 files changed

+23
-1
lines changed

2 files changed

+23
-1
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void collect(int doc) throws IOException {
140140

141141
@Override
142142
public boolean isFinished() {
143-
return doneCollecting;
143+
return doneCollecting || remainingDocs == 0;
144144
}
145145

146146
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.compute.operator.Driver;
2626
import org.elasticsearch.compute.operator.DriverContext;
2727
import org.elasticsearch.compute.operator.Operator;
28+
import org.elasticsearch.compute.operator.SourceOperator;
2829
import org.elasticsearch.compute.test.AnyOperatorTestCase;
2930
import org.elasticsearch.compute.test.OperatorTestCase;
3031
import org.elasticsearch.compute.test.TestResultPageSinkOperator;
@@ -117,6 +118,27 @@ public void testShardDataPartitioning() {
117118
testSimple(driverContext(), size, limit);
118119
}
119120

121+
public void testEarlyTermination() {
122+
int size = between(1_000, 20_000);
123+
int limit = between(10, size);
124+
LuceneSourceOperator.Factory factory = simple(randomFrom(DataPartitioning.values()), size, limit, scoring);
125+
try (SourceOperator sourceOperator = factory.get(driverContext())) {
126+
assertFalse(sourceOperator.isFinished());
127+
int collected = 0;
128+
while (sourceOperator.isFinished() == false) {
129+
Page page = sourceOperator.getOutput();
130+
if (page != null) {
131+
collected += page.getPositionCount();
132+
page.releaseBlocks();
133+
}
134+
if (collected >= limit) {
135+
assertTrue("source operator is not finished after reaching limit", sourceOperator.isFinished());
136+
assertThat(collected, equalTo(limit));
137+
}
138+
}
139+
}
140+
}
141+
120142
public void testEmpty() {
121143
testSimple(driverContext(), 0, between(10, 10_000));
122144
}

0 commit comments

Comments
 (0)