Skip to content

Commit df3c60e

Browse files
committed
WIP
1 parent dc3ee44 commit df3c60e

30 files changed

+3469
-2768
lines changed
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.bulk.BulkRequest;
14+
import org.elasticsearch.action.bulk.BulkResponse;
15+
import org.elasticsearch.action.index.IndexRequest;
16+
import org.elasticsearch.action.support.SubscribableListener;
17+
import org.elasticsearch.client.internal.Client;
18+
import org.elasticsearch.compute.data.Block;
19+
import org.elasticsearch.compute.data.Page;
20+
import org.elasticsearch.core.Releasables;
21+
import org.elasticsearch.xcontent.XContentBuilder;
22+
import org.elasticsearch.xcontent.json.JsonXContent;
23+
24+
import java.io.IOException;
25+
import java.io.UncheckedIOException;
26+
import java.util.List;
27+
import java.util.function.Function;
28+
29+
public class CollectOperator implements Operator {
30+
public interface Writer {
31+
XContentBuilder write(XContentBuilder builder, int valueIndex) throws IOException;
32+
}
33+
34+
public record Factory(Client client, String index, List<Function<Block, Writer>> writers) implements OperatorFactory {
35+
@Override
36+
public CollectOperator get(DriverContext driverContext) {
37+
return new CollectOperator(client, driverContext, index, writers);
38+
}
39+
40+
@Override
41+
public String describe() {
42+
return "CollectOperator";
43+
}
44+
}
45+
46+
private final FailureCollector failureCollector = new FailureCollector();
47+
48+
private final Client client;
49+
private final DriverContext driverContext;
50+
private final String index;
51+
private final List<Function<Block, Writer>> writers;
52+
53+
private volatile Phase phase = Phase.COLLECTING;
54+
private volatile IsBlockedResult blocked = NOT_BLOCKED;
55+
56+
private int pagesReceived;
57+
private int pagesEmitted;
58+
private long rowsReceived;
59+
private long rowsEmitted;
60+
private long bulkBytesSent;
61+
62+
public CollectOperator(Client client, DriverContext driverContext, String index, List<Function<Block, Writer>> writers) {
63+
this.client = client;
64+
this.driverContext = driverContext;
65+
this.index = index;
66+
this.writers = writers;
67+
}
68+
69+
@Override
70+
public boolean needsInput() {
71+
return failureCollector.hasFailure() == false && phase == Phase.COLLECTING && blocked.listener().isDone();
72+
}
73+
74+
@Override
75+
public void addInput(Page page) {
76+
assert needsInput();
77+
checkFailure();
78+
pagesReceived++;
79+
rowsReceived += page.getPositionCount();
80+
81+
try {
82+
BulkRequest request = request(page);
83+
bulkBytesSent += request.estimatedSizeInBytes();
84+
Listener listener = new Listener(page.getPositionCount());
85+
blocked = new IsBlockedResult(listener.blockedFuture, "indexing");
86+
client.bulk(request, listener);
87+
} catch (IOException e) {
88+
throw new UncheckedIOException(e);
89+
}
90+
}
91+
92+
private BulkRequest request(Page page) throws IOException {
93+
XContentBuilder[] source = new XContentBuilder[page.getPositionCount()];
94+
for (int p = 0; p < page.getPositionCount(); p++) {
95+
source[p] = JsonXContent.contentBuilder().startObject();
96+
}
97+
for (int b = 0; b < page.getBlockCount(); b++) {
98+
Block block = page.getBlock(b);
99+
Writer writer = writers.get(b).apply(block);
100+
for (int p = 0; p < block.getPositionCount(); p++) {
101+
writer.write(source[p], p);
102+
}
103+
}
104+
BulkRequest request = new BulkRequest();
105+
for (int p = 0; p < page.getPositionCount(); p++) {
106+
request.add(new IndexRequest(index).source(source[p].endObject()));
107+
}
108+
return request;
109+
}
110+
111+
@Override
112+
public void finish() {
113+
if (phase != Phase.COLLECTING) {
114+
return;
115+
}
116+
phase = Phase.WAITING_TO_FINISH;
117+
checkFailure();
118+
blocked.listener().addListener(new ActionListener<>() {
119+
@Override
120+
public void onResponse(Void unused) {
121+
phase = Phase.READY_TO_OUTPUT;
122+
}
123+
124+
@Override
125+
public void onFailure(Exception e) {
126+
failureCollector.unwrapAndCollect(e);
127+
}
128+
});
129+
}
130+
131+
@Override
132+
public boolean isFinished() {
133+
return phase == Phase.FINISHED;
134+
}
135+
136+
@Override
137+
public IsBlockedResult isBlocked() {
138+
return blocked;
139+
}
140+
141+
@Override
142+
public Page getOutput() {
143+
checkFailure();
144+
if (phase != Phase.READY_TO_OUTPUT) {
145+
return null;
146+
}
147+
Block rowCount = null;
148+
try {
149+
rowCount = driverContext.blockFactory().newConstantLongBlockWith(rowsEmitted, 1);
150+
Page result = new Page(rowCount);
151+
rowCount = null;
152+
phase = Phase.FINISHED;
153+
return result;
154+
} finally {
155+
Releasables.close(rowCount);
156+
}
157+
}
158+
159+
@Override
160+
public void close() {}
161+
162+
private void checkFailure() {
163+
Exception e = failureCollector.getFailure();
164+
if (e != null) {
165+
throw ExceptionsHelper.convertToRuntime(e);
166+
}
167+
}
168+
169+
private enum Phase {
170+
COLLECTING,
171+
WAITING_TO_FINISH,
172+
READY_TO_OUTPUT,
173+
FINISHED;
174+
}
175+
176+
private class Listener implements ActionListener<BulkResponse> {
177+
private final SubscribableListener<Void> blockedFuture = new SubscribableListener<>();
178+
private final int positionCount;
179+
180+
Listener(int positionCount) {
181+
driverContext.addAsyncAction();
182+
this.positionCount = positionCount;
183+
}
184+
185+
@Override
186+
public void onResponse(BulkResponse bulkItemResponses) {
187+
pagesEmitted++;
188+
rowsEmitted += positionCount;
189+
if (bulkItemResponses.hasFailures()) {
190+
failureCollector.unwrapAndCollect(new ElasticsearchException(bulkItemResponses.buildFailureMessage()));
191+
}
192+
unblock();
193+
}
194+
195+
@Override
196+
public void onFailure(Exception e) {
197+
failureCollector.unwrapAndCollect(e);
198+
unblock();
199+
}
200+
201+
private void unblock() {
202+
driverContext.removeAsyncAction();
203+
blockedFuture.onResponse(null);
204+
}
205+
}
206+
}

x-pack/plugin/esql/src/main/antlr/EsqlBaseLexer.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ options {
5555
* all other commands.
5656
*/
5757
import ChangePoint,
58+
Collect,
5859
Enrich,
5960
Explain,
6061
Expression,

0 commit comments

Comments
 (0)