Skip to content

Commit 8e8eb72

Browse files
committed
tests
1 parent 85f8a71 commit 8e8eb72

File tree

11 files changed

+42
-56
lines changed

11 files changed

+42
-56
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.index.mapper.BlockLoader;
1717
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
1818
import org.elasticsearch.index.mapper.SourceLoader;
19-
import org.elasticsearch.logging.LogManager;
2019
import org.elasticsearch.search.fetch.StoredFieldsSpec;
2120

2221
import java.io.IOException;
@@ -85,9 +84,7 @@ void run(int offset) throws IOException {
8584
read(firstDoc, shard);
8685

8786
int i = offset + 1;
88-
while (i < forwards.length
89-
// && estimatedMemory() < LARGE_BLOCK_BYTES NOCOMMIT
90-
) {
87+
while (i < forwards.length) {
9188
p = forwards[i];
9289
shard = docs.shards().getInt(p);
9390
segment = docs.segments().getInt(p);
@@ -114,7 +111,6 @@ private void buildBlocks(int offset, int end) {
114111
*/
115112
positions = Arrays.copyOfRange(positions, offset, end);
116113
}
117-
LogManager.getLogger(ValuesSourceReaderOperator.class).error("AFDAF {} {} {}", offset, end, positions.length);
118114
for (int f = 0; f < target.length; f++) {
119115
for (int s = 0; s < operator.shardContexts.size(); s++) {
120116
if (builders[f][s] != null) {
@@ -124,7 +120,6 @@ private void buildBlocks(int offset, int end) {
124120
}
125121
}
126122
try (Block targetBlock = fieldTypeBuilders[f].build()) {
127-
LogManager.getLogger(ValuesSourceReaderOperator.class).error("AFDAF {}", targetBlock);
128123
target[f] = targetBlock.filter(positions);
129124
}
130125
operator.sanityCheckBlock(rowStride[f], positions.length, target[f], f);
@@ -148,21 +143,6 @@ private void read(int doc, int shard) throws IOException {
148143
}
149144
}
150145

151-
/**
152-
* An overestimate of the memory used by all builders.
153-
*/
154-
private long estimatedMemory() {
155-
long total = 0;
156-
for (Block.Builder[] builders : builders) {
157-
for (Block.Builder builder : builders) {
158-
if (builder != null) {
159-
total += builder.estimatedBytes();
160-
}
161-
}
162-
}
163-
return total;
164-
}
165-
166146
@Override
167147
public void close() {
168148
Releasables.closeExpectNoException(fieldTypeBuilders);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ class ValuesFromSingleReader extends ValuesReader {
4747
this.segment = docs.segments().getInt(0);
4848
}
4949

50-
5150
@Override
5251
protected void load(Block[] target, int offset) throws IOException {
5352
assert offset == 0; // NOCOMMIT implement me
@@ -111,7 +110,7 @@ private void loadFromSingleLeaf(Block[] target, BlockLoader.Docs docs) throws IO
111110
}
112111
}
113112

114-
if (rowStrideReaders.isEmpty()) {
113+
if (rowStrideReaders.isEmpty() == false) {
115114
loadFromRowStrideReaders(target, storedFieldsSpec, rowStrideReaders, ctx, docs);
116115
}
117116
} finally {
@@ -150,9 +149,7 @@ private void loadFromRowStrideReaders(
150149
sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null
151150
);
152151
int p = 0;
153-
while (p < docs.count()
154-
// && estimatedMemory(rowStrideReaders) < ValuesSourceReaderOperator.LARGE_BLOCK_BYTES NOCOMMIT
155-
) {
152+
while (p < docs.count()) {
156153
int doc = docs.get(p++);
157154
storedFields.advanceTo(doc);
158155
for (RowStrideReaderWork work : rowStrideReaders) {
@@ -165,17 +162,6 @@ private void loadFromRowStrideReaders(
165162
}
166163
}
167164

168-
/**
169-
* An overestimate of the memory used by all builders.
170-
*/
171-
private long estimatedMemory(List<RowStrideReaderWork> rowStrideReaders) {
172-
long total = 0;
173-
for (RowStrideReaderWork work : rowStrideReaders) {
174-
total += work.builder.estimatedBytes();
175-
}
176-
return total;
177-
}
178-
179165
/**
180166
* Is it more efficient to use a sequential stored field reader
181167
* when reading stored fields for the documents contained in {@code docIds}?
@@ -193,7 +179,7 @@ private record ColumnAtATimeReaderWork(BlockLoader.ColumnAtATimeReader reader, i
193179

194180
private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset)
195181
implements
196-
Releasable {
182+
Releasable {
197183
void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException {
198184
reader.read(doc, storedFields, builder);
199185
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.apache.lucene.index.IndexReader;
1111
import org.apache.lucene.index.LeafReaderContext;
12-
import org.elasticsearch.common.unit.ByteSizeValue;
1312
import org.elasticsearch.compute.data.Block;
1413
import org.elasticsearch.compute.data.BlockFactory;
1514
import org.elasticsearch.compute.data.DocBlock;
@@ -20,10 +19,8 @@
2019
import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator;
2120
import org.elasticsearch.compute.operator.DriverContext;
2221
import org.elasticsearch.compute.operator.Operator;
23-
import org.elasticsearch.core.Releasable;
2422
import org.elasticsearch.core.ReleasableIterator;
2523
import org.elasticsearch.index.mapper.BlockLoader;
26-
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
2724
import org.elasticsearch.index.mapper.SourceLoader;
2825
import org.elasticsearch.search.fetch.StoredFieldsSpec;
2926

@@ -39,9 +36,6 @@
3936
* and outputs them to a new column.
4037
*/
4138
public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOperator {
42-
// NOCOMMIT javadoc
43-
static final long LARGE_BLOCK_BYTES = ByteSizeValue.ofMb(2).getBytes();
44-
4539
/**
4640
* Creates a factory for {@link ValuesSourceReaderOperator}.
4741
* @param fields fields to load

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import java.util.Map;
2222
import java.util.Objects;
2323

24-
import static org.elasticsearch.TransportVersions.*;
24+
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
25+
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
26+
import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES;
2527

2628
public class ValuesSourceReaderOperatorStatus extends AbstractPageMappingToIteratorOperator.Status {
2729
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
@@ -42,7 +44,7 @@ public ValuesSourceReaderOperatorStatus(
4244
long rowsEmitted,
4345
long valuesLoaded
4446
) {
45-
super(processNanos, pagesEmitted, pagesReceived, rowsReceived, rowsEmitted);
47+
super(processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
4648
this.readersBuilt = readersBuilt;
4749
this.valuesLoaded = valuesLoaded;
4850
}
@@ -70,7 +72,15 @@ static ValuesSourceReaderOperatorStatus readFrom(StreamInput in) throws IOExcept
7072
}
7173
Map<String, Integer> readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt);
7274
long valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
73-
return new ValuesSourceReaderOperatorStatus(readersBuilt, processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted, valuesLoaded);
75+
return new ValuesSourceReaderOperatorStatus(
76+
readersBuilt,
77+
processNanos,
78+
pagesReceived,
79+
pagesEmitted,
80+
rowsReceived,
81+
rowsEmitted,
82+
valuesLoaded
83+
);
7484
}
7585

7686
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,24 @@ public final void addInput(Page page) {
111111
if (next != null) {
112112
assert next.hasNext() == false : "has pending input page";
113113
next.close();
114+
next = null;
114115
}
115116
if (page.getPositionCount() == 0) {
116117
return;
117118
}
118-
next = new RuntimeTrackingIterator(receive(page));
119-
pagesReceived++;
120-
rowsReceived += page.getPositionCount();
119+
try {
120+
next = new RuntimeTrackingIterator(receive(page));
121+
pagesReceived++;
122+
rowsReceived += page.getPositionCount();
123+
} finally {
124+
if (next == null) {
125+
/*
126+
* The `receive` operation failed, we need to release the incoming page
127+
* because it's no longer owned by anyone.
128+
*/
129+
page.releaseBlocks();
130+
}
131+
}
121132
}
122133

123134
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class LuceneTopNSourceOperatorScoringTests extends LuceneTopNSourceOperat
5353
private IndexReader reader;
5454

5555
@After
56-
private void closeIndex() throws IOException {
56+
public void closeScoringIndex() throws IOException {
5757
IOUtils.close(reader, directory);
5858
}
5959

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class LuceneTopNSourceOperatorTests extends AnyOperatorTestCase {
5858
private IndexReader reader;
5959

6060
@After
61-
private void closeIndex() throws IOException {
61+
public void closeIndex() throws IOException {
6262
IOUtils.close(reader, directory);
6363
}
6464

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,12 @@
4848
import org.elasticsearch.compute.data.LongBlock;
4949
import org.elasticsearch.compute.data.LongVector;
5050
import org.elasticsearch.compute.data.Page;
51-
import org.elasticsearch.compute.lucene.*;
51+
import org.elasticsearch.compute.lucene.DataPartitioning;
52+
import org.elasticsearch.compute.lucene.LuceneOperator;
53+
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
54+
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
55+
import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests;
56+
import org.elasticsearch.compute.lucene.ShardContext;
5257
import org.elasticsearch.compute.operator.Driver;
5358
import org.elasticsearch.compute.operator.DriverContext;
5459
import org.elasticsearch.compute.operator.DriverRunner;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
import org.elasticsearch.common.io.stream.Writeable;
1313
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
1414
import org.elasticsearch.compute.lucene.LuceneSourceOperatorStatusTests;
15-
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests;
1615
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
16+
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests;
1717
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
1818
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1919
import org.elasticsearch.test.ESTestCase;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
import org.elasticsearch.common.io.stream.Writeable;
1313
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
1414
import org.elasticsearch.compute.lucene.LuceneSourceOperatorStatusTests;
15-
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests;
1615
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
16+
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests;
1717
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
1818
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperatorStatusTests;
1919
import org.elasticsearch.test.AbstractWireSerializingTestCase;

0 commit comments

Comments
 (0)