Skip to content

Commit be7f2b2

Browse files
authored
Merge branch 'main' into fine-grained-loggers
2 parents 2a87032 + 190bd93 commit be7f2b2

File tree

12 files changed

+32
-36
lines changed

12 files changed

+32
-36
lines changed

docs/changelog/124610.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 124610
2+
summary: Remove page alignment in exchange sink
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

libs/entitlement/src/main/java/org/elasticsearch/entitlement/initialization/EntitlementInitialization.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ private static PolicyManager createPolicyManager() {
213213
new FilesEntitlement(serverModuleFileDatas)
214214
)
215215
),
216+
new Scope("java.desktop", List.of(new LoadNativeLibrariesEntitlement())),
216217
new Scope("org.apache.httpcomponents.httpclient", List.of(new OutboundNetworkEntitlement())),
217218
new Scope("io.netty.transport", List.of(new InboundNetworkEntitlement(), new OutboundNetworkEntitlement())),
218219
new Scope(

libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PolicyManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -564,11 +564,7 @@ private void notEntitled(String message, Class<?> callerClass, ModuleEntitlement
564564
var exception = new NotEntitledException(message);
565565
// Don't emit a log for muted classes, e.g. classes containing self tests
566566
if (mutedClasses.contains(callerClass) == false) {
567-
String frameInfoSuffix = StackWalker.getInstance(RETAIN_CLASS_REFERENCE)
568-
.walk(this::findRequestingFrame)
569-
.map(frame -> "\n\tat " + frame)
570-
.orElse("");
571-
entitlements.logger().warn("Not entitled: " + message + frameInfoSuffix);
567+
entitlements.logger().warn("Not entitled:", exception);
572568
}
573569
throw exception;
574570
}

muted-tests.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -342,12 +342,6 @@ tests:
342342
- class: org.elasticsearch.xpack.ilm.DataStreamAndIndexLifecycleMixingTests
343343
method: testUpdateIndexTemplateToDataStreamLifecyclePreference
344344
issue: https://github.com/elastic/elasticsearch/issues/124837
345-
- class: org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateInlineEvalsTests
346-
method: testGroupingAliasingMoved_To_LeftSideOfJoin
347-
issue: https://github.com/elastic/elasticsearch/issues/124839
348-
- class: org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateInlineEvalsTests
349-
method: testGroupingAliasingMoved_To_LeftSideOfJoin_WithExpression
350-
issue: https://github.com/elastic/elasticsearch/issues/124838
351345
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
352346
method: testAutomaticForceMerge
353347
issue: https://github.com/elastic/elasticsearch/issues/124846

qa/ccs-unavailable-clusters/src/javaRestTest/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,12 @@ private static MockTransportService startTransport(
9191
EsExecutors.DIRECT_EXECUTOR_SERVICE,
9292
SearchShardsRequest::new,
9393
(request, channel, task) -> {
94-
channel.sendResponse(new SearchShardsResponse(List.of(), List.of(), Collections.emptyMap()));
94+
var searchShardsResponse = new SearchShardsResponse(List.of(), List.of(), Collections.emptyMap());
95+
try {
96+
channel.sendResponse(searchShardsResponse);
97+
} finally {
98+
searchShardsResponse.decRef();
99+
}
95100
}
96101
);
97102
newService.registerRequestHandler(
@@ -119,7 +124,12 @@ private static MockTransportService startTransport(
119124
builder.add(node);
120125
}
121126
ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build();
122-
channel.sendResponse(new ClusterStateResponse(clusterName, build, false));
127+
var clusterStateResponse = new ClusterStateResponse(clusterName, build, false);
128+
try {
129+
channel.sendResponse(clusterStateResponse);
130+
} finally {
131+
clusterStateResponse.decRef();
132+
}
123133
}
124134
);
125135
newService.start();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import java.io.IOException;
2525
import java.util.Objects;
26-
import java.util.function.Function;
2726
import java.util.function.Supplier;
2827

2928
/**
@@ -32,17 +31,14 @@
3231
public class ExchangeSinkOperator extends SinkOperator {
3332

3433
private final ExchangeSink sink;
35-
private final Function<Page, Page> transformer;
3634
private int pagesReceived;
3735
private long rowsReceived;
3836

39-
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks, Function<Page, Page> transformer)
40-
implements
41-
SinkOperatorFactory {
37+
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks) implements SinkOperatorFactory {
4238

4339
@Override
4440
public SinkOperator get(DriverContext driverContext) {
45-
return new ExchangeSinkOperator(exchangeSinks.get(), transformer);
41+
return new ExchangeSinkOperator(exchangeSinks.get());
4642
}
4743

4844
@Override
@@ -51,9 +47,8 @@ public String describe() {
5147
}
5248
}
5349

54-
public ExchangeSinkOperator(ExchangeSink sink, Function<Page, Page> transformer) {
50+
public ExchangeSinkOperator(ExchangeSink sink) {
5551
this.sink = sink;
56-
this.transformer = transformer;
5752
}
5853

5954
@Override
@@ -84,7 +79,7 @@ public boolean needsInput() {
8479
protected void doAddInput(Page page) {
8580
pagesReceived++;
8681
rowsReceived += page.getPositionCount();
87-
sink.addPage(transformer.apply(page));
82+
sink.addPage(page);
8883
}
8984

9085
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.concurrent.CyclicBarrier;
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.atomic.AtomicInteger;
48-
import java.util.function.Function;
4948
import java.util.function.LongSupplier;
5049

5150
import static org.hamcrest.Matchers.either;
@@ -328,7 +327,7 @@ public void testEarlyTermination() {
328327
final int maxAllowedRows = between(1, 100);
329328
final AtomicInteger processedRows = new AtomicInteger(0);
330329
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
331-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
330+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
332331
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
333332
@Override
334333
public Block eval(Page page) {
@@ -365,7 +364,7 @@ public void testResumeOnEarlyFinish() throws Exception {
365364
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
366365
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
367366
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
368-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
367+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
369368
Driver driver = TestDriverFactory.create(driverContext, sourceOperator, List.of(), sinkOperator);
370369
PlainActionFuture<Void> future = new PlainActionFuture<>();
371370
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.Iterator;
4040
import java.util.List;
4141
import java.util.Set;
42-
import java.util.function.Function;
4342
import java.util.stream.Collectors;
4443
import java.util.stream.IntStream;
4544
import java.util.stream.Stream;
@@ -242,7 +241,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
242241
simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
243242
intermediateOperatorItr.next()
244243
),
245-
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity())
244+
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}))
246245
)
247246
);
248247
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import java.util.concurrent.TimeUnit;
6666
import java.util.concurrent.atomic.AtomicBoolean;
6767
import java.util.concurrent.atomic.AtomicInteger;
68-
import java.util.function.Function;
6968
import java.util.function.Supplier;
7069
import java.util.stream.Collectors;
7170
import java.util.stream.IntStream;
@@ -305,7 +304,7 @@ Set<Integer> runConcurrentTest(
305304
"sink-" + i,
306305
dc,
307306
seqNoGenerator.get(dc),
308-
new ExchangeSinkOperator(exchangeSink.get(), Function.identity())
307+
new ExchangeSinkOperator(exchangeSink.get())
309308
);
310309
drivers.add(d);
311310
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ExchangeSinkExec extends UnaryExec {
2727
);
2828

2929
private final List<Attribute> output;
30+
// TODO: remove this flag
3031
private final boolean intermediateAgg;
3132

3233
public ExchangeSinkExec(Source source, List<Attribute> output, boolean intermediateAgg, PhysicalPlan child) {

0 commit comments

Comments
 (0)