TransportVersions.java
@@ -252,6 +252,7 @@ static TransportVersion def(int id) {
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT = def(9_074_0_00);
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_DROP_TYPE = def(9_075_0_00);
public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00);

/*
* STOP! READ THIS FIRST! No, really,
LuceneOperator.java
@@ -57,12 +57,12 @@ public abstract class LuceneOperator extends SourceOperator {
/**
* Count of the number of slices processed.
*/
private int processedSlices;
int processedSlices;
final int maxPageSize;
private final LuceneSliceQueue sliceQueue;

private final Set<Query> processedQueries = new HashSet<>();
private final Set<String> processedShards = new HashSet<>();
final Set<Query> processedQueries = new HashSet<>();
final Set<String> processedShards = new HashSet<>();

private LuceneSlice currentSlice;
private int sliceIndex;
@@ -75,7 +75,7 @@ public abstract class LuceneOperator extends SourceOperator {
/**
* Count of rows this operator has emitted.
*/
private long rowsEmitted;
long rowsEmitted;

protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
this.blockFactory = blockFactory;
@@ -277,7 +277,7 @@ public static class Status implements Operator.Status {
private final long rowsEmitted;
private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;

private Status(LuceneOperator operator) {
protected Status(LuceneOperator operator) {
processedSlices = operator.processedSlices;
processedQueries = operator.processedQueries.stream().map(Query::toString).collect(Collectors.toCollection(TreeSet::new));
processNanos = operator.processingNanos;
@@ -447,6 +447,11 @@ public long documentsFound() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentFields(builder, params);
return builder.endObject();
}

protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
builder.field("processed_slices", processedSlices);
builder.field("processed_queries", processedQueries);
builder.field("processed_shards", processedShards);
@@ -462,7 +467,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("current", current);
builder.field("rows_emitted", rowsEmitted);
builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
return builder.endObject();
}

@Override
TimeSeriesSourceOperator.java
@@ -17,6 +17,11 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefVector;
@@ -25,21 +30,25 @@
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public final class TimeSeriesSourceOperator extends SourceOperator {
public final class TimeSeriesSourceOperator extends LuceneOperator {

private final boolean emitDocIds;
private final int maxPageSize;
@@ -55,6 +64,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtracts;
private ShardLevelFieldsReader fieldsReader;
private DocIdCollector docCollector;
private long tsidsLoaded;

TimeSeriesSourceOperator(
BlockFactory blockFactory,
@@ -64,6 +74,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
int maxPageSize,
int limit
) {
super(blockFactory, maxPageSize, sliceQueue);
this.maxPageSize = maxPageSize;
this.blockFactory = blockFactory;
this.fieldsToExtracts = fieldsToExtract;
@@ -85,7 +96,7 @@ public boolean isFinished() {
}

@Override
public Page getOutput() {
public Page getCheckedOutput() throws IOException {
if (isFinished()) {
return null;
}
@@ -97,6 +108,7 @@ public Page getOutput() {

Page page = null;
Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()];
long startInNanos = System.nanoTime();
try {
if (iterator == null) {
var slice = sliceQueue.nextSlice();
@@ -130,6 +142,8 @@ public Page getOutput() {
currentPagePos = 0;
}
if (iterator.completed()) {
processedShards.add(iterator.luceneSlice.shardContext().shardIdentifier());
processedSlices++;
Releasables.close(docCollector, fieldsReader);
iterator = null;
}
@@ -139,6 +153,7 @@ public Page getOutput() {
if (page == null) {
Releasables.closeExpectNoException(blocks);
}
processingNanos += System.nanoTime() - startInNanos;
}
return page;
}
@@ -162,6 +177,7 @@ protected boolean lessThan(LeafIterator a, LeafIterator b) {
}
};
Weight weight = luceneSlice.weight();
processedQueries.add(weight.getQuery());
int maxSegmentOrd = 0;
for (var leafReaderContext : luceneSlice.leaves()) {
LeafIterator leafIterator = new LeafIterator(weight, leafReaderContext.leafReaderContext());
@@ -237,6 +253,9 @@ private PriorityQueue<LeafIterator> subQueueForNextTsid() {
break;
}
}
if (oneTsidQueue.size() > 0) {
++tsidsLoaded;
}
}
return oneTsidQueue;
}
@@ -306,11 +325,6 @@ void reinitializeIfNeeded(Thread executingThread) throws IOException {
}
}

@Override
public String toString() {
return this.getClass().getSimpleName() + "[" + "maxPageSize=" + maxPageSize + ", remainingDocs=" + remainingDocs + "]";
}

static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
BlockLoaderFactory(BlockFactory factory) {
super(factory);
@@ -571,4 +585,118 @@ public void close() {
Releasables.close(docsBuilder, segmentsBuilder);
}
}

@Override
protected void describe(StringBuilder sb) {
sb.append("[" + "maxPageSize=").append(maxPageSize).append(", remainingDocs=").append(remainingDocs).append("]");
}

@Override
public Operator.Status status() {
final long valuesLoaded = rowsEmitted * (1 + fieldsToExtracts.size()); // @timestamp and other fields
return new Status(this, tsidsLoaded, valuesLoaded);
}

public static class Status extends LuceneOperator.Status {
Review comment (Member): This probably deserves its own little serialization test. I know it's kind of a pain, but it's helpful paranoia.
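A minimal round-trip sketch of the test being suggested, not part of the change itself: the test class name is hypothetical, the random values are illustrative, and it assumes the test sits in the same package as TimeSeriesSourceOperator, since the full Status constructor is package-private. A production version would more likely plug into the existing wire-serialization test harness so older transport versions are exercised as well; this hand-rolled round trip only covers the current one.

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;

// Hypothetical test class; assumes the same package as TimeSeriesSourceOperator.
public class TimeSeriesSourceOperatorStatusTests extends ESTestCase {

    public void testStatusSerializationRoundTrip() throws IOException {
        TimeSeriesSourceOperator.Status original = new TimeSeriesSourceOperator.Status(
            randomIntBetween(0, 1000),  // processedSlices
            Set.of("*:*"),              // processedQueries
            Set.of("shard-0"),          // processedShards
            randomNonNegativeLong(),    // processNanos
            randomIntBetween(0, 1000),  // sliceIndex
            randomIntBetween(0, 1000),  // totalSlices
            randomIntBetween(0, 1000),  // pagesEmitted
            randomIntBetween(0, 1000),  // sliceMin
            randomIntBetween(0, 1000),  // sliceMax
            randomIntBetween(0, 1000),  // current
            randomNonNegativeLong(),    // rowsEmitted
            Map.of(),                   // partitioningStrategies
            randomNonNegativeLong(),    // tsidLoaded
            randomNonNegativeLong()     // valuesLoaded
        );
        // Write with writeTo, read back through the StreamInput constructor, and let the
        // equals/hashCode overrides decide whether the round trip preserved every field.
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                assertEquals(original, new TimeSeriesSourceOperator.Status(in));
            }
        }
    }
}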
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Operator.Status.class,
"time_series_source",
Status::new
);

private final long tsidLoaded;
private final long valuesLoaded;

Status(TimeSeriesSourceOperator operator, long tsidLoaded, long valuesLoaded) {
super(operator);
this.tsidLoaded = tsidLoaded;
this.valuesLoaded = valuesLoaded;
}

Status(
int processedSlices,
Set<String> processedQueries,
Set<String> processedShards,
long processNanos,
int sliceIndex,
int totalSlices,
int pagesEmitted,
int sliceMin,
int sliceMax,
int current,
long rowsEmitted,
Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies,
long tsidLoaded,
long valuesLoaded
) {
super(
processedSlices,
processedQueries,
processedShards,
processNanos,
sliceIndex,
totalSlices,
pagesEmitted,
sliceMin,
sliceMax,
current,
rowsEmitted,
partitioningStrategies
);
this.tsidLoaded = tsidLoaded;
this.valuesLoaded = valuesLoaded;
}

Status(StreamInput in) throws IOException {
super(in);
this.tsidLoaded = in.readVLong();
this.valuesLoaded = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(tsidLoaded);
out.writeVLong(valuesLoaded);
}

@Override
protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
super.toXContentFields(builder, params);
builder.field("tsid_loaded", tsidLoaded);
builder.field("values_loaded", valuesLoaded);
}

public long tsidLoaded() {
return tsidLoaded;
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public boolean supportsVersion(TransportVersion version) {
return version.onOrAfter(TransportVersions.ESQL_TIME_SERIES_SOURCE_STATUS);
}

@Override
public long valuesLoaded() {
return valuesLoaded;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (super.equals(o) == false) return false;
Status status = (Status) o;
return tsidLoaded == status.tsidLoaded && valuesLoaded == status.valuesLoaded;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), tsidLoaded, valuesLoaded);
}
}
}