Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ static TransportVersion def(int id) {
public static final TransportVersion PROJECT_STATE_REGISTRY_RECORDS_DELETIONS = def(9_113_0_00);
public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_114_0_00);
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_COMPLETION_ADDED = def(9_115_0_00);
public static final TransportVersion ESQL_ORDINALS_OPERATOR_STATUS = def(9_116_0_00);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public String describe() {
/**
* Total nanos for emitting the output
*/
protected long emitNanos;
private long emitNanos;

@SuppressWarnings("this-escape")
public HashAggregationOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.compute.Describable;
Expand All @@ -36,7 +42,9 @@
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -100,6 +108,23 @@ public String describe() {

private boolean finished = false;

/**
* Nanoseconds this operator has spent processing the rows.
*/
private long processNanos;
/**
* Count of pages this operator has processed.
*/
private int pagesProcessed;
/**
* Count of rows this operator has received.
*/
private long rowsReceived;
/**
* Count of rows this operator has emitted.
*/
private long rowsEmitted;

// used to extract and aggregate values
private final int maxPageSize;
private ValuesAggregator valuesAggregator;
Expand Down Expand Up @@ -135,6 +160,7 @@ public boolean needsInput() {
public void addInput(Page page) {
checkState(needsInput(), "Operator is already finishing");
requireNonNull(page, "page is null");
long start = System.nanoTime();
DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
final int shardIndex = docVector.shards().getInt(0);
RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex);
Expand Down Expand Up @@ -184,6 +210,9 @@ public void addInput(Page page) {
if (pagePassed == false) {
Releasables.closeExpectNoException(page::releaseBlocks);
}
pagesProcessed++;
rowsReceived += page.getPositionCount();
processNanos += System.nanoTime() - start;
}
}

Expand All @@ -208,25 +237,28 @@ public Page getOutput() {
if (finished == false) {
return null;
}
Page page = null;
if (valuesAggregator != null) {
try {
return valuesAggregator.getOutput();
page = valuesAggregator.getOutput();
} finally {
final ValuesAggregator aggregator = this.valuesAggregator;
this.valuesAggregator = null;
Releasables.close(aggregator);
}
}
if (ordinalAggregators.isEmpty() == false) {
} else if (ordinalAggregators.isEmpty() == false) {
try {
return mergeOrdinalsSegmentResults();
page = mergeOrdinalsSegmentResults();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
Releasables.close(() -> Releasables.close(ordinalAggregators.values()), ordinalAggregators::clear);
}
}
return null;
if (page != null) {
rowsEmitted += page.getPositionCount();
}
return page;
}

@Override
Expand Down Expand Up @@ -322,6 +354,11 @@ public void close() {
Releasables.close(() -> Releasables.close(ordinalAggregators.values()), valuesAggregator);
}

/**
 * Snapshot of this operator's accounting counters (process time, pages
 * processed, rows received/emitted) for profiling and the tasks API.
 */
@Override
public Operator.Status status() {
    return new Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
}

private static void checkState(boolean condition, String msg) {
if (condition == false) {
throw new IllegalArgumentException(msg);
Expand All @@ -337,6 +374,133 @@ public String toString() {
return this.getClass().getSimpleName() + "[" + "aggregators=[" + aggregatorDescriptions + "]]";
}

/**
 * Wire-serializable snapshot of an {@code OrdinalsGroupingOperator}'s counters.
 * Immutable; safe to share across threads.
 */
public static class Status implements Operator.Status {
    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
        Operator.Status.class,
        "ordinals_grouping",
        Status::new
    );

    /** Nanoseconds this operator has spent processing the rows. */
    private final long processNanos;
    /** Count of pages this operator has processed. */
    private final int pagesProcessed;
    /** Count of rows this operator has received. */
    private final long rowsReceived;
    /** Count of rows this operator has emitted. */
    private final long rowsEmitted;

    /**
     * Build a snapshot.
     *
     * @param processNanos nanoseconds this operator has spent processing the rows
     * @param pagesProcessed count of pages this operator has processed
     * @param rowsReceived count of rows this operator has received
     * @param rowsEmitted count of rows this operator has emitted
     */
    public Status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
        this.processNanos = processNanos;
        this.pagesProcessed = pagesProcessed;
        this.rowsReceived = rowsReceived;
        this.rowsEmitted = rowsEmitted;
    }

    /** Read a snapshot off the wire; field order must mirror {@link #writeTo}. */
    protected Status(StreamInput in) throws IOException {
        // Constructor arguments evaluate left-to-right, preserving read order.
        this(in.readVLong(), in.readVInt(), in.readVLong(), in.readVLong());
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(processNanos);
        out.writeVInt(pagesProcessed);
        out.writeVLong(rowsReceived);
        out.writeVLong(rowsEmitted);
    }

    @Override
    public String getWriteableName() {
        return ENTRY.name;
    }

    /** Nanoseconds this operator has spent processing the rows. */
    public long processNanos() {
        return processNanos;
    }

    /** Count of pages this operator has processed. */
    public int pagesProcessed() {
        return pagesProcessed;
    }

    /** Count of rows this operator has received. */
    public long rowsReceived() {
        return rowsReceived;
    }

    /** Count of rows this operator has emitted. */
    public long rowsEmitted() {
        return rowsEmitted;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field("process_nanos", processNanos);
        if (builder.humanReadable()) {
            // Human-readable form adds a pretty duration next to the raw nanos.
            builder.field("process_time", TimeValue.timeValueNanos(processNanos));
        }
        builder.field("pages_processed", pagesProcessed);
        builder.field("rows_received", rowsReceived);
        builder.field("rows_emitted", rowsEmitted);
        return builder.endObject();
    }

    @Override
    public boolean equals(Object o) {
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        if (this == o) {
            return true;
        }
        Status other = (Status) o;
        return processNanos == other.processNanos
            && pagesProcessed == other.pagesProcessed
            && rowsReceived == other.rowsReceived
            && rowsEmitted == other.rowsEmitted;
    }

    @Override
    public int hashCode() {
        return Objects.hash(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }

    @Override
    public TransportVersion getMinimalSupportedVersion() {
        return TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS;
    }
}

/**
 * Key pairing a shard with one of its Lucene segments.
 * NOTE(review): presumably used to key the per-segment {@code ordinalAggregators}
 * map — confirm against the map's put sites, which are outside this view.
 */
record SegmentID(int shardIndex, int segmentIndex) {

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class OrdinalsGroupingOperatorStatusTests extends AbstractWireSerializingTestCase<OrdinalsGroupingOperator.Status> {
    /** A fixed status whose rendering is pinned by {@link #testToXContent}. */
    public static OrdinalsGroupingOperator.Status simple() {
        return new OrdinalsGroupingOperator.Status(200012, 123, 111, 222);
    }

    /** The expected pretty-printed, human-readable JSON for {@link #simple}. */
    public static String simpleToJson() {
        return """
            {
              "process_nanos" : 200012,
              "process_time" : "200micros",
              "pages_processed" : 123,
              "rows_received" : 111,
              "rows_emitted" : 222
            }""";
    }

    public void testToXContent() {
        assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
    }

    @Override
    public OrdinalsGroupingOperator.Status createTestInstance() {
        return new OrdinalsGroupingOperator.Status(
            randomNonNegativeLong(),
            randomNonNegativeInt(),
            randomNonNegativeLong(),
            randomNonNegativeLong()
        );
    }

    @Override
    protected Writeable.Reader<OrdinalsGroupingOperator.Status> instanceReader() {
        return OrdinalsGroupingOperator.Status::new;
    }

    @Override
    protected OrdinalsGroupingOperator.Status mutateInstance(OrdinalsGroupingOperator.Status in) {
        long processNanos = in.processNanos();
        int pagesProcessed = in.pagesProcessed();
        long rowsReceived = in.rowsReceived();
        long rowsEmitted = in.rowsEmitted();
        // Change exactly one randomly-chosen field so the mutant differs from the input.
        int field = between(0, 3);
        if (field == 0) {
            processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
        } else if (field == 1) {
            pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
        } else if (field == 2) {
            rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
        } else if (field == 3) {
            rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
        } else {
            throw new UnsupportedOperationException();
        }
        return new OrdinalsGroupingOperator.Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
    }
}