Skip to content

Commit e24f89d

Browse files
committed
SQL: Fix wrong results when sorting on aggregate (#43154)
- Previously, when shorting on an aggregate function the bucket processing ended early when the explicit (LIMIT XXX) or the impliciti limit of 512 was reached. As a consequence, only a set of grouping buckets was processed and the results returned didn't reflect the global ordering. - Previously, the priority queue shorting method had an inverse comparison check and the final response from the priority queue was also returned in the inversed order because of the calls to the `pop()` method. Fixes: #42851 (cherry picked from commit 19909ed)
1 parent f032c27 commit e24f89d

File tree

16 files changed

+293
-101
lines changed

16 files changed

+293
-101
lines changed

x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public JdbcCsvSpecIT(String fileName, String groupName, String testName, Integer
3232
@Override
3333
protected int fetchSize() {
3434
// using a smaller fetchSize for nested documents' tests to uncover bugs
35-
// similar with https://github.com/elastic/elasticsearch/issues/35176 quicker
35+
// similar to https://github.com/elastic/elasticsearch/issues/35176 quicker
3636
return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize();
3737
}
3838
}

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ public interface ErrorsTestCase {
2121
void testSelectGroupByScore() throws Exception;
2222
void testSelectScoreSubField() throws Exception;
2323
void testSelectScoreInScalar() throws Exception;
24+
void testHardLimitForSortOnAggregate() throws Exception;
2425
}

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ protected SecurityConfig securityConfig() {
5656
return null;
5757
}
5858

59-
protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
60-
Request request = new Request("PUT", "/" + index + "/doc/1");
59+
protected void index(String index, int docId, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
60+
Request request = new Request("PUT", "/" + index + "/doc/" + docId);
6161
request.addParameter("refresh", "true");
6262
XContentBuilder builder = JsonXContent.contentBuilder().startObject();
6363
body.accept(builder);
@@ -66,6 +66,10 @@ protected void index(String index, CheckedConsumer<XContentBuilder, IOException>
6666
client().performRequest(request);
6767
}
6868

69+
protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
70+
index(index, 1, body);
71+
}
72+
6973
public String command(String command) throws IOException {
7074
return cli.command(command);
7175
}

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,15 @@ public void testSelectScoreInScalar() throws Exception {
9797
assertEquals("line 1:12: [SCORE()] cannot be an argument to a function" + END, readLine());
9898
}
9999

100+
@Override
101+
public void testHardLimitForSortOnAggregate() throws Exception {
102+
index("test", body -> body.field("a", 1).field("b", 2));
103+
String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000");
104+
assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [512], received [10000]" + END,
105+
commandResult);
106+
}
107+
100108
public static void assertFoundOneProblem(String commandResult) {
101109
assertEquals(START + "Bad request [[3;33;22mFound 1 problem(s)", commandResult);
102110
}
103-
104111
}

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,32 @@ public void testInvalidFetchSize() throws IOException {
4949
assertEquals(ErrorsTestCase.START + "Invalid fetch size [[3;33;22m" + Long.MAX_VALUE + ErrorsTestCase.END,
5050
command("fetch size = " + Long.MAX_VALUE));
5151
}
52+
53+
// Test for issue: https://github.com/elastic/elasticsearch/issues/42851
54+
// Even though fetch size and limit are smaller than the noRows, all buckets
55+
// should be processed to achieve the global ordering of the aggregate function.
56+
public void testOrderingOnAggregate() throws IOException {
57+
Request request = new Request("PUT", "/test/doc/_bulk");
58+
request.addParameter("refresh", "true");
59+
StringBuilder bulk = new StringBuilder();
60+
for (int i = 1; i <= 100; i++) {
61+
bulk.append("{\"index\":{}}\n");
62+
bulk.append("{\"a\":").append(i).append(", \"b\" : ").append(i).append("}\n");
63+
}
64+
request.setJsonEntity(bulk.toString());
65+
client().performRequest(request);
66+
67+
assertEquals("[?1l>[?1000l[?2004lfetch size set to [90m4[0m", command("fetch size = 4"));
68+
assertEquals("[?1l>[?1000l[?2004lfetch separator set to \"[90m -- fetch sep -- [0m\"",
69+
command("fetch separator = \" -- fetch sep -- \""));
70+
assertThat(command("SELECT max(b) FROM test GROUP BY a ORDER BY max(b) DESC LIMIT 20"), containsString("max(b)"));
71+
assertThat(readLine(), containsString("----------"));
72+
for (int i = 100; i > 80; i--) {
73+
if (i < 100 && i % 4 == 0) {
74+
assertThat(readLine(), containsString(" -- fetch sep -- "));
75+
}
76+
assertThat(readLine(), containsString(Integer.toString(i)));
77+
}
78+
assertEquals("", readLine());
79+
}
5280
}

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,14 @@ public void testSelectScoreInScalar() throws Exception {
116116
assertThat(e.getMessage(), startsWith("Found 1 problem(s)\nline 1:12: [SCORE()] cannot be an argument to a function"));
117117
}
118118
}
119+
120+
@Override
121+
public void testHardLimitForSortOnAggregate() throws Exception {
122+
index("test", body -> body.field("a", 1).field("b", 2));
123+
try (Connection c = esJdbc()) {
124+
SQLException e = expectThrows(SQLException.class, () ->
125+
c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000").executeQuery());
126+
assertEquals("The maximum LIMIT for aggregate sorting is [512], received [10000]", e.getMessage());
127+
}
128+
}
119129
}

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ public SqlSpecTestCase(String fileName, String groupName, String testName, Integ
7575
this.query = query;
7676
}
7777

78+
@Override
79+
protected int fetchSize() {
80+
// using a smaller fetchSize for nested documents' tests to uncover bugs
81+
// similar to https://github.com/elastic/elasticsearch/issues/42581
82+
return randomIntBetween(1, 20);
83+
}
84+
7885
@Override
7986
protected final void doTest() throws Throwable {
8087
// we skip the tests in case of these locales because ES-SQL is Locale-insensitive for now

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,13 @@ public void testSelectScoreInScalar() throws Exception {
281281
containsString("line 1:12: [SCORE()] cannot be an argument to a function"));
282282
}
283283

284+
@Override
285+
public void testHardLimitForSortOnAggregate() throws Exception {
286+
index("{\"a\": 1, \"b\": 2}");
287+
expectBadRequest(() -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000"),
288+
containsString("The maximum LIMIT for aggregate sorting is [512], received [10000]"));
289+
}
290+
284291
protected void expectBadRequest(CheckedSupplier<Map<String, Object>, Exception> code, Matcher<String> errorMessageMatcher) {
285292
try {
286293
Map<String, Object> result = code.get();

x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ aggNotSpecifiedWithHavingOnLargeGroupBy
9999
SELECT MAX(salary) AS max FROM test_emp GROUP BY emp_no HAVING AVG(salary) > 1000 ORDER BY MIN(salary);
100100

101101
aggWithTieBreakerDescAsc
102-
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC;
102+
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC LIMIT 50;
103103

104104
aggWithTieBreakerDescDesc
105-
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC;
105+
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC LIMIT 50;
106106

107107
aggWithTieBreakerAscDesc
108-
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC;
108+
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC LIMIT 50;
109109

110110
aggWithMixOfOrdinals
111111
SELECT gender AS g, MAX(salary) AS m FROM test_emp GROUP BY gender ORDER BY 2 DESC LIMIT 3;

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,9 @@ public void onResponse(SearchResponse r) {
133133
return;
134134
}
135135

136-
updateCompositeAfterKey(r, query);
137-
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, serializeQuery(query), indices);
136+
boolean hasAfterKey = updateCompositeAfterKey(r, query);
137+
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit,
138+
hasAfterKey ? serializeQuery(query) : null, indices);
138139
listener.onResponse(rowSet);
139140
} catch (Exception ex) {
140141
listener.onFailure(ex);
@@ -167,7 +168,7 @@ static CompositeAggregation getComposite(SearchResponse response) {
167168
throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass());
168169
}
169170

170-
static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
171+
static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
171172
CompositeAggregation composite = getComposite(r);
172173

173174
if (composite == null) {
@@ -176,22 +177,25 @@ static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next)
176177

177178
Map<String, Object> afterKey = composite.afterKey();
178179
// a null after-key means done
179-
if (afterKey != null) {
180-
AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
181-
// update after-key with the new value
182-
if (aggBuilder instanceof CompositeAggregationBuilder) {
183-
CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
184-
comp.aggregateAfter(afterKey);
185-
} else {
186-
throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
187-
}
180+
if (afterKey == null) {
181+
return false;
182+
}
183+
184+
AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
185+
// update after-key with the new value
186+
if (aggBuilder instanceof CompositeAggregationBuilder) {
187+
CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
188+
comp.aggregateAfter(afterKey);
189+
return true;
190+
} else {
191+
throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
188192
}
189193
}
190194

191195
/**
192196
* Deserializes the search source from a byte array.
193197
*/
194-
static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
198+
private static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
195199
try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(source), registry)) {
196200
return new SearchSourceBuilder(in);
197201
}

0 commit comments

Comments
 (0)