Skip to content

Commit fc24726

Browse files
Fix missing Lookup Join Warnings
1 parent 928f89f commit fc24726

File tree

4 files changed

+22
-6
lines changed

4 files changed

+22
-6
lines changed

muted-tests.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,6 @@ tests:
390390
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackLookupJoinIT
391391
method: testLookupExplosionManyMatchesManyFields
392392
issue: https://github.com/elastic/elasticsearch/issues/143347
393-
- class: org.elasticsearch.xpack.esql.action.LookupJoinIT
394-
method: testMultiValueJoinKeyWarnings
395-
issue: https://github.com/elastic/elasticsearch/issues/143350
396393
- class: org.elasticsearch.xpack.esql.CsvIT
397394
method: test {csv-spec:k8s-timeseries-irate.*}
398395
issue: https://github.com/elastic/elasticsearch/issues/143368
@@ -432,9 +429,6 @@ tests:
432429
- class: org.elasticsearch.xpack.logsdb.StandardToLogsDbIndexModeRollingUpgradeIT
433430
method: testIndexing
434431
issue: https://github.com/elastic/elasticsearch/issues/143426
435-
- class: org.elasticsearch.xpack.esql.CsvIT
436-
method: test {csv-spec:lookup-join.MvJoinKeyOnFromAfterStats}
437-
issue: https://github.com/elastic/elasticsearch/issues/143427
438432
- class: org.elasticsearch.multiproject.test.CoreWithMultipleProjectsClientYamlTestSuiteIT
439433
method: test {yaml=search.retrievers/result-diversification/10_mmr_result_diversification_retriever/Test MMR result diversification single index float type}
440434
issue: https://github.com/elastic/elasticsearch/issues/143430

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/BidirectionalBatchExchangeClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.compute.data.Page;
1818
import org.elasticsearch.compute.operator.FailureCollector;
1919
import org.elasticsearch.compute.operator.IsBlockedResult;
20+
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
2021
import org.elasticsearch.core.Nullable;
2122
import org.elasticsearch.tasks.Task;
2223
import org.elasticsearch.tasks.TaskCancelledException;
@@ -118,6 +119,8 @@ public interface ServerSetupCallback {
118119
@Nullable
119120
private final BiConsumer<String, String> lookupPlanConsumer;
120121

122+
private final ResponseHeadersCollector responseHeadersCollector;
123+
121124
/**
122125
* Create a new BidirectionalBatchExchangeClient.
123126
*
@@ -155,6 +158,7 @@ public BidirectionalBatchExchangeClient(
155158
this.lookupPlanConsumer = lookupPlanConsumer;
156159
this.maxWorkers = maxWorkers;
157160
this.serverNodeSupplier = serverNodeSupplier;
161+
this.responseHeadersCollector = new ResponseHeadersCollector(transportService.getThreadPool().getThreadContext());
158162
logger.debug(
159163
"[LookupJoinClient] Created BidirectionalBatchExchangeClient: sharedExchangeId={}, maxBufferSize={}, maxWorkers={}",
160164
sharedExchangeId,
@@ -368,6 +372,7 @@ private void sendBatchExchangeStatusRequest(Worker worker) {
368372
worker.serverToClientId,
369373
executor,
370374
ActionListener.<BatchExchangeStatusResponse>wrap(response -> {
375+
responseHeadersCollector.collect();
371376
logger.debug(
372377
"[LookupJoinClient] Received batch exchange status response for worker={}, success={}",
373378
worker.workerId,
@@ -386,6 +391,7 @@ private void sendBatchExchangeStatusRequest(Worker worker) {
386391
worker.statusRef.onFailure(failure);
387392
}
388393
}, failure -> {
394+
responseHeadersCollector.collect();
389395
logger.error(
390396
"[LookupJoinClient] Failed to receive batch exchange status response for worker={}: {}",
391397
worker.workerId,
@@ -830,6 +836,15 @@ private void stop() {
830836
}
831837
}
832838

839+
/**
840+
* Applies collected response headers (e.g. warnings from server-side drivers) to the current
841+
* thread's context. Must be called on the coordinator driver thread (typically from the
842+
* operator's {@code close()} method) so that {@code DriverRunner} can propagate them.
843+
*/
844+
public void finishCollectingResponseHeaders() {
845+
responseHeadersCollector.finish();
846+
}
847+
833848
/**
834849
* Closes the client. The driver guarantees all workers have completed (via
835850
* {@link #waitForServerResponse()} blocking on {@link #allWorkersCompleted}) before

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/BidirectionalBatchExchangeServer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.compute.operator.Driver;
2121
import org.elasticsearch.compute.operator.DriverContext;
2222
import org.elasticsearch.compute.operator.Operator;
23+
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
2324
import org.elasticsearch.compute.operator.SinkOperator;
2425
import org.elasticsearch.core.Nullable;
2526
import org.elasticsearch.core.Releasable;
@@ -68,6 +69,7 @@ public final class BidirectionalBatchExchangeServer extends BidirectionalBatchEx
6869
private final DiscoveryNode clientNode; // Client node for transport connection
6970
private PlainActionFuture<Void> driverFuture; // Future for driver completion
7071
private ThreadContext threadContext; // Thread context for starting driver
72+
private ResponseHeadersCollector responseHeadersCollector;
7173
private volatile boolean driverPrepared = false; // Whether driver has been prepared but not started
7274
private volatile boolean driverStarted = false; // Whether driver has been started (client sent BatchExchangeStatusRequest)
7375
private ScheduledFuture<?> clientReadyTimeoutFuture; // Timeout for client to send BatchExchangeStatusRequest
@@ -269,6 +271,7 @@ private void onClientReady() {
269271
private ActionListener<Void> createDriverCompletionListener() {
270272
return ActionListener.wrap(ignored -> {
271273
logger.debug("[LookupJoinServer] Driver completion listener onResponse called (success) for exchangeId={}", serverToClientId);
274+
responseHeadersCollector.collect();
272275
driverFuture.onResponse(null);
273276
logger.debug("[LookupJoinServer] Batch processing completed successfully for exchangeId={}", serverToClientId);
274277
// Close server resources BEFORE releasing the driver ref
@@ -292,6 +295,7 @@ private ActionListener<Void> createDriverCompletionListener() {
292295
serverToClientId,
293296
failure != null ? failure.getMessage() : "unknown"
294297
);
298+
responseHeadersCollector.collect();
295299
// Complete the future first so close() won't throw
296300
driverFuture.onFailure(failure);
297301
// Close server resources BEFORE releasing the driver ref
@@ -313,6 +317,7 @@ private ActionListener<Void> createDriverCompletionListener() {
313317
* The listener is stored when BatchExchangeStatusRequest is received, before processing starts.
314318
*/
315319
private void sendBatchExchangeStatusResponse(@Nullable Exception failure) {
320+
responseHeadersCollector.finish();
316321
ActionListener<BatchExchangeStatusResponse> listener = batchExchangeStatusListener;
317322
if (listener != null) {
318323
logger.debug(
@@ -453,6 +458,7 @@ private void startBatchProcessing(
453458

454459
// Store thread context for later driver startup
455460
this.threadContext = threadContext;
461+
this.responseHeadersCollector = new ResponseHeadersCollector(threadContext);
456462

457463
// Handler was already registered in initialize(), no need to register again
458464
logger.debug(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,7 @@ public void close() {
643643
} catch (Exception e) {
644644
logger.error("Error finishing client", e);
645645
}
646+
client.finishCollectingResponseHeaders();
646647
try {
647648
client.close();
648649
} catch (Exception e) {

0 commit comments

Comments
 (0)