Skip to content

Commit 08c8ec0

Browse files
committed
Use existing ResponseHeadersCollector instead of manually taking warnings
1 parent e6b1e5d commit 08c8ec0

File tree

4 files changed

+8
-64
lines changed

4 files changed

+8
-64
lines changed

server/src/main/java/org/elasticsearch/common/logging/HeaderWarning.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.nio.charset.Charset;
1717
import java.nio.charset.StandardCharsets;
1818
import java.util.BitSet;
19-
import java.util.HashSet;
2019
import java.util.Iterator;
2120
import java.util.Locale;
2221
import java.util.Objects;
@@ -409,17 +408,6 @@ public static void addWarning(String message, Object... params) {
409408
addWarning(THREAD_CONTEXT, message, params);
410409
}
411410

412-
/**
413-
* Adds a warning header without any formatting or prefix
414-
*/
415-
public static void addRawWarning(String warningHeader) {
416-
addRawWarning(THREAD_CONTEXT, warningHeader);
417-
}
418-
419-
public static Set<String> getWarnings() {
420-
return getWarnings(THREAD_CONTEXT);
421-
}
422-
423411
// package scope for testing
424412
static void addWarning(Set<ThreadContext> threadContexts, String message, Object... params) {
425413
final Iterator<ThreadContext> iterator = threadContexts.iterator();
@@ -444,19 +432,4 @@ static void addRawWarning(Set<ThreadContext> threadContexts, String warningHeade
444432
threadContext.addResponseHeader("Warning", warningHeader);
445433
}
446434
}
447-
448-
static Set<String> getWarnings(Set<ThreadContext> threadContexts) {
449-
final Iterator<ThreadContext> iterator = threadContexts.iterator();
450-
if (iterator.hasNext()) {
451-
final Set<String> warnings = new HashSet<>();
452-
while (iterator.hasNext()) {
453-
final ThreadContext next = iterator.next();
454-
final Set<String> contextWarnings = next.getResponseHeader("Warning");
455-
warnings.addAll(contextWarnings);
456-
457-
}
458-
return warnings;
459-
}
460-
return Set.of();
461-
}
462435
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -581,23 +581,6 @@ public Map<String, List<String>> getResponseHeaders() {
581581
return Collections.unmodifiableMap(map);
582582
}
583583

584-
/**
585-
* Get a copy of the specified <em>response</em> header.
586-
*
587-
* @param header The header to get.
588-
* @return Never {@code null}.
589-
*/
590-
public Set<String> getResponseHeader(String header) {
591-
Map<String, Set<String>> responseHeaders = threadLocal.get().responseHeaders;
592-
Set<String> values = responseHeaders.get(header);
593-
594-
if (values == null) {
595-
return Set.of();
596-
}
597-
598-
return Set.copyOf(values);
599-
}
600-
601584
/**
602585
* Copies all header key, value pairs into the current context
603586
*/

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1717
import org.elasticsearch.common.io.stream.StreamInput;
1818
import org.elasticsearch.common.io.stream.StreamOutput;
19-
import org.elasticsearch.common.logging.HeaderWarning;
2019
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2120
import org.elasticsearch.compute.data.Page;
2221
import org.elasticsearch.core.TimeValue;
@@ -27,7 +26,6 @@
2726
import java.io.IOException;
2827
import java.util.Map;
2928
import java.util.Objects;
30-
import java.util.Set;
3129
import java.util.concurrent.atomic.LongAdder;
3230

3331
/**
@@ -43,7 +41,6 @@ public abstract class AsyncOperator<Fetched> implements Operator {
4341
private volatile SubscribableListener<Void> blockedFuture;
4442

4543
private final Map<Long, Fetched> buffers = ConcurrentCollections.newConcurrentMap();
46-
private final Set<String> storedWarningHeaders = ConcurrentCollections.newConcurrentSet();
4744
private final FailureCollector failureCollector = new FailureCollector();
4845
private final DriverContext driverContext;
4946

@@ -92,11 +89,9 @@ public void addInput(Page input) {
9289
try {
9390
final ActionListener<Fetched> listener = ActionListener.wrap(output -> {
9491
buffers.put(seqNo, output);
95-
storeWarnings();
9692
onSeqNoCompleted(seqNo);
9793
}, e -> {
9894
releasePageOnAnyThread(input);
99-
storeWarnings();
10095
failureCollector.unwrapAndCollect(e);
10196
onSeqNoCompleted(seqNo);
10297
});
@@ -189,7 +184,6 @@ public void finish() {
189184
public boolean isFinished() {
190185
if (finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo()) {
191186
checkFailure();
192-
restoreWarnings();
193187
return true;
194188
} else {
195189
return false;
@@ -203,7 +197,6 @@ public boolean isFinished() {
203197
public final Fetched fetchFromBuffer() {
204198
checkFailure();
205199
long persistedCheckpoint = checkpoint.getPersistedCheckpoint();
206-
restoreWarnings();
207200
if (persistedCheckpoint < checkpoint.getProcessedCheckpoint()) {
208201
persistedCheckpoint++;
209202
Fetched result = buffers.remove(persistedCheckpoint);
@@ -214,18 +207,6 @@ public final Fetched fetchFromBuffer() {
214207
}
215208
}
216209

217-
private void storeWarnings() {
218-
Set<String> warnings = HeaderWarning.getWarnings();
219-
storedWarningHeaders.addAll(warnings);
220-
}
221-
222-
private void restoreWarnings() {
223-
for (String warning : storedWarningHeaders) {
224-
HeaderWarning.addRawWarning(warning);
225-
}
226-
// TODO: Remove warnings?
227-
}
228-
229210
@Override
230211
public IsBlockedResult isBlocked() {
231212
// TODO: Add an exchange service between async operation instead?

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.compute.operator.DriverContext;
1919
import org.elasticsearch.compute.operator.IsBlockedResult;
2020
import org.elasticsearch.compute.operator.Operator;
21+
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
2122
import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin;
2223
import org.elasticsearch.core.Releasable;
2324
import org.elasticsearch.core.Releasables;
@@ -93,6 +94,7 @@ public Operator get(DriverContext driverContext) {
9394
private final String lookupIndex;
9495
private final String matchField;
9596
private final List<NamedExpression> loadFields;
97+
private final ResponseHeadersCollector responseHeadersCollector;
9698
private final Source source;
9799
private long totalTerms = 0L;
98100
/**
@@ -129,6 +131,7 @@ public LookupFromIndexOperator(
129131
this.matchField = matchField;
130132
this.loadFields = loadFields;
131133
this.source = source;
134+
this.responseHeadersCollector = new ResponseHeadersCollector(lookupService.getThreadContext());
132135
}
133136

134137
@Override
@@ -148,7 +151,10 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
148151
lookupService.lookupAsync(
149152
request,
150153
parentTask,
151-
listener.map(pages -> new OngoingJoin(new RightChunkedLeftJoin(inputPage, loadFields.size()), pages.iterator()))
154+
ActionListener.runBefore(
155+
listener.map(pages -> new OngoingJoin(new RightChunkedLeftJoin(inputPage, loadFields.size()), pages.iterator())),
156+
responseHeadersCollector::collect
157+
)
152158
);
153159
}
154160

@@ -220,6 +226,7 @@ public IsBlockedResult isBlocked() {
220226
protected void doClose() {
221227
// TODO: Maybe create a sub-task as the parent task of all the lookup tasks
222228
// then cancel it when this operator terminates early (e.g., have enough result).
229+
responseHeadersCollector.finish();
223230
Releasables.close(ongoing);
224231
}
225232

0 commit comments

Comments
 (0)