Skip to content

Commit fc142df

Browse files
committed
Rename `extra` to `tags` and fix a bug: recreate the scorer when the slice's query (weight) changes, not only when the leaf changes; also remove leftover debug logging, `@Repeat`/`@Seed` annotations, and `NOCOMMIT` test overrides
1 parent 229e253 commit fc142df

File tree

8 files changed

+30
-21
lines changed

8 files changed

+30
-21
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ protected Page getCheckedOutput() throws IOException {
121121
remainingDocs = 0;
122122
} else {
123123
if (scorer.tags().isEmpty() == false) {
124-
throw new UnsupportedOperationException("extra not supported by " + getClass());
124+
throw new UnsupportedOperationException("tags not supported by " + getClass());
125125
}
126126
Weight weight = scorer.weight();
127127
var leafReaderContext = scorer.leafReaderContext();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public Page getCheckedOutput() throws IOException {
103103
remainingDocs = 0;
104104
} else {
105105
if (scorer.tags().isEmpty() == false) {
106-
throw new UnsupportedOperationException("extra not supported by " + getClass());
106+
throw new UnsupportedOperationException("tags not supported by " + getClass());
107107
}
108108
final LeafReader reader = scorer.leafReaderContext().reader();
109109
final Query query = scorer.weight().getQuery();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,13 @@ LuceneScorer getCurrentOrLoadNextScorer() {
155155
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
156156
logger.trace("Starting {}", partialLeaf);
157157
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
158-
if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
158+
if (currentScorer == null // First time
159+
|| currentScorer.leafReaderContext() != leaf // Moved to a new leaf
160+
|| currentScorer.weight != currentSlice.weight() // Moved to a new query
161+
) {
159162
final Weight weight = currentSlice.weight();
160163
processedQueries.add(weight.getQuery());
161-
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.extra(), leaf);
164+
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
162165
}
163166
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
164167
currentScorer.maxPosition = partialLeaf.maxDoc();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
/**
1515
* Holds a list of multiple partial Lucene segments
1616
*/
17-
public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> extra) {
17+
public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) {
1818
int numLeaves() {
1919
return leaves.size();
2020
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,6 @@ public Page getCheckedOutput() throws IOException {
318318
final int discardedDocs = collectedDocs - limiter.tryAccumulateHits(collectedDocs);
319319
Page page = null;
320320
if (currentPagePos >= minPageSize || scorer.isDone() || (remainingDocs = limiter.remaining()) == 0) {
321-
log.error("SADADSF {}", remainingDocs);
322321
IntVector shard = null;
323322
IntVector leaf = null;
324323
IntVector docs = null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private Page collect() throws IOException {
172172
}
173173
try {
174174
if (scorer.tags().isEmpty() == false) {
175-
throw new UnsupportedOperationException("extra not supported by " + getClass());
175+
throw new UnsupportedOperationException("tags not supported by " + getClass());
176176
}
177177
if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) {
178178
// TODO: share the bottom between shardCollectors

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ public Page getCheckedOutput() throws IOException {
116116
doneCollecting = true;
117117
return null;
118118
}
119-
if (slice.extra().isEmpty() == false) {
120-
throw new UnsupportedOperationException("extra not supported by " + getClass());
119+
if (slice.tags().isEmpty() == false) {
120+
throw new UnsupportedOperationException("tags not supported by " + getClass());
121121
}
122122
Releasables.close(fieldsReader);
123123
fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@
99

1010
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1111

12-
import com.carrotsearch.randomizedtesting.annotations.Repeat;
13-
14-
import com.carrotsearch.randomizedtesting.annotations.Seed;
15-
1612
import org.apache.lucene.document.SortedNumericDocValuesField;
1713
import org.apache.lucene.index.IndexReader;
1814
import org.apache.lucene.index.IndexableField;
@@ -67,8 +63,6 @@
6763
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6864
import static org.hamcrest.Matchers.matchesRegex;
6965

70-
//@Seed("4FF2CB98F60FD89D:EBBB701671985F2B")
71-
@Repeat(iterations = 100)
7266
public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
7367
private static final MappedFieldType S_FIELD = new NumberFieldMapper.NumberFieldType("s", NumberFieldMapper.NumberType.LONG);
7468

@@ -210,14 +204,21 @@ private LuceneSourceOperator.Factory simple(DataPartitioning dataPartitioning, i
210204
}
211205
}
212206
reader = writer.getReader();
207+
208+
IndexSearcher searcher = new IndexSearcher(reader);
209+
int count = 0;
210+
for (LuceneSliceQueue.QueryAndTags q : testCase.queryAndExtra()) {
211+
count += searcher.count(q.query());
212+
}
213+
assertThat(count, equalTo(testCase.numResults(numDocs)));
213214
} catch (IOException e) {
214215
throw new RuntimeException(e);
215216
}
216217

217218
ShardContext ctx = new MockShardContext(reader, 0);
218219
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction = c -> testCase.queryAndExtra();
219220
int maxPageSize = between(10, Math.max(10, numDocs));
220-
int taskConcurrency = 4; // randomIntBetween(1, 4); NOCOMMIT
221+
int taskConcurrency = randomIntBetween(1, 4);
221222
return new LuceneSourceOperator.Factory(
222223
List.of(ctx),
223224
queryFunction,
@@ -268,17 +269,17 @@ private void testSimple(DataPartitioning partitioning) {
268269
}
269270

270271
public void testEarlyTermination() {
271-
int numDocs = 20_000; //between(1_000, 20_000); NOCOMMIT
272-
int limit = 100_000; //between(0, numDocs * 2); NOCOMMIT
273-
LuceneSourceOperator.Factory factory = simple(DataPartitioning.DOC
274-
// randomFrom(DataPartitioning.values()) NOCOMMIT
275-
, numDocs, limit, scoring);
272+
int numDocs = between(1_000, 20_000);
273+
int limit = between(0, numDocs * 2);
274+
LuceneSourceOperator.Factory factory = simple(randomFrom(DataPartitioning.values()), numDocs, limit, scoring);
276275
int taskConcurrency = factory.taskConcurrency();
277276
final AtomicInteger receivedRows = new AtomicInteger();
277+
List<SourceOperator> sources = new ArrayList<>();
278278
List<Driver> drivers = new ArrayList<>();
279279
for (int i = 0; i < taskConcurrency; i++) {
280280
DriverContext driverContext = driverContext();
281281
SourceOperator sourceOperator = factory.get(driverContext);
282+
sources.add(sourceOperator);
282283
SinkOperator sinkOperator = new PageConsumerOperator(p -> {
283284
receivedRows.addAndGet(p.getPositionCount());
284285
p.releaseBlocks();
@@ -301,6 +302,9 @@ public void testEarlyTermination() {
301302
drivers.add(driver);
302303
}
303304
OperatorTestCase.runDriver(drivers);
305+
for (SourceOperator source : sources) {
306+
logger.info("source status {}", source.status());
307+
}
304308
logger.info(
305309
"{} received={} limit={} numResults={}",
306310
factory.dataPartitioning,
@@ -376,6 +380,9 @@ private void testSimple(DriverContext ctx, DataPartitioning partitioning, int nu
376380
}
377381

378382
testCase.checkPages(numDocs, limit, factory.maxPageSize(), results);
383+
int count = results.stream().mapToInt(Page::getPositionCount).sum();
384+
logger.info("{} received={} limit={} numResults={}", factory.dataPartitioning, count, limit, testCase.numResults(numDocs));
385+
assertThat(count, equalTo(Math.min(limit, testCase.numResults(numDocs))));
379386
}
380387

381388
// Returns the initial block index, ignoring the score block if scoring is enabled

0 commit comments

Comments
 (0)