Skip to content

Commit 7fb4075

Browse files
committed
Get exchangeId into the exchange sink/source profile output
1 parent b7e3a1e commit 7fb4075

File tree

16 files changed

+82
-31
lines changed

16 files changed

+82
-31
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,12 @@ public void registerTransportHandler(TransportService transportService) {
123123
* @throws IllegalStateException if a sink handler for the given id already exists
124124
*/
125125
public ExchangeSinkHandler createSinkHandler(String exchangeId, int maxBufferSize) {
126-
ExchangeSinkHandler sinkHandler = new ExchangeSinkHandler(blockFactory, maxBufferSize, threadPool.relativeTimeInMillisSupplier());
126+
ExchangeSinkHandler sinkHandler = new ExchangeSinkHandler(
127+
exchangeId,
128+
blockFactory,
129+
maxBufferSize,
130+
threadPool.relativeTimeInMillisSupplier()
131+
);
127132
if (sinks.putIfAbsent(exchangeId, sinkHandler) != null) {
128133
throw new IllegalStateException("sink exchanger for id [" + exchangeId + "] already exists");
129134
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,9 @@ public interface ExchangeSink {
4040
* Whether the sink is blocked on adding more pages
4141
*/
4242
IsBlockedResult waitForWriting();
43+
44+
/**
45+
* Session ID for this exchange
46+
*/
47+
String exchangeId();
4348
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@ public final class ExchangeSinkHandler {
4040
private final SubscribableListener<Void> completionFuture;
4141
private final LongSupplier nowInMillis;
4242
private final AtomicLong lastUpdatedInMillis;
43+
private final String exchangeId;
4344
private final BlockFactory blockFactory;
4445

45-
public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSupplier nowInMillis) {
46+
public ExchangeSinkHandler(String exchangeId, BlockFactory blockFactory, int maxBufferSize, LongSupplier nowInMillis) {
47+
this.exchangeId = exchangeId;
4648
this.blockFactory = blockFactory;
4749
this.buffer = new ExchangeBuffer(maxBufferSize);
4850
this.completionFuture = SubscribableListener.newForked(buffer::addCompletionListener);
@@ -51,11 +53,13 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup
5153
}
5254

5355
private class ExchangeSinkImpl implements ExchangeSink {
56+
private final String exchangeId;
5457
boolean finished;
5558
private final Runnable onPageFetched;
5659
private final SubscribableListener<Void> onFinished = new SubscribableListener<>();
5760

58-
ExchangeSinkImpl(Runnable onPageFetched) {
61+
ExchangeSinkImpl(String exchangeId, Runnable onPageFetched) {
62+
this.exchangeId = exchangeId;
5963
this.onPageFetched = onPageFetched;
6064
onChanged();
6165
buffer.addCompletionListener(onFinished);
@@ -96,6 +100,11 @@ public void addCompletionListener(ActionListener<Void> listener) {
96100
public IsBlockedResult waitForWriting() {
97101
return buffer.waitForWriting();
98102
}
103+
104+
@Override
105+
public String exchangeId() {
106+
return exchangeId;
107+
}
99108
}
100109

101110
/**
@@ -168,7 +177,7 @@ private void notifyListeners() {
168177
* @see ExchangeSinkOperator
169178
*/
170179
public ExchangeSink createExchangeSink(Runnable onPageFetched) {
171-
return new ExchangeSinkImpl(onPageFetched);
180+
return new ExchangeSinkImpl(exchangeId, onPageFetched);
172181
}
173182

174183
/**

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void close() {
9494

9595
@Override
9696
public String toString() {
97-
return "ExchangeSinkOperator";
97+
return "ExchangeSinkOperator[" + sink.exchangeId() + "]";
9898
}
9999

100100
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,9 @@ public interface ExchangeSource {
3939
* Allows callers to stop reading from the source when it's blocked
4040
*/
4141
IsBlockedResult waitForReading();
42+
43+
/**
44+
* Session ID for this exchange
45+
*/
46+
String exchangeId();
4247
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* @see #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
3131
*/
3232
public final class ExchangeSourceHandler {
33+
private final String exchangeId;
3334
private final ExchangeBuffer buffer;
3435
private final Executor fetchExecutor;
3536

@@ -49,7 +50,8 @@ public final class ExchangeSourceHandler {
4950
* which could otherwise be allocated for other purposes.
5051
* @param fetchExecutor the executor used to fetch pages.
5152
*/
52-
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
53+
public ExchangeSourceHandler(String exchangeId, int maxBufferSize, Executor fetchExecutor) {
54+
this.exchangeId = exchangeId;
5355
this.buffer = new ExchangeBuffer(maxBufferSize);
5456
this.fetchExecutor = fetchExecutor;
5557
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
@@ -63,9 +65,11 @@ private void checkFailure() {
6365
}
6466

6567
private class ExchangeSourceImpl implements ExchangeSource {
68+
private final String exchangeId1;
6669
private boolean finished;
6770

68-
ExchangeSourceImpl() {
71+
ExchangeSourceImpl(String exchangeId) {
72+
exchangeId1 = exchangeId;
6973
outstandingSources.trackNewInstance();
7074
}
7175

@@ -86,6 +90,11 @@ public IsBlockedResult waitForReading() {
8690
return buffer.waitForReading();
8791
}
8892

93+
@Override
94+
public String exchangeId() {
95+
return exchangeId1;
96+
}
97+
8998
@Override
9099
public void finish() {
91100
if (finished == false) {
@@ -106,7 +115,7 @@ public int bufferSize() {
106115
* @see ExchangeSinkOperator
107116
*/
108117
public ExchangeSource createExchangeSource() {
109-
return new ExchangeSourceImpl();
118+
return new ExchangeSourceImpl(exchangeId);
110119
}
111120

112121
/**

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void close() {
8989

9090
@Override
9191
public String toString() {
92-
return "ExchangeSourceOperator";
92+
return "ExchangeSourceOperator[" + source.exchangeId() + "]";
9393
}
9494

9595
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ public void testEarlyTermination() {
289289
final var sourceOperator = new CannedSourceOperator(inPages.iterator());
290290
final int maxAllowedRows = between(1, 100);
291291
final AtomicInteger processedRows = new AtomicInteger(0);
292-
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
292+
var sinkHandler = new ExchangeSinkHandler("test", driverContext.blockFactory(), positions, System::currentTimeMillis);
293293
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
294294
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
295295
@Override
@@ -324,8 +324,8 @@ public void testResumeOnEarlyFinish() throws Exception {
324324
DriverContext driverContext = driverContext();
325325
ThreadPool threadPool = threadPool();
326326
try {
327-
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
328-
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
327+
var sourceHandler = new ExchangeSourceHandler("test", between(1, 5), threadPool.executor("esql"));
328+
var sinkHandler = new ExchangeSinkHandler("test", driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
329329
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
330330
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
331331
Driver driver = TestDriverFactory.create(driverContext, sourceOperator, List.of(), sinkOperator);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,16 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
205205
Collection<List<Page>> splitInput = randomSplits(input, randomIntBetween(2, 4));
206206
BlockFactory factory = blockFactory();
207207
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(
208+
"test",
208209
factory,
209210
randomIntBetween(2, 10),
210211
threadPool.relativeTimeInMillisSupplier()
211212
);
212-
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(randomIntBetween(1, 4), threadPool.executor(ESQL_TEST_EXECUTOR));
213+
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(
214+
"test",
215+
randomIntBetween(1, 4),
216+
threadPool.executor(ESQL_TEST_EXECUTOR)
217+
);
213218
sourceExchanger.addRemoteSink(
214219
sinkExchanger::fetchPageAsync,
215220
randomBoolean(),

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ public void testBasic() throws Exception {
102102
for (int i = 0; i < pages.length; i++) {
103103
pages[i] = new Page(blockFactory.newConstantIntBlockWith(i, 2));
104104
}
105-
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
105+
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler("test", blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
106106
AtomicInteger pagesAddedToSink = new AtomicInteger();
107107
ExchangeSink sink1 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet);
108108
ExchangeSink sink2 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet);
109-
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor(ESQL_TEST_EXECUTOR));
109+
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler("test", 3, threadPool.executor(ESQL_TEST_EXECUTOR));
110110
ExchangeSource source = sourceExchanger.createExchangeSource();
111111
AtomicInteger pagesAddedToSource = new AtomicInteger();
112112
PlainActionFuture<Void> remoteSinkFuture = new PlainActionFuture<>();
@@ -358,7 +358,7 @@ private static Driver createDriver(
358358

359359
public void testConcurrentWithHandlers() {
360360
BlockFactory blockFactory = blockFactory();
361-
var sourceExchanger = new ExchangeSourceHandler(randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR));
361+
var sourceExchanger = new ExchangeSourceHandler("test", randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR));
362362
PlainActionFuture<Void> remoteSinksFuture = new PlainActionFuture<>();
363363
try (RefCountingListener refs = new RefCountingListener(remoteSinksFuture)) {
364364
List<ExchangeSinkHandler> sinkHandlers = new ArrayList<>();
@@ -367,7 +367,12 @@ public void testConcurrentWithHandlers() {
367367
if (sinkHandlers.isEmpty() == false && randomBoolean()) {
368368
sinkHandler = randomFrom(sinkHandlers);
369369
} else {
370-
sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier());
370+
sinkHandler = new ExchangeSinkHandler(
371+
"test",
372+
blockFactory,
373+
randomExchangeBuffer(),
374+
threadPool.relativeTimeInMillisSupplier()
375+
);
371376
sourceExchanger.addRemoteSink(
372377
sinkHandler::fetchPageAsync,
373378
randomBoolean(),
@@ -396,7 +401,7 @@ public void testConcurrentWithHandlers() {
396401

397402
public void testExchangeSourceContinueOnFailure() {
398403
BlockFactory blockFactory = blockFactory();
399-
var exchangeSourceHandler = new ExchangeSourceHandler(randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR));
404+
var exchangeSourceHandler = new ExchangeSourceHandler("test", randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR));
400405
final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
401406
final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
402407
Set<Integer> expectedSeqNos = ConcurrentCollections.newConcurrentSet();
@@ -407,7 +412,12 @@ public void testExchangeSourceContinueOnFailure() {
407412
PlainActionFuture<Void> remoteSinksFuture = new PlainActionFuture<>();
408413
try (RefCountingListener refs = new RefCountingListener(remoteSinksFuture)) {
409414
Supplier<ExchangeSink> exchangeSink = () -> {
410-
var sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier());
415+
var sinkHandler = new ExchangeSinkHandler(
416+
"test",
417+
blockFactory,
418+
randomExchangeBuffer(),
419+
threadPool.relativeTimeInMillisSupplier()
420+
);
411421
int failAfter = randomBoolean() ? Integer.MAX_VALUE : randomIntBetween(0, 100);
412422
AtomicInteger fetched = new AtomicInteger();
413423
int instance = randomIntBetween(1, 3);
@@ -472,7 +482,7 @@ public void testClosingSinks() {
472482
IntBlock block2 = blockFactory.newConstantIntBlockWith(1, 2);
473483
Page p1 = new Page(block1);
474484
Page p2 = new Page(block2);
475-
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
485+
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler("test", blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
476486
ExchangeSink sink = sinkExchanger.createExchangeSink(() -> {});
477487
sink.addPage(p1);
478488
sink.addPage(p2);
@@ -487,7 +497,7 @@ public void testClosingSinks() {
487497
}
488498

489499
public void testFinishEarly() throws Exception {
490-
ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(20, threadPool.generic());
500+
ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler("test", 20, threadPool.generic());
491501
Semaphore permits = new Semaphore(between(1, 5));
492502
BlockFactory blockFactory = blockFactory();
493503
Queue<Page> pages = ConcurrentCollections.newQueue();
@@ -558,7 +568,7 @@ public void testConcurrentWithTransportActions() {
558568
try (exchange0; exchange1; node0; node1) {
559569
String exchangeId = "exchange";
560570
Task task = new Task(1, "", "", "", null, Collections.emptyMap());
561-
var sourceHandler = new ExchangeSourceHandler(randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR));
571+
var sourceHandler = new ExchangeSourceHandler(exchangeId, randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR));
562572
ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomExchangeBuffer());
563573
Transport.Connection connection = node0.getConnection(node1.getLocalNode());
564574
sourceHandler.addRemoteSink(
@@ -628,7 +638,7 @@ public void sendResponse(TransportResponse transportResponse) {
628638
try (exchange0; exchange1; node0; node1) {
629639
String exchangeId = "exchange";
630640
Task task = new Task(1, "", "", "", null, Collections.emptyMap());
631-
var sourceHandler = new ExchangeSourceHandler(randomIntBetween(1, 128), threadPool.executor(ESQL_TEST_EXECUTOR));
641+
var sourceHandler = new ExchangeSourceHandler(exchangeId, randomIntBetween(1, 128), threadPool.executor(ESQL_TEST_EXECUTOR));
632642
ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomIntBetween(1, 128));
633643
Transport.Connection connection = node0.getConnection(node1.getLocalNode());
634644
PlainActionFuture<Void> remoteSinkFuture = new PlainActionFuture<>();
@@ -662,7 +672,7 @@ public void sendResponse(TransportResponse transportResponse) {
662672
public void testNoCyclicException() throws Exception {
663673
PlainActionFuture<Void> future = new PlainActionFuture<>();
664674
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(future)) {
665-
var exchangeSourceHandler = new ExchangeSourceHandler(between(10, 100), threadPool.generic());
675+
var exchangeSourceHandler = new ExchangeSourceHandler("test", between(10, 100), threadPool.generic());
666676
int numSinks = between(5, 10);
667677
for (int i = 0; i < numSinks; i++) {
668678
RemoteSink remoteSink = (allSourcesFinished, listener) -> threadPool.schedule(

0 commit comments

Comments
 (0)