Skip to content

Commit c62df16

Browse files
authored
Fix Bucket ordering for partial reduction in date histogram and histogram aggregation (#108184)
In #105359 we changed the bucket ordering for partial reduces, which causes issues when the output is shared with a node running an older version. This commit reorders the output to the order expected by previous versions.
1 parent fe8cb9f commit c62df16

File tree

6 files changed

+309
-2
lines changed

6 files changed

+309
-2
lines changed

docs/changelog/108184.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 108184
2+
summary: Fix Bucket ordering for partial reduction in date histogram and histogram
3+
aggregation
4+
area: Aggregations
5+
type: bug
6+
issues:
7+
- 108181
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.upgrades;
10+
11+
import org.apache.http.HttpHost;
12+
import org.elasticsearch.Version;
13+
import org.elasticsearch.client.Request;
14+
import org.elasticsearch.client.Response;
15+
import org.elasticsearch.client.RestClient;
16+
import org.elasticsearch.common.Strings;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.test.rest.ESRestTestCase;
19+
import org.elasticsearch.test.rest.ObjectPath;
20+
import org.junit.After;
21+
import org.junit.Before;
22+
23+
import java.io.IOException;
24+
import java.net.URLEncoder;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.stream.Collectors;
31+
32+
import static org.hamcrest.Matchers.empty;
33+
import static org.hamcrest.Matchers.in;
34+
import static org.hamcrest.Matchers.not;
35+
36+
/**
37+
* This test ensure aggregation compatibility in cross cluster search
38+
*/
39+
public class AggregationsIT extends ESRestTestCase {
40+
41+
private static final String CLUSTER_ALIAS = "remote_cluster";
42+
private static final String localIndex = "test_bwc_index";
43+
private static final String remoteIndex = "test_bwc_remote_index";
44+
private static int docs;
45+
46+
@Override
47+
protected boolean preserveClusterUponCompletion() {
48+
return true;
49+
}
50+
51+
static List<SearchStatesIT.Node> getNodes(RestClient restClient) throws IOException {
52+
Response response = restClient.performRequest(new Request("GET", "_nodes"));
53+
ObjectPath objectPath = ObjectPath.createFromResponse(response);
54+
final Map<String, Object> nodeMap = objectPath.evaluate("nodes");
55+
final List<SearchStatesIT.Node> nodes = new ArrayList<>();
56+
for (String id : nodeMap.keySet()) {
57+
final String name = objectPath.evaluate("nodes." + id + ".name");
58+
final Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version"));
59+
final String transportAddress = objectPath.evaluate("nodes." + id + ".transport.publish_address");
60+
final String httpAddress = objectPath.evaluate("nodes." + id + ".http.publish_address");
61+
final Map<String, Object> attributes = objectPath.evaluate("nodes." + id + ".attributes");
62+
nodes.add(new SearchStatesIT.Node(id, name, version, transportAddress, httpAddress, attributes));
63+
}
64+
return nodes;
65+
}
66+
67+
static List<HttpHost> parseHosts(String props) {
68+
final String address = System.getProperty(props);
69+
assertNotNull("[" + props + "] is not configured", address);
70+
String[] stringUrls = address.split(",");
71+
List<HttpHost> hosts = new ArrayList<>(stringUrls.length);
72+
for (String stringUrl : stringUrls) {
73+
int portSeparator = stringUrl.lastIndexOf(':');
74+
if (portSeparator < 0) {
75+
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]");
76+
}
77+
String host = stringUrl.substring(0, portSeparator);
78+
int port = Integer.parseInt(stringUrl.substring(portSeparator + 1));
79+
hosts.add(new HttpHost(host, port, "http"));
80+
}
81+
assertThat("[" + props + "] is empty", hosts, not(empty()));
82+
return hosts;
83+
}
84+
85+
static RestClient newLocalClient() {
86+
return RestClient.builder(randomFrom(parseHosts("tests.rest.cluster"))).build();
87+
}
88+
89+
static RestClient newRemoteClient() {
90+
return RestClient.builder(randomFrom(parseHosts("tests.rest.remote_cluster"))).build();
91+
}
92+
93+
@Before
94+
private void configureClusters() throws Exception {
95+
if (docs == 0) {
96+
try (RestClient localClient = newLocalClient(); RestClient remoteClient = newRemoteClient()) {
97+
configureRemoteClusters(localClient, getNodes(remoteClient));
98+
docs = between(10, 100);
99+
createindex(localClient, localIndex);
100+
createindex(remoteClient, remoteIndex);
101+
}
102+
}
103+
}
104+
105+
@After
106+
private void clearClusters() throws Exception {
107+
try (RestClient localClient = newLocalClient(); RestClient remoteClient = newRemoteClient()) {
108+
deleteIndex(localClient, localIndex);
109+
deleteIndex(remoteClient, remoteIndex);
110+
docs = 0;
111+
}
112+
}
113+
114+
private void createindex(RestClient client, String index) throws IOException {
115+
final String mapping = """
116+
"properties": {
117+
"date": { "type": "date" },
118+
"number": { "type": "integer" }
119+
}
120+
""";
121+
createIndex(client, index, Settings.EMPTY, mapping);
122+
for (int i = 0; i < docs; i++) {
123+
Request createDoc = new Request("POST", "/" + index + "/_doc/id_" + i);
124+
createDoc.setJsonEntity(Strings.format("""
125+
{ "date": %s, "number": %s }
126+
""", i * 1000 * 60, i));
127+
assertOK(client.performRequest(createDoc));
128+
}
129+
refresh(client, index);
130+
}
131+
132+
private static void configureRemoteClusters(RestClient localClient, List<SearchStatesIT.Node> remoteNodes) throws Exception {
133+
final String remoteClusterSettingPrefix = "cluster.remote." + CLUSTER_ALIAS + ".";
134+
final Settings remoteConnectionSettings;
135+
final List<String> seeds = remoteNodes.stream()
136+
.filter(n -> n.attributes().containsKey("gateway"))
137+
.map(n -> n.transportAddress())
138+
.collect(Collectors.toList());
139+
remoteConnectionSettings = Settings.builder()
140+
.putNull(remoteClusterSettingPrefix + "proxy_address")
141+
.put(remoteClusterSettingPrefix + "mode", "sniff")
142+
.putList(remoteClusterSettingPrefix + "seeds", seeds)
143+
.build();
144+
updateClusterSettings(localClient, remoteConnectionSettings);
145+
assertBusy(() -> {
146+
final Response resp = localClient.performRequest(new Request("GET", "/_remote/info"));
147+
assertOK(resp);
148+
final ObjectPath objectPath = ObjectPath.createFromResponse(resp);
149+
assertNotNull(objectPath.evaluate(CLUSTER_ALIAS));
150+
assertTrue(objectPath.evaluate(CLUSTER_ALIAS + ".connected"));
151+
}, 60, TimeUnit.SECONDS);
152+
}
153+
154+
public void testDateHistogram() throws Exception {
155+
for (int i = 0; i < 3; i++) {
156+
try (RestClient localClient = newLocalClient()) {
157+
String in = URLEncoder.encode(localIndex + ",remote_cluster:" + remoteIndex, StandardCharsets.UTF_8);
158+
Request request = new Request("POST", "/" + in + "/_search");
159+
request.setJsonEntity(
160+
"{\"aggs\": { \"hist\": { \"date_histogram\": { \"field\": \"date\", \"calendar_interval\": \"minute\" }}}}"
161+
);
162+
ObjectPath response = ObjectPath.createFromResponse(localClient.performRequest(request));
163+
assertEquals(docs, response.evaluateArraySize("aggregations.hist.buckets"));
164+
for (int j = 0; j < docs; j++) {
165+
assertEquals(2, (int) response.evaluate("aggregations.hist.buckets." + j + ".doc_count"));
166+
}
167+
}
168+
}
169+
}
170+
171+
public void testHistogram() throws Exception {
172+
for (int i = 0; i < 3; i++) {
173+
try (RestClient localClient = newLocalClient()) {
174+
String in = URLEncoder.encode(localIndex + ",remote_cluster:" + remoteIndex, StandardCharsets.UTF_8);
175+
Request request = new Request("POST", "/" + in + "/_search");
176+
request.setJsonEntity("{\"aggs\": { \"hist\": { \"histogram\": { \"field\": \"number\", \"interval\": 1 }}}}");
177+
ObjectPath response = ObjectPath.createFromResponse(localClient.performRequest(request));
178+
assertEquals(docs, response.evaluateArraySize("aggregations.hist.buckets"));
179+
for (int j = 0; j < docs; j++) {
180+
assertEquals(2, (int) response.evaluate("aggregations.hist.buckets." + j + ".doc_count"));
181+
}
182+
}
183+
}
184+
}
185+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.backwards;
10+
11+
import org.elasticsearch.client.Request;
12+
import org.elasticsearch.client.Response;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.core.Strings;
16+
import org.elasticsearch.test.rest.ESRestTestCase;
17+
import org.elasticsearch.test.rest.ObjectPath;
18+
19+
import java.io.IOException;
20+
21+
/**
22+
* Test that index enough data to trigger concurrency.
23+
*/
24+
public class HistogramIT extends ESRestTestCase {
25+
26+
private static final String index = "idx";
27+
private static final int numBuckets = 1000;
28+
private static final int docsPerBuckets = 1000;
29+
30+
private int indexDocs(int numDocs, int id) throws Exception {
31+
final Request request = new Request("POST", "/_bulk");
32+
final StringBuilder builder = new StringBuilder();
33+
for (int i = 0; i < numDocs; ++i) {
34+
Object[] args = new Object[] { index, id++, i * 1000 * 60, i };
35+
builder.append(Strings.format("""
36+
{ "index" : { "_index" : "%s", "_id": "%s" } }
37+
{"date" : %s, "number" : %s }
38+
""", args));
39+
}
40+
request.setJsonEntity(builder.toString());
41+
assertOK(client().performRequest(request));
42+
return id;
43+
}
44+
45+
public void testWithConcurrency() throws Exception {
46+
final String mapping = """
47+
"properties": {
48+
"date": { "type": "date" },
49+
"number": { "type": "integer" }
50+
}
51+
""";
52+
final Settings.Builder settings = Settings.builder()
53+
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 3)
54+
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
55+
createIndex(index, settings.build(), mapping);
56+
// We want to trigger concurrency so we need to index enough documents
57+
int id = 1;
58+
for (int i = 0; i < docsPerBuckets; i++) {
59+
id = indexDocs(numBuckets, id);
60+
refreshAllIndices();
61+
}
62+
// Check date histogram
63+
assertDateHistogram();
64+
// Check histogram
65+
assertHistogram();
66+
}
67+
68+
private void assertDateHistogram() throws IOException {
69+
final Request request = new Request("POST", index + "/_search");
70+
request.setJsonEntity("""
71+
{
72+
"aggs": {
73+
"hist": {
74+
"date_histogram": {
75+
"field": "date",
76+
"calendar_interval": "minute"
77+
}
78+
}
79+
}
80+
}""");
81+
final Response response = client().performRequest(request);
82+
assertOK(response);
83+
ObjectPath o = ObjectPath.createFromResponse(response);
84+
assertEquals(numBuckets, o.evaluateArraySize("aggregations.hist.buckets"));
85+
for (int j = 0; j < numBuckets; j++) {
86+
assertEquals(docsPerBuckets, (int) o.evaluate("aggregations.hist.buckets." + j + ".doc_count"));
87+
}
88+
}
89+
90+
private void assertHistogram() throws IOException {
91+
final Request request = new Request("POST", index + "/_search");
92+
request.setJsonEntity("""
93+
{
94+
"aggs": {
95+
"hist": {
96+
"histogram": {
97+
"field": "number",
98+
"interval": "1"
99+
}
100+
}
101+
}
102+
}""");
103+
final Response response = client().performRequest(request);
104+
assertOK(response);
105+
ObjectPath o = ObjectPath.createFromResponse(response);
106+
assertEquals(numBuckets, o.evaluateArraySize("aggregations.hist.buckets"));
107+
for (int j = 0; j < numBuckets; j++) {
108+
assertEquals(docsPerBuckets, (int) o.evaluate("aggregations.hist.buckets." + j + ".doc_count"));
109+
}
110+
}
111+
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,8 @@ public void accept(InternalAggregation aggregation) {
456456
@Override
457457
public InternalAggregation get() {
458458
List<Bucket> reducedBuckets = reducer.get();
459+
reducedBuckets.sort(Comparator.comparingLong(b -> b.key));
459460
if (reduceContext.isFinalReduce()) {
460-
reducedBuckets.sort(Comparator.comparingLong(b -> b.key));
461461
if (minDocCount == 0) {
462462
addEmptyBuckets(reducedBuckets, reduceContext);
463463
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,8 @@ public void accept(InternalAggregation aggregation) {
399399
@Override
400400
public InternalAggregation get() {
401401
List<Bucket> reducedBuckets = reducer.get();
402+
reducedBuckets.sort(Comparator.comparingDouble(b -> b.key));
402403
if (reduceContext.isFinalReduce()) {
403-
reducedBuckets.sort(Comparator.comparingDouble(b -> b.key));
404404
if (minDocCount == 0) {
405405
addEmptyBuckets(reducedBuckets, reduceContext);
406406
}

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1735,6 +1735,10 @@ protected static CreateIndexResponse createIndex(RestClient client, String name,
17351735
return createIndex(client, name, settings, null, null);
17361736
}
17371737

1738+
/**
 * Creates an index through the given client using the supplied name, settings and mapping.
 * Convenience overload that delegates to the five-argument {@code createIndex} variant, passing
 * {@code null} as its last argument.
 *
 * @param client   the REST client used to send the request
 * @param name     the index name
 * @param settings the index settings
 * @param mapping  the mapping definition forwarded unchanged to the delegate
 * @return the response returned by the delegate
 * @throws IOException if the delegate throws it
 */
protected static CreateIndexResponse createIndex(RestClient client, String name, Settings settings, String mapping) throws IOException {
1739+
return createIndex(client, name, settings, mapping, null);
1740+
}
1741+
17381742
/**
 * Creates an index using the supplied name, settings and mapping. Convenience overload that
 * delegates to the four-argument {@code createIndex} variant, passing {@code null} as its last
 * argument.
 *
 * @param name     the index name
 * @param settings the index settings
 * @param mapping  the mapping definition forwarded unchanged to the delegate
 * @return the response returned by the delegate
 * @throws IOException if the delegate throws it
 */
protected static CreateIndexResponse createIndex(String name, Settings settings, String mapping) throws IOException {
17391743
return createIndex(name, settings, mapping, null);
17401744
}

0 commit comments

Comments
 (0)