From f2a9996dc20b447e85809c8f831a7499350cf96b Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 25 Feb 2025 14:26:21 +0100 Subject: [PATCH 1/2] add shards to LuceneSource --- .../elasticsearch/compute/lucene/LuceneOperator.java | 11 ++++++++++- .../compute/lucene/LuceneSliceQueue.java | 5 +++-- .../compute/lucene/LuceneTopNSourceOperator.java | 1 + .../compute/lucene/LuceneCountOperatorTests.java | 2 +- .../compute/lucene/LuceneMaxOperatorTestCase.java | 2 +- .../compute/lucene/LuceneMinOperatorTestCase.java | 2 +- .../compute/lucene/LuceneSourceOperatorTests.java | 2 +- .../lucene/LuceneTopNSourceOperatorScoringTests.java | 4 +++- .../compute/lucene/LuceneTopNSourceOperatorTests.java | 4 +++- .../xpack/esql/action/EsqlActionTaskIT.java | 2 +- 10 files changed, 25 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 2f72c309b5f21..6039fe2d5e1d6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -231,12 +232,20 @@ int position() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.getClass().getSimpleName()).append("["); - sb.append("maxPageSize = ").append(maxPageSize); + sb.append("shards = ").append(sortedUnion(processedShards, sliceQueue.remainingShards())); + sb.append(", maxPageSize = ").append(maxPageSize); describe(sb); sb.append("]"); return sb.toString(); } + private static Set sortedUnion(Collection a, Collection b) { + var result = new TreeSet(); + result.addAll(a); + result.addAll(b); + return result; + } + protected abstract void describe(StringBuilder sb); @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index 9633051781f4a..32340ace03424 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -15,6 +15,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Queue; @@ -45,8 +46,8 @@ public int totalSlices() { return totalSlices; } - public Iterable getSlices() { - return slices; + public Collection remainingShards() { + return slices.stream().map(slice -> slice.shardContext().shardIdentifier()).toList(); } public static LuceneSliceQueue create( diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index d25cb3a870da7..193307ae4b811 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -50,6 +50,7 @@ * Source operator that builds Pages out of the output of a TopFieldCollector (aka TopN) */ public final class LuceneTopNSourceOperator extends LuceneOperator { + public static class Factory extends LuceneOperator.Factory { private final int maxPageSize; private final List> sorts; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java index 61c7582c74245..12884cba5afa6 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java @@ -94,7 +94,7 @@ private LuceneCountOperator.Factory simple(DataPartitioning dataPartitioning, in @Override protected Matcher expectedToStringOfSimple() { - return matchesRegex("LuceneCountOperator\\[maxPageSize = \\d+, remainingDocs=100]"); + return matchesRegex("LuceneCountOperator\\[shards = \\[test], maxPageSize = \\d+, remainingDocs=100]"); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java index f6fba20a28889..369e85d165046 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java @@ -194,7 +194,7 @@ private void testMax(Supplier contexts, int size, int limit) { @Override protected final Matcher expectedToStringOfSimple() { - return matchesRegex("LuceneMinMaxOperator\\[maxPageSize = \\d+, remainingDocs=100]"); + return matchesRegex("LuceneMinMaxOperator\\[shards = \\[test\\], maxPageSize = \\d+, remainingDocs=100]"); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java index 3033efa50f373..be9331a68097e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java @@ -194,7 +194,7 @@ private void testMin(Supplier contexts, int size, int limit) { @Override protected final Matcher expectedToStringOfSimple() { - return matchesRegex("LuceneMinMaxOperator\\[maxPageSize = \\d+, remainingDocs=100]"); + return matchesRegex("LuceneMinMaxOperator\\[shards = \\[test], maxPageSize = \\d+, remainingDocs=100]"); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index 42c9f49a2db7c..cff4d016c14fc 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -99,7 +99,7 @@ private LuceneSourceOperator.Factory simple(DataPartitioning dataPartitioning, i @Override protected Matcher expectedToStringOfSimple() { - return matchesRegex("LuceneSourceOperator\\[maxPageSize = \\d+, remainingDocs = \\d+]"); + return matchesRegex("LuceneSourceOperator\\[shards = \\[test], maxPageSize = \\d+, remainingDocs = \\d+]"); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index 3af21ba37d088..4b26e7cbe56cc 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -109,7 +109,9 @@ public Optional buildSort(List> sorts) { @Override protected Matcher expectedToStringOfSimple() { - return matchesRegex("LuceneTopNSourceOperator\\[maxPageSize = \\d+, limit = 100, scoreMode = COMPLETE, sorts = \\[\\{.+}]]"); + return matchesRegex( + "LuceneTopNSourceOperator\\[shards = \\[test], maxPageSize = \\d+, limit = 100, scoreMode = COMPLETE, sorts = \\[\\{.+}]]" + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index 92eaa78eedcd7..80271477dac22 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -115,7 +115,9 @@ public Optional buildSort(List> sorts) { @Override protected Matcher expectedToStringOfSimple() { var s = scoring ? "COMPLETE" : "TOP_DOCS"; - return matchesRegex("LuceneTopNSourceOperator\\[maxPageSize = \\d+, limit = 100, scoreMode = " + s + ", sorts = \\[\\{.+}]]"); + return matchesRegex( + "LuceneTopNSourceOperator\\[shards = \\[test], maxPageSize = \\d+, limit = 100, scoreMode = " + s + ", sorts = \\[\\{.+}]]" + ); } @Override diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index b15e4cfe739f0..d5986c8d97362 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -106,7 +106,7 @@ public void testTaskContents() throws Exception { String taskDescription = status.taskDescription(); for (DriverStatus.OperatorStatus o : status.activeOperators()) { logger.info("status {}", o); - if (o.operator().startsWith("LuceneSourceOperator[maxPageSize = " + pageSize())) { + if (o.operator().startsWith("LuceneSourceOperator[")) { assertThat(taskDescription, equalTo("data")); LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status(); assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices())); From 526da8436c4e5144caecb7a876588337485ce360 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 26 Feb 2025 13:09:53 +0100 Subject: [PATCH 2/2] upd --- .../java/org/elasticsearch/compute/lucene/LuceneOperator.java | 2 +- .../java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 6039fe2d5e1d6..cb862d5dc277c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -232,7 +232,7 @@ int position() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.getClass().getSimpleName()).append("["); - sb.append("shards = ").append(sortedUnion(processedShards, sliceQueue.remainingShards())); + sb.append("shards = ").append(sortedUnion(processedShards, sliceQueue.remainingShardsIdentifiers())); sb.append(", maxPageSize = ").append(maxPageSize); describe(sb); sb.append("]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index 32340ace03424..0407e0f726044 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -46,7 +46,7 @@ public int totalSlices() { return totalSlices; } - public Collection remainingShards() { + public Collection remainingShardsIdentifiers() { return slices.stream().map(slice -> slice.shardContext().shardIdentifier()).toList(); }