Skip to content

Commit 2bc5bb5

Browse files
committed
Move headers collection to AsyncOperator
1 parent e9b0456 commit 2bc5bb5

File tree

3 files changed

+15
-20
lines changed

3 files changed

+15
-20
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.io.stream.StreamInput;
1818
import org.elasticsearch.common.io.stream.StreamOutput;
1919
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
20+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2021
import org.elasticsearch.compute.data.Page;
2122
import org.elasticsearch.core.TimeValue;
2223
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -34,6 +35,11 @@
3435
* to reduce communication overhead and fetches a {@code Fetched} at a time.
3536
* It's the responsibility of subclasses to transform that {@code Fetched} into
3637
* output.
38+
* <p>
39+
* This operator will also take care of merging response headers from the thread context into the main thread,
40+
* which <b>must</b> be the that closes this.
41+
* </p>
42+
*
3743
* @see #performAsync(Page, ActionListener)
3844
*/
3945
public abstract class AsyncOperator<Fetched> implements Operator {
@@ -45,6 +51,7 @@ public abstract class AsyncOperator<Fetched> implements Operator {
4551
private final DriverContext driverContext;
4652

4753
private final int maxOutstandingRequests;
54+
private final ResponseHeadersCollector responseHeadersCollector;
4855
private final LongAdder totalTimeInNanos = new LongAdder();
4956

5057
private boolean finished = false;
@@ -66,9 +73,10 @@ public abstract class AsyncOperator<Fetched> implements Operator {
6673
*
6774
* @param maxOutstandingRequests the maximum number of outstanding requests
6875
*/
69-
public AsyncOperator(DriverContext driverContext, int maxOutstandingRequests) {
76+
public AsyncOperator(DriverContext driverContext, ThreadContext threadContext, int maxOutstandingRequests) {
7077
this.driverContext = driverContext;
7178
this.maxOutstandingRequests = maxOutstandingRequests;
79+
this.responseHeadersCollector = new ResponseHeadersCollector(threadContext);
7280
}
7381

7482
@Override
@@ -97,6 +105,7 @@ public void addInput(Page input) {
97105
});
98106
final long startNanos = System.nanoTime();
99107
performAsync(input, ActionListener.runAfter(listener, () -> {
108+
responseHeadersCollector.collect();
100109
driverContext.removeAsyncAction();
101110
totalTimeInNanos.add(System.nanoTime() - startNanos);
102111
}));
@@ -172,6 +181,7 @@ public final void close() {
172181
finish();
173182
closed = true;
174183
discardResults();
184+
responseHeadersCollector.finish();
175185
doClose();
176186
}
177187

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.compute.operator.AsyncOperator;
1717
import org.elasticsearch.compute.operator.DriverContext;
1818
import org.elasticsearch.compute.operator.Operator;
19-
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
2019
import org.elasticsearch.core.CheckedFunction;
2120
import org.elasticsearch.tasks.CancellableTask;
2221
import org.elasticsearch.xcontent.XContentBuilder;
@@ -38,7 +37,6 @@ public final class EnrichLookupOperator extends AsyncOperator<Page> {
3837
private final String matchType;
3938
private final String matchField;
4039
private final List<NamedExpression> enrichFields;
41-
private final ResponseHeadersCollector responseHeadersCollector;
4240
private final Source source;
4341
private long totalTerms = 0L;
4442

@@ -101,7 +99,7 @@ public EnrichLookupOperator(
10199
List<NamedExpression> enrichFields,
102100
Source source
103101
) {
104-
super(driverContext, maxOutstandingRequests);
102+
super(driverContext, enrichLookupService.getThreadContext(), maxOutstandingRequests);
105103
this.sessionId = sessionId;
106104
this.parentTask = parentTask;
107105
this.inputChannel = inputChannel;
@@ -112,7 +110,6 @@ public EnrichLookupOperator(
112110
this.matchField = matchField;
113111
this.enrichFields = enrichFields;
114112
this.source = source;
115-
this.responseHeadersCollector = new ResponseHeadersCollector(enrichLookupService.getThreadContext());
116113
}
117114

118115
@Override
@@ -135,11 +132,7 @@ protected void performAsync(Page inputPage, ActionListener<Page> listener) {
135132
}
136133
return inputPage.appendPage(pages.getFirst());
137134
};
138-
enrichLookupService.lookupAsync(
139-
request,
140-
parentTask,
141-
ActionListener.runBefore(listener.map(handleResponse), responseHeadersCollector::collect)
142-
);
135+
enrichLookupService.lookupAsync(request, parentTask, listener.map(handleResponse));
143136
}
144137

145138
@Override
@@ -171,7 +164,6 @@ public String toString() {
171164
protected void doClose() {
172165
// TODO: Maybe create a sub-task as the parent task of all the lookup tasks
173166
// then cancel it when this operator terminates early (e.g., have enough result).
174-
responseHeadersCollector.finish();
175167
}
176168

177169
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.compute.operator.DriverContext;
1919
import org.elasticsearch.compute.operator.IsBlockedResult;
2020
import org.elasticsearch.compute.operator.Operator;
21-
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
2221
import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin;
2322
import org.elasticsearch.core.Releasable;
2423
import org.elasticsearch.core.Releasables;
@@ -94,7 +93,6 @@ public Operator get(DriverContext driverContext) {
9493
private final String lookupIndex;
9594
private final String matchField;
9695
private final List<NamedExpression> loadFields;
97-
private final ResponseHeadersCollector responseHeadersCollector;
9896
private final Source source;
9997
private long totalTerms = 0L;
10098
/**
@@ -120,7 +118,7 @@ public LookupFromIndexOperator(
120118
List<NamedExpression> loadFields,
121119
Source source
122120
) {
123-
super(driverContext, maxOutstandingRequests);
121+
super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
124122
this.sessionId = sessionId;
125123
this.configuration = configuration;
126124
this.parentTask = parentTask;
@@ -131,7 +129,6 @@ public LookupFromIndexOperator(
131129
this.matchField = matchField;
132130
this.loadFields = loadFields;
133131
this.source = source;
134-
this.responseHeadersCollector = new ResponseHeadersCollector(lookupService.getThreadContext());
135132
}
136133

137134
@Override
@@ -151,10 +148,7 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
151148
lookupService.lookupAsync(
152149
request,
153150
parentTask,
154-
ActionListener.runBefore(
155-
listener.map(pages -> new OngoingJoin(new RightChunkedLeftJoin(inputPage, loadFields.size()), pages.iterator())),
156-
responseHeadersCollector::collect
157-
)
151+
listener.map(pages -> new OngoingJoin(new RightChunkedLeftJoin(inputPage, loadFields.size()), pages.iterator()))
158152
);
159153
}
160154

@@ -226,7 +220,6 @@ public IsBlockedResult isBlocked() {
226220
protected void doClose() {
227221
// TODO: Maybe create a sub-task as the parent task of all the lookup tasks
228222
// then cancel it when this operator terminates early (e.g., have enough result).
229-
responseHeadersCollector.finish();
230223
Releasables.close(ongoing);
231224
}
232225

0 commit comments

Comments
 (0)