3737import org .elasticsearch .compute .operator .SinkOperator ;
3838import org .elasticsearch .compute .operator .SourceOperator ;
3939import org .elasticsearch .compute .test .MockBlockFactory ;
40- import org .elasticsearch .compute .test .TestDriverFactory ;
4140import org .elasticsearch .core .ReleasableRef ;
4241import org .elasticsearch .core .TimeValue ;
4342import org .elasticsearch .tasks .Task ;
@@ -301,15 +300,25 @@ Set<Integer> runConcurrentTest(
301300 int numSources = randomIntBetween (1 , 8 );
302301 List <Driver > drivers = new ArrayList <>(numSinks + numSources );
303302 for (int i = 0 ; i < numSinks ; i ++) {
304- ExchangeSinkOperator sinkOperator = new ExchangeSinkOperator (exchangeSink .get (), Function .identity ());
305303 DriverContext dc = driverContext ();
306- Driver d = TestDriverFactory .create (dc , seqNoGenerator .get (dc ), List .of (), sinkOperator , () -> {});
304+ Driver d = createDriver (
305+ "test-session:1" ,
306+ "sink-" + i ,
307+ dc ,
308+ seqNoGenerator .get (dc ),
309+ new ExchangeSinkOperator (exchangeSink .get (), Function .identity ())
310+ );
307311 drivers .add (d );
308312 }
309313 for (int i = 0 ; i < numSources ; i ++) {
310- ExchangeSourceOperator sourceOperator = new ExchangeSourceOperator (exchangeSource .get ());
311314 DriverContext dc = driverContext ();
312- Driver d = TestDriverFactory .create (dc , sourceOperator , List .of (), seqNoCollector .get (dc ));
315+ Driver d = createDriver (
316+ "test-session:2" ,
317+ "source-" + i ,
318+ dc ,
319+ new ExchangeSourceOperator (exchangeSource .get ()),
320+ seqNoCollector .get (dc )
321+ );
313322 drivers .add (d );
314323 }
315324 PlainActionFuture <Void > future = new PlainActionFuture <>();
@@ -323,6 +332,28 @@ protected void start(Driver driver, ActionListener<Void> listener) {
323332 return seqNoCollector .receivedSeqNos ;
324333 }
325334
335+ private static Driver createDriver (
336+ String sessionId ,
337+ String description ,
338+ DriverContext dc ,
339+ SourceOperator sourceOperator ,
340+ SinkOperator sinkOperator
341+ ) {
342+ return new Driver (
343+ sessionId ,
344+ "test" ,
345+ 0 ,
346+ 0 ,
347+ dc ,
348+ () -> description ,
349+ sourceOperator ,
350+ List .of (),
351+ sinkOperator ,
352+ Driver .DEFAULT_STATUS_INTERVAL ,
353+ () -> {}
354+ );
355+ }
356+
326357 public void testConcurrentWithHandlers () {
327358 BlockFactory blockFactory = blockFactory ();
328359 var sourceExchanger = new ExchangeSourceHandler (randomExchangeBuffer (), threadPool .executor (ESQL_TEST_EXECUTOR ));
0 commit comments