Skip to content

Commit 1855dc3

Browse files
committed
Collect more
1 parent 4673be7 commit 1855dc3

File tree

8 files changed

+100
-34
lines changed

8 files changed

+100
-34
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/CollectOperator.java

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.ElasticsearchException;
1111
import org.elasticsearch.ExceptionsHelper;
1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.bulk.BulkItemResponse;
1314
import org.elasticsearch.action.bulk.BulkRequest;
1415
import org.elasticsearch.action.bulk.BulkResponse;
1516
import org.elasticsearch.action.index.IndexRequest;
@@ -70,10 +71,16 @@ public String describe() {
7071
private volatile IsBlockedResult blocked = NOT_BLOCKED;
7172

7273
private int pagesReceived;
73-
private int pagesEmitted;
74+
private int pagesSaved;
7475
private long rowsReceived;
75-
private long rowsEmitted;
76+
private long rowsSaved;
77+
private long rowsCreated;
78+
private long rowsUpdated;
79+
private long rowsNoop;
7680
private long bulkBytesSent;
81+
private long bulkBytesSaved;
82+
private long bulkTookMillis;
83+
private long bulkIngestTookMillis;
7784

7885
public CollectOperator(
7986
Client client,
@@ -103,8 +110,9 @@ public void addInput(Page page) {
103110
rowsReceived += page.getPositionCount();
104111
try {
105112
BulkRequest request = request(page);
106-
bulkBytesSent += request.estimatedSizeInBytes();
107-
Listener listener = new Listener(page.getPositionCount());
113+
long bulkSize = request.estimatedSizeInBytes();
114+
bulkBytesSent += bulkSize;
115+
Listener listener = new Listener(page.getPositionCount(), bulkSize);
108116
blocked = new IsBlockedResult(listener.blockedFuture, "indexing");
109117
client.bulk(request, listener);
110118
} catch (IOException e) {
@@ -182,15 +190,33 @@ public Page getOutput() {
182190
if (phase != Phase.READY_TO_OUTPUT) {
183191
return null;
184192
}
185-
Block rowCount = null;
193+
Block rowsSaved = null;
194+
Block rowsCreated = null;
195+
Block rowsUpdated = null;
196+
Block rowsNoop = null;
197+
Block bytesSaved = null;
198+
Block bulkTookMillis = null;
199+
Block bulkIngestTookMillis = null;
186200
try {
187-
rowCount = driverContext.blockFactory().newConstantLongBlockWith(rowsEmitted, 1);
188-
Page result = new Page(rowCount);
189-
rowCount = null;
201+
rowsSaved = driverContext.blockFactory().newConstantLongBlockWith(this.rowsSaved, 1);
202+
rowsCreated = driverContext.blockFactory().newConstantLongBlockWith(this.rowsCreated, 1);
203+
rowsUpdated = driverContext.blockFactory().newConstantLongBlockWith(this.rowsUpdated, 1);
204+
rowsNoop = driverContext.blockFactory().newConstantLongBlockWith(this.rowsNoop, 1);
205+
bytesSaved = driverContext.blockFactory().newConstantLongBlockWith(this.bulkBytesSaved, 1);
206+
bulkTookMillis = driverContext.blockFactory().newConstantLongBlockWith(this.bulkTookMillis, 1);
207+
bulkIngestTookMillis = driverContext.blockFactory().newConstantLongBlockWith(this.bulkIngestTookMillis, 1);
208+
Page result = new Page(rowsSaved, rowsCreated, rowsUpdated, rowsNoop, bytesSaved, bulkTookMillis, bulkIngestTookMillis);
209+
rowsSaved = null;
210+
rowsCreated = null;
211+
rowsUpdated = null;
212+
rowsNoop = null;
213+
bytesSaved = null;
214+
bulkTookMillis = null;
215+
bulkIngestTookMillis = null;
190216
phase = Phase.FINISHED;
191217
return result;
192218
} finally {
193-
Releasables.close(rowCount);
219+
Releasables.close(rowsSaved, rowsCreated, rowsUpdated, rowsNoop, bytesSaved, bulkTookMillis, bulkIngestTookMillis);
194220
}
195221
}
196222

@@ -214,18 +240,36 @@ private enum Phase {
214240
private class Listener implements ActionListener<BulkResponse> {
215241
private final SubscribableListener<Void> blockedFuture = new SubscribableListener<>();
216242
private final int positionCount;
243+
private final long bulkSize;
217244

218-
Listener(int positionCount) {
245+
Listener(int positionCount, long bulkSize) {
219246
driverContext.addAsyncAction();
220247
this.positionCount = positionCount;
248+
this.bulkSize = bulkSize;
221249
}
222250

223251
@Override
224252
public void onResponse(BulkResponse bulkItemResponses) {
225-
pagesEmitted++;
226-
rowsEmitted += positionCount;
227253
if (bulkItemResponses.hasFailures()) {
228-
failureCollector.unwrapAndCollect(new ElasticsearchException(bulkItemResponses.buildFailureMessage()));
254+
onFailure(new ElasticsearchException(bulkItemResponses.buildFailureMessage()));
255+
return;
256+
}
257+
pagesSaved++;
258+
rowsSaved += positionCount;
259+
for (BulkItemResponse i : bulkItemResponses.getItems()) {
260+
switch (i.getResponse().getResult()) {
261+
case CREATED -> rowsCreated++;
262+
case UPDATED -> rowsUpdated++;
263+
case NOOP -> rowsNoop++;
264+
case DELETED, NOT_FOUND -> {
265+
assert false : "delete and not_found not supported but was [" + i.getResponse().getResult() + "]";
266+
}
267+
}
268+
}
269+
bulkBytesSaved += bulkSize;
270+
bulkTookMillis += bulkItemResponses.getTookInMillis();
271+
if (bulkItemResponses.getIngestTookInMillis() != BulkResponse.NO_INGEST_TOOK) {
272+
bulkIngestTookMillis += bulkItemResponses.getIngestTookInMillis();
229273
}
230274
unblock();
231275
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,10 +388,9 @@ public PlanFactory visitCollectCommand(EsqlBaseParser.CollectCommandContext ctx)
388388
throw new ParsingException(source(ctx), "COLLECT command currently requires a snapshot build");
389389
}
390390
Source src = source(ctx);
391-
ReferenceAttribute rowsEmittedAttribute = new ReferenceAttribute(src, "rows_emitted", DataType.LONG);
392391
Literal index = Literal.keyword(source(ctx.COLLECT_INDEX()), ctx.COLLECT_INDEX().getText());
393392
List<NamedExpression> idFields = ctx.COLLECT_ID() != null ? visitQualifiedNamePatterns(ctx.qualifiedNamePatterns()) : List.of();
394-
return input -> new Collect(src, input, rowsEmittedAttribute, index, idFields);
393+
return input -> new Collect(src, input, index, idFields);
395394
}
396395

397396
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Collect.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1717
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1818
import org.elasticsearch.xpack.esql.core.tree.Source;
19+
import org.elasticsearch.xpack.esql.core.type.DataType;
1920

2021
import java.io.IOException;
2122
import java.util.List;
@@ -27,19 +28,37 @@
2728
* {@code Filter} has a "condition" Expression that does the filtering.
2829
*/
2930
public class Collect extends UnaryPlan implements TelemetryAware, SortAgnostic {
30-
private final ReferenceAttribute rowsEmittedAttribute;
31+
private final List<Attribute> outputAttributes;
3132
private final Literal index;
3233
private final List<NamedExpression> idFields;
3334

34-
public Collect(
35+
public Collect(Source source, LogicalPlan child, Literal index, List<NamedExpression> idFields) {
36+
this(
37+
source,
38+
child,
39+
List.of(
40+
new ReferenceAttribute(source, "rows_saved", DataType.LONG),
41+
new ReferenceAttribute(source, "rows_created", DataType.LONG),
42+
new ReferenceAttribute(source, "rows_updated", DataType.LONG),
43+
new ReferenceAttribute(source, "rows_noop", DataType.LONG),
44+
new ReferenceAttribute(source, "bytes_saved", DataType.LONG),
45+
new ReferenceAttribute(source, "bulk_took_millis", DataType.LONG),
46+
new ReferenceAttribute(source, "bulk_ingest_took_millis", DataType.LONG)
47+
),
48+
index,
49+
idFields
50+
);
51+
}
52+
53+
private Collect(
3554
Source source,
3655
LogicalPlan child,
37-
ReferenceAttribute rowsEmittedAttribute,
56+
List<Attribute> outputAttributes,
3857
Literal index,
3958
List<NamedExpression> idFields
4059
) {
4160
super(source, child);
42-
this.rowsEmittedAttribute = rowsEmittedAttribute;
61+
this.outputAttributes = outputAttributes;
4362
this.index = index;
4463
this.idFields = idFields;
4564
}
@@ -56,16 +75,12 @@ public String getWriteableName() {
5675

5776
@Override
5877
protected NodeInfo<Collect> info() {
59-
return NodeInfo.create(this, Collect::new, child(), rowsEmittedAttribute, index, idFields);
78+
return NodeInfo.create(this, Collect::new, child(), outputAttributes, index, idFields);
6079
}
6180

6281
@Override
6382
public Collect replaceChild(LogicalPlan newChild) {
64-
return new Collect(source(), newChild, rowsEmittedAttribute, index, idFields);
65-
}
66-
67-
public ReferenceAttribute rowsEmittedAttribute() {
68-
return rowsEmittedAttribute;
83+
return new Collect(source(), newChild, outputAttributes, index, idFields);
6984
}
7085

7186
public Literal index() {
@@ -78,12 +93,13 @@ public List<NamedExpression> idFields() {
7893

7994
@Override
8095
protected AttributeSet computeReferences() {
96+
// NOCOMMIT this is busted for `FROM foo | COLLECT`
8197
return child().outputSet();
8298
}
8399

84100
@Override
85101
public List<Attribute> output() {
86-
return List.of(rowsEmittedAttribute);
102+
return outputAttributes;
87103
}
88104

89105
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/CollectExec.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1212
import org.elasticsearch.xpack.esql.core.expression.Literal;
1313
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
14-
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1514
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1615
import org.elasticsearch.xpack.esql.core.tree.Source;
1716

@@ -20,19 +19,19 @@
2019
import java.util.Objects;
2120

2221
public class CollectExec extends UnaryExec {
23-
private final ReferenceAttribute rowsEmittedAttribute;
22+
private final List<Attribute> outputAttributes;
2423
private final Literal index;
2524
private final List<NamedExpression> idFields;
2625

2726
public CollectExec(
2827
Source source,
2928
PhysicalPlan child,
30-
ReferenceAttribute rowsEmittedAttribute,
29+
List<Attribute> outputAttributes,
3130
Literal index,
3231
List<NamedExpression> idFields
3332
) {
3433
super(source, child);
35-
this.rowsEmittedAttribute = rowsEmittedAttribute;
34+
this.outputAttributes = outputAttributes;
3635
this.index = index;
3736
this.idFields = idFields;
3837
}
@@ -49,12 +48,12 @@ public String getWriteableName() {
4948

5049
@Override
5150
protected NodeInfo<CollectExec> info() {
52-
return NodeInfo.create(this, CollectExec::new, child(), rowsEmittedAttribute, index, idFields);
51+
return NodeInfo.create(this, CollectExec::new, child(), outputAttributes, index, idFields);
5352
}
5453

5554
@Override
5655
public CollectExec replaceChild(PhysicalPlan newChild) {
57-
return new CollectExec(source(), newChild, rowsEmittedAttribute, index, idFields);
56+
return new CollectExec(source(), newChild, outputAttributes, index, idFields);
5857
}
5958

6059
public Literal index() {
@@ -72,7 +71,7 @@ protected AttributeSet computeReferences() {
7271

7372
@Override
7473
public List<Attribute> output() {
75-
return List.of(rowsEmittedAttribute);
74+
return outputAttributes;
7675
}
7776

7877
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ static PhysicalPlan mapUnary(UnaryPlan p, PhysicalPlan child) {
148148
}
149149

150150
if (p instanceof Collect collect) {
151-
return new CollectExec(collect.source(), child, collect.rowsEmittedAttribute(), collect.index(), collect.idFields());
151+
return new CollectExec(collect.source(), child, collect.output(), collect.index(), collect.idFields());
152152
}
153153

154154
return unsupported(p);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7899,6 +7899,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
78997899
FoldContext.small(),
79007900
List.of(),
79017901
null,
7902+
null,
79027903
new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1))
79037904
),
79047905
List.of()

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ private EsPhysicalOperationProviders esPhysicalOperationProviders(List<EsPhysica
345345
FoldContext.small(),
346346
shardContexts,
347347
null,
348+
null,
348349
new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1))
349350
);
350351
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes;
5555
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
5656
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
57+
import org.elasticsearch.xpack.esql.plan.physical.CollectExec;
5758
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
5859
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
5960
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
@@ -361,6 +362,11 @@ private static void consumeIndexDoc(Consumer<DocBlock> indexDocConsumer, DocVect
361362
}
362363
}
363364

365+
@Override
366+
public PhysicalOperation collect(CollectExec collect, PhysicalOperation source, LocalExecutionPlannerContext context) {
367+
throw new UnsupportedOperationException();
368+
}
369+
364370
private class TestHashAggregationOperator extends HashAggregationOperator {
365371

366372
private final Attribute attribute;

0 commit comments

Comments
 (0)