
Commit fe22ef3

Add status for time-series source operator
1 parent 8a662a6 commit fe22ef3

5 files changed: +204 -24 lines

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
@@ -252,6 +252,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);
     public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT = def(9_074_0_00);
     public static final TransportVersion ESQL_FIELD_ATTRIBUTE_DROP_TYPE = def(9_075_0_00);
+    public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,
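
The new constant marks the wire-format change: code can compare a peer's transport version against it to decide whether that peer understands the new time-series source status. In this commit the whole TimeSeriesSourceOperator.Status opts out of older streams via supportsVersion (see the last file below); the other common way to use such a constant, sketched here purely for illustration, is a per-field guard inside writeTo:

    // Illustration only, not taken from this commit: guard a newly added field
    // behind the transport version so older nodes never receive bytes they cannot read.
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIME_SERIES_SOURCE_STATUS)) {
            out.writeVLong(tsidLoaded); // field introduced by this change
        }
    }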

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 10 additions & 6 deletions
@@ -57,12 +57,12 @@ public abstract class LuceneOperator extends SourceOperator {
     /**
      * Count of the number of slices processed.
      */
-    private int processedSlices;
+    int processedSlices;
     final int maxPageSize;
     private final LuceneSliceQueue sliceQueue;
 
-    private final Set<Query> processedQueries = new HashSet<>();
-    private final Set<String> processedShards = new HashSet<>();
+    final Set<Query> processedQueries = new HashSet<>();
+    final Set<String> processedShards = new HashSet<>();
 
     private LuceneSlice currentSlice;
     private int sliceIndex;
@@ -75,7 +75,7 @@ public abstract class LuceneOperator extends SourceOperator {
     /**
      * Count of rows this operator has emitted.
      */
-    private long rowsEmitted;
+    long rowsEmitted;
 
     protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
         this.blockFactory = blockFactory;
@@ -277,7 +277,7 @@ public static class Status implements Operator.Status {
         private final long rowsEmitted;
         private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;
 
-        private Status(LuceneOperator operator) {
+        protected Status(LuceneOperator operator) {
             processedSlices = operator.processedSlices;
             processedQueries = operator.processedQueries.stream().map(Query::toString).collect(Collectors.toCollection(TreeSet::new));
             processNanos = operator.processingNanos;
@@ -447,6 +447,11 @@ public long documentsFound() {
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
+            toXContentFields(builder, params);
+            return builder.endObject();
+        }
+
+        protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
             builder.field("processed_slices", processedSlices);
             builder.field("processed_queries", processedQueries);
             builder.field("processed_shards", processedShards);
@@ -462,7 +467,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             builder.field("current", current);
             builder.field("rows_emitted", rowsEmitted);
             builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
-            return builder.endObject();
         }
 
         @Override
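
These changes are plumbing for subclassing: the slice/query/shard counters and rowsEmitted drop to package-private so a subclass in the same package can update them, the Status constructor becomes protected, and toXContent is split so the field-writing part lives in an overridable toXContentFields hook. TimeSeriesSourceOperator.Status in the next file uses exactly these hooks; as a minimal sketch of the pattern, here is a hypothetical ExampleStatus with a made-up extra_metric field (names are illustrative, not from this commit):

    // Hypothetical subclass showing the extension points added above.
    public static class ExampleStatus extends LuceneOperator.Status {
        public static final NamedWriteableRegistry.Entry ENTRY =
            new NamedWriteableRegistry.Entry(Operator.Status.class, "example_source", ExampleStatus::new);

        private final long extraMetric; // hypothetical per-operator metric

        ExampleStatus(LuceneOperator operator, long extraMetric) {
            super(operator); // legal now that the base constructor is protected
            this.extraMetric = extraMetric;
        }

        ExampleStatus(StreamInput in) throws IOException {
            super(in);
            this.extraMetric = in.readVLong();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeVLong(extraMetric);
        }

        @Override
        public String getWriteableName() {
            return ENTRY.name;
        }

        @Override
        protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
            super.toXContentFields(builder, params); // base fields: processed_slices, processed_shards, rows_emitted, ...
            builder.field("extra_metric", extraMetric);
        }
    }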

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 100 additions & 8 deletions
@@ -17,6 +17,11 @@
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefVector;
@@ -25,21 +30,23 @@
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.OrdinalBytesRefVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
 import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
 import org.elasticsearch.index.mapper.SourceLoader;
 import org.elasticsearch.search.fetch.StoredFieldsSpec;
+import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
-public final class TimeSeriesSourceOperator extends SourceOperator {
+public final class TimeSeriesSourceOperator extends LuceneOperator {
 
     private final boolean emitDocIds;
     private final int maxPageSize;
@@ -55,6 +62,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
     private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtracts;
     private ShardLevelFieldsReader fieldsReader;
    private DocIdCollector docCollector;
+    private long tsidsLoaded;
 
     TimeSeriesSourceOperator(
         BlockFactory blockFactory,
@@ -64,6 +72,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
         int maxPageSize,
         int limit
     ) {
+        super(blockFactory, maxPageSize, sliceQueue);
         this.maxPageSize = maxPageSize;
         this.blockFactory = blockFactory;
         this.fieldsToExtracts = fieldsToExtract;
@@ -85,7 +94,7 @@ public boolean isFinished() {
     }
 
     @Override
-    public Page getOutput() {
+    public Page getCheckedOutput() throws IOException {
         if (isFinished()) {
             return null;
         }
@@ -97,6 +106,7 @@ public Page getOutput() {
 
         Page page = null;
         Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()];
+        long startInNanos = System.nanoTime();
         try {
             if (iterator == null) {
                 var slice = sliceQueue.nextSlice();
@@ -130,6 +140,8 @@ public Page getOutput() {
                 currentPagePos = 0;
             }
             if (iterator.completed()) {
+                processedShards.add(iterator.luceneSlice.shardContext().shardIdentifier());
+                processedSlices++;
                 Releasables.close(docCollector, fieldsReader);
                 iterator = null;
             }
@@ -139,6 +151,7 @@ public Page getOutput() {
             if (page == null) {
                 Releasables.closeExpectNoException(blocks);
             }
+            processingNanos += System.nanoTime() - startInNanos;
         }
         return page;
     }
@@ -162,6 +175,7 @@ protected boolean lessThan(LeafIterator a, LeafIterator b) {
             }
         };
         Weight weight = luceneSlice.weight();
+        processedQueries.add(weight.getQuery());
         int maxSegmentOrd = 0;
         for (var leafReaderContext : luceneSlice.leaves()) {
             LeafIterator leafIterator = new LeafIterator(weight, leafReaderContext.leafReaderContext());
@@ -237,6 +251,9 @@ private PriorityQueue<LeafIterator> subQueueForNextTsid() {
                     break;
                 }
             }
+            if (oneTsidQueue.size() > 0) {
+                ++tsidsLoaded;
+            }
         }
         return oneTsidQueue;
     }
@@ -306,11 +323,6 @@ void reinitializeIfNeeded(Thread executingThread) throws IOException {
         }
     }
 
-    @Override
-    public String toString() {
-        return this.getClass().getSimpleName() + "[" + "maxPageSize=" + maxPageSize + ", remainingDocs=" + remainingDocs + "]";
-    }
-
     static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
         BlockLoaderFactory(BlockFactory factory) {
             super(factory);
@@ -571,4 +583,84 @@ public void close() {
             Releasables.close(docsBuilder, segmentsBuilder);
         }
     }
+
+    @Override
+    protected void describe(StringBuilder sb) {
+        sb.append("[" + "maxPageSize=").append(maxPageSize).append(", remainingDocs=").append(remainingDocs).append("]");
+    }
+
+    @Override
+    public Operator.Status status() {
+        final long valuesLoaded = rowsEmitted * (1 + fieldsToExtracts.size()); // @timestamp and other fields
+        return new Status(this, tsidsLoaded, valuesLoaded);
+    }
+
+    public static class Status extends LuceneOperator.Status {
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "time_series_source",
+            Status::new
+        );
+
+        private final long tsidLoaded;
+        private final long valuesLoaded;
+
+        Status(TimeSeriesSourceOperator operator, long tsidLoaded, long valuesLoaded) {
+            super(operator);
+            this.tsidLoaded = tsidLoaded;
+            this.valuesLoaded = valuesLoaded;
+        }
+
+        Status(StreamInput in) throws IOException {
+            super(in);
+            this.tsidLoaded = in.readVLong();
+            this.valuesLoaded = in.readVLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVLong(tsidLoaded);
+            out.writeVLong(valuesLoaded);
+        }
+
+        @Override
+        protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
+            super.toXContentFields(builder, params);
+            builder.field("tsid_loaded", tsidLoaded);
+            builder.field("values_loaded", valuesLoaded);
+        }
+
+        public long tsidLoaded() {
+            return tsidLoaded;
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        @Override
+        public boolean supportsVersion(TransportVersion version) {
+            return version.onOrAfter(TransportVersions.ESQL_TIME_SERIES_SOURCE_STATUS);
+        }
+
+        @Override
+        public long valuesLoaded() {
+            return valuesLoaded;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) return false;
+            if (super.equals(o) == false) return false;
+            Status status = (Status) o;
+            return tsidLoaded == status.tsidLoaded && valuesLoaded == status.valuesLoaded;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), tsidLoaded, valuesLoaded);
+        }
+    }
 }
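
For the new status to travel between nodes it also has to be registered as a named writeable; that registration is presumably in one of the two changed files not shown on this page. A hedged sketch of what such a registration typically looks like (the method name and its surrounding class are assumptions, not taken from this commit):

    // Assumed registration point: expose the new time_series_source status entry
    // alongside the existing operator status writeables.
    public static List<NamedWriteableRegistry.Entry> namedWriteables() {
        return List.of(
            // ... existing operator status entries ...
            TimeSeriesSourceOperator.Status.ENTRY // new in this change
        );
    }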
