Skip to content

Commit f68f271

Browse files
committed
Merge branch 'refs/heads/main' into avoid-regex-extract-attributes-removal
2 parents 7c0ff44 + 4c0c9b6 commit f68f271

File tree

3 files changed

+14
-11
lines changed

3 files changed

+14
-11
lines changed

distribution/docker/src/docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<% if (docker_base == 'iron_bank') { %>
2323
ARG BASE_REGISTRY=registry1.dso.mil
2424
ARG BASE_IMAGE=ironbank/redhat/ubi/ubi9
25-
ARG BASE_TAG=9.5
25+
ARG BASE_TAG=9.6
2626
<% } %>
2727
2828
################################################################################

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ public boolean maybeFetchFullEntry(
500500
ActionListener<Void> listener
501501
) {
502502
int finalRegion = getEndingRegion(length);
503+
// TODO freeRegionCount uses freeRegions.size() which is is NOT a constant-time operation. Can we do better?
503504
if (freeRegionCount() < finalRegion) {
504505
// Not enough room to download a full file without evicting existing data, so abort
505506
listener.onResponse(null);
@@ -571,7 +572,7 @@ public void maybeFetchRegion(
571572
final Executor fetchExecutor,
572573
final ActionListener<Boolean> listener
573574
) {
574-
if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) {
575+
if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
575576
// no free page available and no old enough unused region to be evicted
576577
logger.info("No free regions, skipping loading region [{}]", region);
577578
listener.onResponse(false);
@@ -619,7 +620,7 @@ public void maybeFetchRange(
619620
final Executor fetchExecutor,
620621
final ActionListener<Boolean> listener
621622
) {
622-
if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) {
623+
if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
623624
// no free page available and no old enough unused region to be evicted
624625
logger.info("No free regions, skipping loading region [{}]", region);
625626
listener.onResponse(false);
@@ -671,7 +672,11 @@ private static void throwAlreadyClosed(String message) {
671672
throw new AlreadyClosedException(message);
672673
}
673674

674-
// used by tests
675+
/**
676+
* NOTE: Method is package private mostly to allow checking the number of fee regions in tests.
677+
* However, it is also used by {@link SharedBlobCacheService#maybeFetchFullEntry} but we should try
678+
* to move away from that because calling "size" on a ConcurrentLinkedQueue is not a constant time operation.
679+
*/
675680
int freeRegionCount() {
676681
return freeRegions.size();
677682
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,6 @@ private class DataNodeRequestExecutor {
248248
}
249249

250250
void start() {
251-
parentTask.addListener(
252-
() -> exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(parentTask.getReasonCancelled()))
253-
);
254251
runBatch(0);
255252
}
256253

@@ -419,7 +416,12 @@ private void runComputeOnDataNode(
419416
var parentListener = computeListener.acquireAvoid();
420417
try {
421418
// run compute with target shards
419+
var externalSink = exchangeService.getSinkHandler(externalId);
422420
var internalSink = exchangeService.createSinkHandler(request.sessionId(), request.pragmas().exchangeBufferSize());
421+
task.addListener(() -> {
422+
exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled()));
423+
exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(task.getReasonCancelled()));
424+
});
423425
DataNodeRequestExecutor dataNodeRequestExecutor = new DataNodeRequestExecutor(
424426
request,
425427
task,
@@ -431,10 +433,6 @@ private void runComputeOnDataNode(
431433
);
432434
dataNodeRequestExecutor.start();
433435
// run the node-level reduction
434-
var externalSink = exchangeService.getSinkHandler(externalId);
435-
task.addListener(
436-
() -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled()))
437-
);
438436
var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor);
439437
exchangeSource.addRemoteSink(internalSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
440438
var reductionListener = computeListener.acquireCompute();

0 commit comments

Comments
 (0)