@@ -380,12 +380,13 @@ protected void doRun() throws Exception {
380380 .get ();
381381 ensureYellowAndNoInitializingShards ("test" );
382382 request .query ("FROM test | LIMIT 10" );
383- request .pragmas (randomPragmas ());
383+ QueryPragmas pragmas = randomPragmas ();
384+ request .pragmas (pragmas );
384385 PlainActionFuture <EsqlQueryResponse > future = new PlainActionFuture <>();
385386 client .execute (EsqlQueryAction .INSTANCE , request , future );
386387 ExchangeService exchangeService = internalCluster ().getInstance (ExchangeService .class , dataNode );
387- boolean waitedForPages ;
388- final String sessionId ;
388+ final boolean waitedForPages ;
389+ final String exchangeId ;
389390 try {
390391 List <TaskInfo > foundTasks = new ArrayList <>();
391392 assertBusy (() -> {
@@ -399,13 +400,22 @@ protected void doRun() throws Exception {
399400 assertThat (tasks , hasSize (1 ));
400401 foundTasks .addAll (tasks );
401402 });
402- sessionId = foundTasks .get (0 ).taskId ().toString ();
403+ final String sessionId = foundTasks .get (0 ).taskId ().toString ();
403404 assertTrue (fetchingStarted .await (1 , TimeUnit .MINUTES ));
404- String exchangeId = exchangeService .sinkKeys ().stream ().filter (s -> s .startsWith (sessionId )).findFirst ().get ();
405+ List <String > sinkKeys = exchangeService .sinkKeys ()
406+ .stream ()
407+ .filter (
408+ s -> s .startsWith (sessionId )
409+ // exclude the node-level reduction sink
410+ && s .endsWith ("[n]" ) == false
411+ )
412+ .toList ();
413+ assertThat (sinkKeys .toString (), sinkKeys .size (), equalTo (1 ));
414+ exchangeId = sinkKeys .get (0 );
405415 ExchangeSinkHandler exchangeSink = exchangeService .getSinkHandler (exchangeId );
406416 waitedForPages = randomBoolean ();
407417 if (waitedForPages ) {
408- // do not fail exchange requests until we have some pages
418+ // do not fail exchange requests until we have some pages.
409419 assertBusy (() -> assertThat (exchangeSink .bufferSize (), greaterThan (0 )));
410420 }
411421 } finally {
@@ -417,7 +427,7 @@ protected void doRun() throws Exception {
417427 // As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is
418428 // longer than the assertBusy timeout.
419429 if (waitedForPages == false ) {
420- exchangeService .finishSinkHandler (sessionId , failure );
430+ exchangeService .finishSinkHandler (exchangeId , failure );
421431 }
422432 } finally {
423433 transportService .clearAllRules ();
0 commit comments