Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.HashLookupOperator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// public class DedupOperator extends HashLookupOperator {
// public DedupOperator(BlockFactory blockFactory, HashLookupOperator.Key[] keys, int[] blockMapping) {
// super(blockFactory, keys, blockMapping);
// }
// }

/**
 * Operator that removes duplicate rows from the pages flowing through it.
 * <p>
 * A row's identity is the tuple of values found in the configured key channels
 * ({@link #fields}). The first row carrying a given key is passed through; every
 * later row with an equal key, in the same page or in any subsequent page, is dropped.
 * <p>
 * NOTE(review): the set of seen keys lives on the heap and is not tracked by a
 * circuit breaker, so memory grows with the number of distinct keys.
 */
public class DedupOperator implements Operator {
    /** Channel (block) indices whose values together form the de-duplication key. */
    private final int[] fields;

    /** Page handed over by {@link #addInput(Page)} and not yet consumed by {@link #getOutput()}. */
    private Page lastInput;

    private boolean finished;

    /**
     * Keys of every row emitted so far. Each key holds one entry per configured
     * field, in the order of {@link #fields}.
     */
    private final Set<List<Object>> seenKeys = new HashSet<>();

    /**
     * Count of pages that have been received by this operator.
     */
    private int pagesIn;

    /**
     * Count of pages that have been emitted by this operator. Tracked locally only;
     * it is not part of {@link Status}.
     */
    private int pagesOut;

    /**
     * @param fields channel indices that make up the de-duplication key
     */
    public DedupOperator(List<Integer> fields) {
        this.fields = fields.stream().mapToInt(Integer::intValue).toArray();
    }

    /**
     * Factory producing {@link DedupOperator} instances for a fixed list of key channels.
     *
     * @param fields channel indices that make up the de-duplication key
     */
    public record DedupOperatorFactory(List<Integer> fields) implements OperatorFactory {

        @Override
        public DedupOperator get(DriverContext driverContext) {
            return new DedupOperator(fields);
        }

        /**
         * Describes the operator; the field list is abbreviated to its size once it
         * reaches ten entries.
         */
        @Override
        public String describe() {
            if (fields.size() < 10) {
                return "DedupOperator[fields = " + fields + "]";
            }
            return "DedupOperator[fields = [size: " + fields.size() + "]]";
        }
    }

    @Override
    public boolean needsInput() {
        return finished == false && lastInput == null;
    }

    @Override
    public void addInput(Page page) {
        assert lastInput == null : "has pending input page";
        lastInput = page;
        pagesIn++;
    }

    @Override
    public void finish() {
        finished = true;
    }

    @Override
    public boolean isFinished() {
        return finished && lastInput == null;
    }

    /**
     * Consumes the pending input page and returns its rows minus the duplicates.
     *
     * @return {@code null} when there is no pending page or when every row of the
     *         pending page was a duplicate; the input page itself when no row was
     *         dropped; otherwise a new page holding only the surviving rows
     */
    @Override
    public Page getOutput() {
        if (lastInput == null) {
            return null;
        }

        // Take ownership of the page up front so it is never processed or released twice.
        Page page = lastInput;
        lastInput = null;

        boolean releasePage = true;
        try {
            int[] positions = firstOccurrences(page);
            if (positions.length == 0) {
                // Every row was a duplicate: nothing to emit, the page is released below.
                return null;
            }
            if (positions.length == page.getPositionCount()) {
                // Nothing was dropped: hand the page through untouched, ownership moves downstream.
                releasePage = false;
                pagesOut++;
                return page;
            }

            Block[] filteredBlocks = new Block[page.getBlockCount()];
            boolean success = false;
            try {
                for (int b = 0; b < filteredBlocks.length; b++) {
                    filteredBlocks[b] = page.getBlock(b).filter(positions);
                }
                success = true;
            } finally {
                if (success == false) {
                    // Release whatever was built before the failure; the input page is released below.
                    Releasables.closeExpectNoException(filteredBlocks);
                }
            }
            pagesOut++;
            return new Page(filteredBlocks);
        } finally {
            if (releasePage) {
                page.releaseBlocks();
            }
        }
    }

    /**
     * Returns the positions of {@code page}, in ascending order, whose key has not
     * been seen before, recording those keys in {@link #seenKeys} as a side effect.
     */
    private int[] firstOccurrences(Page page) {
        int positionCount = page.getPositionCount();
        int[] positions = new int[positionCount];
        int rowCount = 0;
        for (int p = 0; p < positionCount; p++) {
            if (seenKeys.add(keyAt(page, p))) {
                positions[rowCount++] = p;
            }
        }
        return Arrays.copyOf(positions, rowCount);
    }

    /**
     * Builds the de-duplication key of the row at {@code position}: one value per
     * configured key channel, as produced by {@link BlockUtils#toJavaObject(Block, int)}.
     * <p>
     * NOTE(review): the key objects outlive the page, so this assumes
     * {@code toJavaObject} returns values that do not alias the block's backing
     * storage (relevant for byte-array backed values) - confirm.
     */
    private List<Object> keyAt(Page page, int position) {
        Object[] key = new Object[fields.length];
        for (int f = 0; f < fields.length; f++) {
            key[f] = BlockUtils.toJavaObject(page.getBlock(fields[f]), position);
        }
        // Arrays.asList gives element-wise equals/hashCode and, unlike List.of, tolerates null entries.
        return Arrays.asList(key);
    }

    @Override
    public Status status() {
        return new Status(pagesIn);
    }

    @Override
    public void close() {
        if (lastInput != null) {
            lastInput.releaseBlocks();
            // Drop the reference so a repeated close() cannot release the page twice.
            lastInput = null;
        }
        seenKeys.clear();
    }

    @Override
    public String toString() {
        if (fields.length < 10) {
            return "DedupOperator[fields = " + Arrays.toString(fields) + "]";
        }
        return "DedupOperator[fields = [size: " + fields.length + "]]";
    }

    /**
     * Serializable progress report of a {@link DedupOperator}, registered under the
     * name {@code "dedup"}.
     */
    public static class Status implements Operator.Status {
        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
            Operator.Status.class,
            "dedup",
            Status::new
        );

        /**
         * Count of pages that have been processed by this operator.
         */
        private final int pagesProcessed;

        protected Status(int pagesProcessed) {
            this.pagesProcessed = pagesProcessed;
        }

        /** Reads a status previously written by {@link #writeTo(StreamOutput)}. */
        protected Status(StreamInput in) throws IOException {
            pagesProcessed = in.readVInt();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeVInt(pagesProcessed);
        }

        @Override
        public String getWriteableName() {
            return ENTRY.name;
        }

        /**
         * Count of pages that have been processed by this operator.
         */
        public int pagesProcessed() {
            return pagesProcessed;
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startObject();
            builder.field("pages_processed", pagesProcessed);
            return builder.endObject();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Status status = (Status) o;
            return pagesProcessed == status.pagesProcessed;
        }

        @Override
        public int hashCode() {
            return Objects.hash(pagesProcessed);
        }

        @Override
        public String toString() {
            return Strings.toString(this);
        }

        // NOTE(review): this looks copied from an existing operator; a newly introduced
        // status probably needs the transport version it is first shipped in - confirm.
        @Override
        public TransportVersion getMinimalSupportedVersion() {
            return TransportVersions.V_8_11_X;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ public record TopNOperatorFactory(
List<ElementType> elementTypes,
List<TopNEncoder> encoders,
List<SortOrder> sortOrders,
int maxPageSize
int maxPageSize,
boolean dedup
) implements OperatorFactory {
public TopNOperatorFactory {
for (ElementType e : elementTypes) {
Expand All @@ -244,7 +245,8 @@ public TopNOperator get(DriverContext driverContext) {
elementTypes,
encoders,
sortOrders,
maxPageSize
maxPageSize,
dedup
);
}

Expand All @@ -267,6 +269,7 @@ public String describe() {
private final Queue inputQueue;

private final int maxPageSize;
private final boolean dedup;

private final List<ElementType> elementTypes;
private final List<TopNEncoder> encoders;
Expand All @@ -285,7 +288,8 @@ public TopNOperator(
List<ElementType> elementTypes,
List<TopNEncoder> encoders,
List<SortOrder> sortOrders,
int maxPageSize
int maxPageSize,
boolean dedup
) {
this.blockFactory = blockFactory;
this.breaker = breaker;
Expand All @@ -294,6 +298,19 @@ public TopNOperator(
this.encoders = encoders;
this.sortOrders = sortOrders;
this.inputQueue = new Queue(topCount);
this.dedup = dedup;
}

/**
 * Convenience constructor that keeps the seven-argument signature available.
 * It delegates to the eight-argument constructor, passing {@code false} for
 * {@code dedup}.
 */
public TopNOperator(
BlockFactory blockFactory,
CircuitBreaker breaker,
int topCount,
List<ElementType> elementTypes,
List<TopNEncoder> encoders,
List<SortOrder> sortOrders,
int maxPageSize
) {
// dedup = false
this(blockFactory, breaker, topCount, elementTypes, encoders, sortOrders, maxPageSize, false);
}

static int compareRows(Row r1, Row r2) {
Expand Down Expand Up @@ -394,7 +411,15 @@ private Iterator<Page> toPages() {
boolean success = false;
try {
while (inputQueue.size() > 0) {
list.add(inputQueue.pop());
Row row = inputQueue.pop();

if (this.dedup && list.size() > 0 && compareRows(list.get(list.size() - 1), row) == 0) {
// row = list.set(list.size()-1, row);
row.close();
continue;
}

list.add(row);
}
Collections.reverse(list);

Expand Down
29 changes: 29 additions & 0 deletions x-pack/plugin/esql/src/main/antlr/EsqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ MV_EXPAND : 'mv_expand' -> pushMode(MVEXPAND_MODE);
RENAME : 'rename' -> pushMode(RENAME_MODE);
ROW : 'row' -> pushMode(EXPRESSION_MODE);
SHOW : 'show' -> pushMode(SHOW_MODE);
DEDUP : 'dedup' -> pushMode(EXPRESSION_MODE);
SORT : 'sort' -> pushMode(EXPRESSION_MODE);
STATS : 'stats' -> pushMode(EXPRESSION_MODE);
WHERE : 'where' -> pushMode(EXPRESSION_MODE);
Expand Down Expand Up @@ -367,6 +368,34 @@ MVEXPAND_WS
: WS -> channel(HIDDEN)
;

// Lexer mode for the field list of a dedup command. Every rule below either
// re-types its match to a token shared with other modes or hides it.
//
// NOTE(review): the DEDUP token shown in this grammar is declared as
// `'dedup' -> pushMode(EXPRESSION_MODE)`, and no visible rule pushes DEDUP_MODE,
// so this mode looks unreachable as written - confirm which mode DEDUP should enter.
// NOTE(review): no rule here matches a comma, so a multi-field list could not be
// tokenized in this mode - confirm whether that is intended.
mode DEDUP_MODE;
// A pipe ends the command: emit it as PIPE and return to the previous mode.
DEDUP_PIPE : PIPE -> type(PIPE), popMode;
// Dots are emitted as the shared DOT token.
DEDUP_DOT: DOT -> type(DOT);

// Field name patterns are emitted as the shared ID_PATTERN token.
DEDUP_FIELD_ID_PATTERN
: ID_PATTERN -> type(ID_PATTERN)
;

// Quoted identifiers are emitted as the shared QUOTED_IDENTIFIER token.
DEDUP_QUOTED_IDENTIFIER
: QUOTED_IDENTIFIER -> type(QUOTED_IDENTIFIER)
;

// Unquoted identifiers are emitted as the shared UNQUOTED_IDENTIFIER token.
DEDUP_UNQUOTED_IDENTIFIER
: UNQUOTED_IDENTIFIER -> type(UNQUOTED_IDENTIFIER)
;

// Comments and whitespace are routed to the hidden channel.
DEDUP_LINE_COMMENT
: LINE_COMMENT -> channel(HIDDEN)
;

DEDUP_MULTILINE_COMMENT
: MULTILINE_COMMENT -> channel(HIDDEN)
;

DEDUP_WS
: WS -> channel(HIDDEN)
;

//
// SHOW commands
//
Expand Down
Loading