Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9171000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
transform_check_for_dangling_tasks,9170000
esql_fuse_linear_operator_status,9171000
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@
package org.elasticsearch.compute.operator.fuse;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.DoubleVectorBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
Expand Down Expand Up @@ -60,6 +68,12 @@ public String describe() {
private final Deque<Page> outputPages;
private boolean finished;

private long emitNanos;
private int pagesReceived = 0;
private int pagesProcessed = 0;
private long rowsReceived = 0;
private long rowsEmitted = 0;

public LinearScoreEvalOperator(int discriminatorPosition, int scorePosition, LinearConfig config) {
this.scorePosition = scorePosition;
this.discriminatorPosition = discriminatorPosition;
Expand All @@ -79,6 +93,8 @@ public boolean needsInput() {
@Override
public void addInput(Page page) {
    // Buffer only; actual scoring is deferred until finish(), because the
    // normalizer needs a full pass over every input page before any row
    // can be rewritten (see createOutputPages()).
    inputPages.add(page);
    // Counters surfaced through status() for profiling.
    pagesReceived++;
    rowsReceived += page.getPositionCount();
}

@Override
Expand All @@ -90,35 +106,58 @@ public void finish() {
}

private void createOutputPages() {
    final var emitStart = System.nanoTime();
    // First pass: let the normalizer gather whatever per-discriminator state
    // it needs over the complete buffered input (exact work depends on the
    // Normalizer implementation) before any page is rewritten.
    normalizer.preprocess(inputPages, scorePosition, discriminatorPosition);
    try {
        while (inputPages.isEmpty() == false) {
            Page inputPage = inputPages.peek();
            processInputPage(inputPage);
            // Dequeue only after successful processing; if processInputPage
            // throws, the page stays queued and is released in the finally.
            inputPages.removeFirst();
            pagesProcessed += 1;
        }
    } finally {
        // Record the emit time even when processing fails part-way through.
        emitNanos = System.nanoTime() - emitStart;
        // Release any pages the loop did not drain (no-op on success).
        Releasables.close(inputPages);
    }
}

while (inputPages.isEmpty() == false) {
Page inputPage = inputPages.peek();
private void processInputPage(Page inputPage) {
BytesRefBlock discriminatorBlock = inputPage.getBlock(discriminatorPosition);
DoubleVectorBlock initialScoreBlock = inputPage.getBlock(scorePosition);

BytesRefBlock discriminatorBlock = inputPage.getBlock(discriminatorPosition);
DoubleVectorBlock initialScoreBlock = inputPage.getBlock(scorePosition);
Page newPage = null;
Block scoreBlock = null;
DoubleVector.Builder scores = null;

DoubleVector.Builder scores = discriminatorBlock.blockFactory().newDoubleVectorBuilder(discriminatorBlock.getPositionCount());
try {
scores = discriminatorBlock.blockFactory().newDoubleVectorBuilder(discriminatorBlock.getPositionCount());

for (int i = 0; i < inputPage.getPositionCount(); i++) {
String discriminator = discriminatorBlock.getBytesRef(i, new BytesRef()).utf8ToString();

var weight = config.weights().get(discriminator) == null ? 1.0 : config.weights().get(discriminator);

Double score = initialScoreBlock.getDouble(i);
double score = initialScoreBlock.getDouble(i);
scores.appendDouble(weight * normalizer.normalize(score, discriminator));
}
Block scoreBlock = scores.build().asBlock();
inputPage = inputPage.appendBlock(scoreBlock);

int[] projections = new int[inputPage.getBlockCount() - 1];
scoreBlock = scores.build().asBlock();
newPage = inputPage.appendBlock(scoreBlock);

int[] projections = new int[newPage.getBlockCount() - 1];

for (int i = 0; i < inputPage.getBlockCount() - 1; i++) {
projections[i] = i == scorePosition ? inputPage.getBlockCount() - 1 : i;
for (int i = 0; i < newPage.getBlockCount() - 1; i++) {
projections[i] = i == scorePosition ? newPage.getBlockCount() - 1 : i;
}

outputPages.add(newPage.projectBlocks(projections));
} finally {
if (newPage != null) {
newPage.releaseBlocks();
}
if (scoreBlock == null && scores != null) {
Releasables.close(scores);
}
inputPages.removeFirst();
outputPages.add(inputPage.projectBlocks(projections));
inputPage.releaseBlocks();
}
}

Expand All @@ -132,7 +171,11 @@ public Page getOutput() {
if (finished == false || outputPages.isEmpty()) {
return null;
}
return outputPages.removeFirst();

Page page = outputPages.removeFirst();
rowsEmitted += page.getPositionCount();

return page;
}

@Override
Expand All @@ -156,6 +199,69 @@ public String toString() {
+ "]";
}

@Override
public Operator.Status status() {
    // Immutable snapshot of the counters accumulated so far.
    return new Status(emitNanos, pagesReceived, pagesProcessed, rowsReceived, rowsEmitted);
}

/**
 * Wire-serializable status snapshot of a {@code LinearScoreEvalOperator}:
 * time spent emitting plus page/row counters.
 *
 * <p>Field order in {@link #writeTo} MUST stay in sync with the
 * {@link StreamInput} constructor below — the wire format is positional.
 */
public record Status(long emitNanos, int pagesReceived, int pagesProcessed, long rowsReceived, long rowsEmitted)
    implements
    Operator.Status {

    // Transport version that introduced this status on the wire; nodes on
    // older versions cannot deserialize it (see supportsVersion).
    public static final TransportVersion ESQL_FUSE_LINEAR_OPERATOR_STATUS = TransportVersion.fromName(
        "esql_fuse_linear_operator_status"
    );

    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
        Operator.Status.class,
        "linearScoreEval",
        Status::new
    );

    // Reads fields in the exact order writeTo wrote them.
    Status(StreamInput streamInput) throws IOException {
        this(streamInput.readLong(), streamInput.readInt(), streamInput.readInt(), streamInput.readLong(), streamInput.readLong());
    }

    @Override
    public String getWriteableName() {
        return ENTRY.name;
    }

    @Override
    public boolean supportsVersion(TransportVersion version) {
        return version.supports(ESQL_FUSE_LINEAR_OPERATOR_STATUS);
    }

    // Unused because supportsVersion is overridden; kept to satisfy the interface.
    @Override
    public TransportVersion getMinimalSupportedVersion() {
        assert false : "must not be called when overriding supportsVersion";
        throw new UnsupportedOperationException("must not be called when overriding supportsVersion");
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeLong(emitNanos);
        out.writeInt(pagesReceived);
        out.writeInt(pagesProcessed);
        out.writeLong(rowsReceived);
        out.writeLong(rowsEmitted);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field("emit_nanos", emitNanos);
        // Human-readable duration only when the caller asked for it.
        if (builder.humanReadable()) {
            builder.field("emit_time", TimeValue.timeValueNanos(emitNanos));
        }
        builder.field("pages_received", pagesReceived);
        builder.field("pages_processed", pagesProcessed);
        builder.field("rows_received", rowsReceived);
        builder.field("rows_emitted", rowsEmitted);
        return builder.endObject();
    }
}

private Normalizer createNormalizer(LinearConfig.Normalizer normalizer) {
return switch (normalizer) {
case NONE -> new NoneNormalizer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.Releasables;

import java.util.HashMap;

Expand Down Expand Up @@ -73,18 +74,30 @@ protected Page process(Page page) {
scores.appendDouble(1.0 / (config.rankConstant() + rank) * weight);
}

Block scoreBlock = scores.build().asBlock();
page = page.appendBlock(scoreBlock);
Page newPage = null;
Block scoreBlock = null;

int[] projections = new int[page.getBlockCount() - 1];

for (int i = 0; i < page.getBlockCount() - 1; i++) {
projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i;
}
try {
return page.projectBlocks(projections);
scoreBlock = scores.build().asBlock();
newPage = page.appendBlock(scoreBlock);

int[] projections = new int[newPage.getBlockCount() - 1];

for (int i = 0; i < newPage.getBlockCount() - 1; i++) {
projections[i] = i == scorePosition ? newPage.getBlockCount() - 1 : i;
}
return newPage.projectBlocks(projections);
} finally {
page.releaseBlocks();
if (newPage != null) {
newPage.releaseBlocks();
} else {
// we never got to a point where the new page was constructed, so we need to release the initial one
page.releaseBlocks();
}
if (scoreBlock == null) {
// we never built scoreBlock, so we need to release the scores builder
Releasables.close(scores);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.fuse;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleVectorBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.test.AbstractBlockSourceOperator;
import org.elasticsearch.compute.test.OperatorTestCase;
import org.elasticsearch.core.Releasables;
import org.junit.Before;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Shared scaffolding for fuse-operator tests: generates random pages with a
 * BytesRef discriminator column and a double score column at random block
 * positions, and asserts per-row score expectations against the output.
 */
public abstract class FuseOperatorTestCase extends OperatorTestCase {
    // Total number of blocks in each generated page.
    protected int blocksCount;
    // Block index of the discriminator column (values like "fork3").
    protected int discriminatorPosition;
    // Block index of the double score column; always after the discriminator.
    protected int scorePosition;
    // Upper bound used when generating "fork<i>" discriminator values.
    protected int discriminatorCount;

    @Before
    public void initialize() {
        // Random layout with discriminatorPosition < scorePosition < blocksCount.
        discriminatorPosition = randomIntBetween(1, 20);
        scorePosition = randomIntBetween(discriminatorPosition + 1, 50);
        blocksCount = randomIntBetween(scorePosition + 1, 100);
        discriminatorCount = randomIntBetween(1, 20);
    }

    /**
     * Checks that output pages mirror the input (same page count, position
     * counts, and block counts) and delegates the per-row score check to
     * {@code assertScore(discriminator, actualScore, initialScore)}.
     */
    protected void assertOutput(List<Page> input, List<Page> results, TriConsumer<String, Double, Double> assertScore) {
        assertEquals(input.size(), results.size());

        for (int i = 0; i < results.size(); i++) {
            Page resultPage = results.get(i);
            Page initialPage = input.get(i);

            assertEquals(initialPage.getPositionCount(), resultPage.getPositionCount());
            assertEquals(resultPage.getBlockCount(), blocksCount);

            BytesRefBlock discriminatorBlock = resultPage.getBlock(discriminatorPosition);
            DoubleVectorBlock actualScoreBlock = resultPage.getBlock(scorePosition);
            DoubleVectorBlock initialScoreBlock = initialPage.getBlock(scorePosition);

            for (int j = 0; j < resultPage.getPositionCount(); j++) {
                String discriminator = discriminatorBlock.getBytesRef(j, new BytesRef()).utf8ToString();
                double actualScore = actualScoreBlock.getDouble(j);
                double initialScore = initialScoreBlock.getDouble(j);
                assertScore.apply(discriminator, actualScore, initialScore);
            }
        }
    }

    @Override
    protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
        return new AbstractBlockSourceOperator(blockFactory, 8 * 1024) {
            @Override
            protected int remaining() {
                return size - currentPosition;
            }

            @Override
            protected Page createPage(int positionOffset, int length) {
                length = Integer.min(length, remaining());
                Block[] blocks = new Block[blocksCount];

                try {
                    for (int b = 0; b < blocksCount; b++) {
                        if (b == scorePosition) {
                            // Score column: random doubles.
                            try (var builder = blockFactory.newDoubleBlockBuilder(length)) {
                                for (int i = 0; i < length; i++) {
                                    builder.appendDouble(randomDouble());
                                }
                                blocks[b] = builder.build();
                            }
                        } else {
                            // All other columns are BytesRef; the discriminator
                            // column gets "fork<i>" values, the rest random text.
                            // NOTE(review): randomIntBetween(0, discriminatorCount)
                            // is inclusive, so "fork<discriminatorCount>" can appear
                            // even though randomWeights() only covers indices below
                            // discriminatorCount — such rows fall back to the
                            // default weight. Confirm this is intended.
                            try (var builder = blockFactory.newBytesRefBlockBuilder(length)) {
                                for (int i = 0; i < length; i++) {
                                    String stringInput = b == discriminatorPosition
                                        ? "fork" + randomIntBetween(0, discriminatorCount)
                                        : randomAlphaOfLength(10);

                                    builder.appendBytesRef(new BytesRef(stringInput));
                                }
                                blocks[b] = builder.build();
                            }
                        }
                    }
                } catch (Exception e) {
                    // Avoid leaking partially-built blocks on failure.
                    Releasables.closeExpectNoException(blocks);
                    throw e;
                }

                currentPosition += length;
                return new Page(blocks);
            }
        };
    }

    // Random partial weight map: each "fork<i>" gets a weight only half the time.
    protected Map<String, Double> randomWeights() {
        Map<String, Double> weights = new HashMap<>();
        for (int i = 0; i < discriminatorCount; i++) {
            if (randomBoolean()) {
                weights.put("fork" + i, randomDouble());
            }
        }
        return weights;
    }
}
Loading