Skip to content

Commit 108034b

Browse files
dnhatnywangd
authored and committed
Add status for time-series source operator (elastic#127932)
This change provides more detailed status information for the TimeSeriesSourceOperator. Example status output:

```json
{
  "processed_slices" : 1,
  "processed_queries" : [ "*:*" ],
  "processed_shards" : [ "test" ],
  "process_nanos" : 13315249,
  "process_time" : "13.3ms",
  "slice_index" : 0,
  "total_slices" : 1,
  "pages_emitted" : 1,
  "slice_min" : 0,
  "slice_max" : 0,
  "current" : 0,
  "rows_emitted" : 6,
  "partitioning_strategies" : {
    "test" : "SHARD"
  },
  "tsid_loaded" : 1,
  "values_loaded" : 18
}
```
1 parent d28015a commit 108034b

File tree

6 files changed

+428
-25
lines changed

6 files changed

+428
-25
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ static TransportVersion def(int id) {
252252
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);
253253
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT = def(9_074_0_00);
254254
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_DROP_TYPE = def(9_075_0_00);
255+
public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00);
255256

256257
/*
257258
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ public abstract class LuceneOperator extends SourceOperator {
5757
/**
5858
* Count of the number of slices processed.
5959
*/
60-
private int processedSlices;
60+
int processedSlices;
6161
final int maxPageSize;
6262
private final LuceneSliceQueue sliceQueue;
6363

64-
private final Set<Query> processedQueries = new HashSet<>();
65-
private final Set<String> processedShards = new HashSet<>();
64+
final Set<Query> processedQueries = new HashSet<>();
65+
final Set<String> processedShards = new HashSet<>();
6666

6767
private LuceneSlice currentSlice;
6868
private int sliceIndex;
@@ -75,7 +75,7 @@ public abstract class LuceneOperator extends SourceOperator {
7575
/**
7676
* Count of rows this operator has emitted.
7777
*/
78-
private long rowsEmitted;
78+
long rowsEmitted;
7979

8080
protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
8181
this.blockFactory = blockFactory;
@@ -277,7 +277,7 @@ public static class Status implements Operator.Status {
277277
private final long rowsEmitted;
278278
private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;
279279

280-
private Status(LuceneOperator operator) {
280+
protected Status(LuceneOperator operator) {
281281
processedSlices = operator.processedSlices;
282282
processedQueries = operator.processedQueries.stream().map(Query::toString).collect(Collectors.toCollection(TreeSet::new));
283283
processNanos = operator.processingNanos;
@@ -447,6 +447,11 @@ public long documentsFound() {
447447
@Override
448448
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
449449
builder.startObject();
450+
toXContentFields(builder, params);
451+
return builder.endObject();
452+
}
453+
454+
protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
450455
builder.field("processed_slices", processedSlices);
451456
builder.field("processed_queries", processedQueries);
452457
builder.field("processed_shards", processedShards);
@@ -462,7 +467,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
462467
builder.field("current", current);
463468
builder.field("rows_emitted", rowsEmitted);
464469
builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
465-
return builder.endObject();
466470
}
467471

468472
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 136 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
import org.apache.lucene.search.Weight;
1818
import org.apache.lucene.util.BytesRef;
1919
import org.apache.lucene.util.PriorityQueue;
20+
import org.elasticsearch.TransportVersion;
21+
import org.elasticsearch.TransportVersions;
22+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.io.stream.StreamOutput;
2025
import org.elasticsearch.compute.data.Block;
2126
import org.elasticsearch.compute.data.BlockFactory;
2227
import org.elasticsearch.compute.data.BytesRefVector;
@@ -25,21 +30,25 @@
2530
import org.elasticsearch.compute.data.LongVector;
2631
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
2732
import org.elasticsearch.compute.data.Page;
28-
import org.elasticsearch.compute.operator.SourceOperator;
33+
import org.elasticsearch.compute.operator.Operator;
2934
import org.elasticsearch.core.Releasable;
3035
import org.elasticsearch.core.Releasables;
3136
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
3237
import org.elasticsearch.index.mapper.BlockLoader;
3338
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
3439
import org.elasticsearch.index.mapper.SourceLoader;
3540
import org.elasticsearch.search.fetch.StoredFieldsSpec;
41+
import org.elasticsearch.xcontent.XContentBuilder;
3642

3743
import java.io.IOException;
3844
import java.io.UncheckedIOException;
3945
import java.util.Arrays;
4046
import java.util.List;
47+
import java.util.Map;
48+
import java.util.Objects;
49+
import java.util.Set;
4150

42-
public final class TimeSeriesSourceOperator extends SourceOperator {
51+
public final class TimeSeriesSourceOperator extends LuceneOperator {
4352

4453
private final boolean emitDocIds;
4554
private final int maxPageSize;
@@ -55,6 +64,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
5564
private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtracts;
5665
private ShardLevelFieldsReader fieldsReader;
5766
private DocIdCollector docCollector;
67+
private long tsidsLoaded;
5868

5969
TimeSeriesSourceOperator(
6070
BlockFactory blockFactory,
@@ -64,6 +74,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
6474
int maxPageSize,
6575
int limit
6676
) {
77+
super(blockFactory, maxPageSize, sliceQueue);
6778
this.maxPageSize = maxPageSize;
6879
this.blockFactory = blockFactory;
6980
this.fieldsToExtracts = fieldsToExtract;
@@ -85,7 +96,7 @@ public boolean isFinished() {
8596
}
8697

8798
@Override
88-
public Page getOutput() {
99+
public Page getCheckedOutput() throws IOException {
89100
if (isFinished()) {
90101
return null;
91102
}
@@ -97,6 +108,7 @@ public Page getOutput() {
97108

98109
Page page = null;
99110
Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()];
111+
long startInNanos = System.nanoTime();
100112
try {
101113
if (iterator == null) {
102114
var slice = sliceQueue.nextSlice();
@@ -130,6 +142,8 @@ public Page getOutput() {
130142
currentPagePos = 0;
131143
}
132144
if (iterator.completed()) {
145+
processedShards.add(iterator.luceneSlice.shardContext().shardIdentifier());
146+
processedSlices++;
133147
Releasables.close(docCollector, fieldsReader);
134148
iterator = null;
135149
}
@@ -139,6 +153,7 @@ public Page getOutput() {
139153
if (page == null) {
140154
Releasables.closeExpectNoException(blocks);
141155
}
156+
processingNanos += System.nanoTime() - startInNanos;
142157
}
143158
return page;
144159
}
@@ -162,6 +177,7 @@ protected boolean lessThan(LeafIterator a, LeafIterator b) {
162177
}
163178
};
164179
Weight weight = luceneSlice.weight();
180+
processedQueries.add(weight.getQuery());
165181
int maxSegmentOrd = 0;
166182
for (var leafReaderContext : luceneSlice.leaves()) {
167183
LeafIterator leafIterator = new LeafIterator(weight, leafReaderContext.leafReaderContext());
@@ -237,6 +253,9 @@ private PriorityQueue<LeafIterator> subQueueForNextTsid() {
237253
break;
238254
}
239255
}
256+
if (oneTsidQueue.size() > 0) {
257+
++tsidsLoaded;
258+
}
240259
}
241260
return oneTsidQueue;
242261
}
@@ -306,11 +325,6 @@ void reinitializeIfNeeded(Thread executingThread) throws IOException {
306325
}
307326
}
308327

309-
@Override
310-
public String toString() {
311-
return this.getClass().getSimpleName() + "[" + "maxPageSize=" + maxPageSize + ", remainingDocs=" + remainingDocs + "]";
312-
}
313-
314328
static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
315329
BlockLoaderFactory(BlockFactory factory) {
316330
super(factory);
@@ -571,4 +585,118 @@ public void close() {
571585
Releasables.close(docsBuilder, segmentsBuilder);
572586
}
573587
}
588+
589+
@Override
590+
protected void describe(StringBuilder sb) {
591+
sb.append("[" + "maxPageSize=").append(maxPageSize).append(", remainingDocs=").append(remainingDocs).append("]");
592+
}
593+
594+
@Override
595+
public Operator.Status status() {
596+
final long valuesLoaded = rowsEmitted * (1 + fieldsToExtracts.size()); // @timestamp and other fields
597+
return new Status(this, tsidsLoaded, valuesLoaded);
598+
}
599+
600+
public static class Status extends LuceneOperator.Status {
601+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
602+
Operator.Status.class,
603+
"time_series_source",
604+
Status::new
605+
);
606+
607+
private final long tsidLoaded;
608+
private final long valuesLoaded;
609+
610+
Status(TimeSeriesSourceOperator operator, long tsidLoaded, long valuesLoaded) {
611+
super(operator);
612+
this.tsidLoaded = tsidLoaded;
613+
this.valuesLoaded = valuesLoaded;
614+
}
615+
616+
Status(
617+
int processedSlices,
618+
Set<String> processedQueries,
619+
Set<String> processedShards,
620+
long processNanos,
621+
int sliceIndex,
622+
int totalSlices,
623+
int pagesEmitted,
624+
int sliceMin,
625+
int sliceMax,
626+
int current,
627+
long rowsEmitted,
628+
Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies,
629+
long tsidLoaded,
630+
long valuesLoaded
631+
) {
632+
super(
633+
processedSlices,
634+
processedQueries,
635+
processedShards,
636+
processNanos,
637+
sliceIndex,
638+
totalSlices,
639+
pagesEmitted,
640+
sliceMin,
641+
sliceMax,
642+
current,
643+
rowsEmitted,
644+
partitioningStrategies
645+
);
646+
this.tsidLoaded = tsidLoaded;
647+
this.valuesLoaded = valuesLoaded;
648+
}
649+
650+
Status(StreamInput in) throws IOException {
651+
super(in);
652+
this.tsidLoaded = in.readVLong();
653+
this.valuesLoaded = in.readVLong();
654+
}
655+
656+
@Override
657+
public void writeTo(StreamOutput out) throws IOException {
658+
super.writeTo(out);
659+
out.writeVLong(tsidLoaded);
660+
out.writeVLong(valuesLoaded);
661+
}
662+
663+
@Override
664+
protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
665+
super.toXContentFields(builder, params);
666+
builder.field("tsid_loaded", tsidLoaded);
667+
builder.field("values_loaded", valuesLoaded);
668+
}
669+
670+
public long tsidLoaded() {
671+
return tsidLoaded;
672+
}
673+
674+
@Override
675+
public String getWriteableName() {
676+
return ENTRY.name;
677+
}
678+
679+
@Override
680+
public boolean supportsVersion(TransportVersion version) {
681+
return version.onOrAfter(TransportVersions.ESQL_TIME_SERIES_SOURCE_STATUS);
682+
}
683+
684+
@Override
685+
public long valuesLoaded() {
686+
return valuesLoaded;
687+
}
688+
689+
@Override
690+
public boolean equals(Object o) {
691+
if (o == null || getClass() != o.getClass()) return false;
692+
if (super.equals(o) == false) return false;
693+
Status status = (Status) o;
694+
return tsidLoaded == status.tsidLoaded && valuesLoaded == status.valuesLoaded;
695+
}
696+
697+
@Override
698+
public int hashCode() {
699+
return Objects.hash(super.hashCode(), tsidLoaded, valuesLoaded);
700+
}
701+
}
574702
}

0 commit comments

Comments (0)