Skip to content

Commit 671f01b

Browse files
[8.19] Fix geo_centroid over geo_shape merging multiple shards (#144637) (#144755) (#144965)
* Fix geo_centroid over geo_shape merging multiple shards (#144637) (#144755) As described in #144504, we are seeing different results between `geo_centroid` in QueryDSL and ES|QL's `ST_CENTROID_AGG` when the data is `geo_shape` (or cartesian shapes). The root cause turned out to be that `geo_centroid` was using point-only logic for merging intermediate aggregation results between shards, which means it was considering doc-count only for the weighted merge, and missing: * Area weights (large polygons count more than small ones, and long lines count more than short ones) * Dimensional type (shards with only points should not count at all if other shards contain lines or polygons) This PR contains appropriate unit tests, but manual testing on the OSM dataset we used in the benchmarking leads to: | test | ESQL doc count | ESQL centroid | QueryDSL doc count | QueryDSL centroid | | --- | --- | --- | --- | --- | | original | 3818897 | POINT (0.7135620032440881 54.048676344286505) | 3818897 | {lat: 52.91002138728532, lon: 4.350566187678227} | | after area-weight fix | 3818897 | POINT (0.7135620032440897 54.04867634428662) | 3818897 | {lat: 51.63239198052063, lon: 8.3761283280999} | | after dimensional-type fix | 3818897 | POINT (0.7135620032440892 54.04867634428658) | 2017758 | {lat: 54.048676344287394, lon: 0.7135620032440999 } | Note: the small deviations in the 15th and 16th decimal places of the ES|QL results actually occur when simply re-running the same query over and over and are likely a result of floating-point errors related to the order in which results are added to the [Kahan summation](https://en.wikipedia.org/wiki/Kahan_summation_algorithm) and are not related to the fixes. * Make cartesian and geo centroids more similar for easier review
1 parent 4c74987 commit 671f01b

File tree

13 files changed

+355
-29
lines changed

13 files changed

+355
-29
lines changed

docs/changelog/144637.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
area: Geo
2+
issues:
3+
- 144504
4+
pr: 144637
5+
summary: Fix `geo_centroid` over `geo_shape` merging multiple shards
6+
type: bug

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCentroid.java

Lines changed: 99 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99

1010
package org.elasticsearch.search.aggregations.metrics;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.common.geo.SpatialPoint;
1314
import org.elasticsearch.common.io.stream.StreamInput;
1415
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.lucene.spatial.DimensionalShapeType;
1517
import org.elasticsearch.search.aggregations.AggregationReduceContext;
1618
import org.elasticsearch.search.aggregations.AggregatorReducer;
1719
import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -28,15 +30,31 @@
2830
* Serialization and merge logic for {@link GeoCentroidAggregator}.
2931
*/
3032
public abstract class InternalCentroid extends InternalAggregation implements CentroidAggregation {
33+
34+
private static final TransportVersion SHAPE_CENTROID_SUPPORT = TransportVersion.fromName("geo_centroid_shape_weighted_sums");
35+
36+
/**
37+
* Holds the raw weighted sums and dimensional shape type needed for correct cross-shard reduction
38+
* of shape centroids. This is {@code null} for geo_point centroids and for results from old nodes,
39+
* avoiding any memory overhead in the common geo_point case.
40+
*/
41+
public record ShapeData(double firstWeightedSum, double secondWeightedSum, double totalWeight, DimensionalShapeType shapeType) {}
42+
3143
protected final SpatialPoint centroid;
3244
protected final long count;
45+
protected final ShapeData shapeData;
3346

3447
public InternalCentroid(String name, SpatialPoint centroid, long count, Map<String, Object> metadata) {
48+
this(name, centroid, count, null, metadata);
49+
}
50+
51+
public InternalCentroid(String name, SpatialPoint centroid, long count, ShapeData shapeData, Map<String, Object> metadata) {
3552
super(name, metadata);
3653
assert (centroid == null) == (count == 0);
3754
this.centroid = centroid;
3855
assert count >= 0;
3956
this.count = count;
57+
this.shapeData = shapeData;
4058
}
4159

4260
protected abstract SpatialPoint centroidFromStream(StreamInput in) throws IOException;
@@ -55,6 +73,20 @@ protected InternalCentroid(StreamInput in) throws IOException {
5573
} else {
5674
centroid = null;
5775
}
76+
if (in.getTransportVersion().supports(SHAPE_CENTROID_SUPPORT)) {
77+
if (in.readBoolean()) {
78+
shapeData = new ShapeData(
79+
in.readDouble(),
80+
in.readDouble(),
81+
in.readDouble(),
82+
DimensionalShapeType.fromOrdinalByte(in.readByte())
83+
);
84+
} else {
85+
shapeData = null;
86+
}
87+
} else {
88+
shapeData = null;
89+
}
5890
}
5991

6092
@Override
@@ -66,6 +98,17 @@ protected void doWriteTo(StreamOutput out) throws IOException {
6698
} else {
6799
out.writeBoolean(false);
68100
}
101+
if (out.getTransportVersion().supports(SHAPE_CENTROID_SUPPORT)) {
102+
if (shapeData != null) {
103+
out.writeBoolean(true);
104+
out.writeDouble(shapeData.firstWeightedSum);
105+
out.writeDouble(shapeData.secondWeightedSum);
106+
out.writeDouble(shapeData.totalWeight);
107+
out.writeByte((byte) shapeData.shapeType.ordinal());
108+
} else {
109+
out.writeBoolean(false);
110+
}
111+
}
69112
}
70113

71114
@Override
@@ -80,33 +123,81 @@ public long count() {
80123

81124
protected abstract InternalCentroid copyWith(SpatialPoint result, long count);
82125

83-
/** Create a new centroid with by reducing from the sums and total count */
126+
/** Create a new centroid by reducing from the sums and total count (count-weighted path for geo_point). */
84127
protected abstract InternalCentroid copyWith(double firstSum, double secondSum, long totalCount);
85128

129+
/** Create a new centroid from shape-aware weighted sums (area-weighted path for geo_shape). */
130+
protected abstract InternalCentroid copyWithShapeFields(ShapeData shapeData, long count);
131+
86132
protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
87133
return new AggregatorReducer() {
88134

135+
// Count-weighted accumulator (geo_point or old nodes)
89136
double firstSum = Double.NaN;
90137
double secondSum = Double.NaN;
91138
long totalCount = 0;
92139

140+
// Shape-aware accumulator (geo_shape)
141+
double combinedFirstWeighted = 0;
142+
double combinedSecondWeighted = 0;
143+
double combinedWeight = 0;
144+
long shapeCount = 0;
145+
DimensionalShapeType combinedShapeType = DimensionalShapeType.POINT;
146+
boolean hasShapeValues = false;
147+
93148
@Override
94149
public void accept(InternalAggregation aggregation) {
95150
InternalCentroid centroidAgg = (InternalCentroid) aggregation;
96151
if (centroidAgg.count > 0) {
97-
totalCount += centroidAgg.count;
98-
if (Double.isNaN(firstSum)) {
99-
firstSum = centroidAgg.count * extractFirst(centroidAgg.centroid);
100-
secondSum = centroidAgg.count * extractSecond(centroidAgg.centroid);
101-
} else {
102-
firstSum += centroidAgg.count * extractFirst(centroidAgg.centroid);
103-
secondSum += centroidAgg.count * extractSecond(centroidAgg.centroid);
152+
if (centroidAgg.shapeData != null && centroidAgg.shapeData.totalWeight > 0) {
153+
// Shape-aware path: respect dimensional type priority
154+
int cmp = centroidAgg.shapeData.shapeType.compareTo(combinedShapeType);
155+
if (hasShapeValues == false || cmp > 0) {
156+
// First shape value or higher dimension — reset
157+
combinedFirstWeighted = centroidAgg.shapeData.firstWeightedSum;
158+
combinedSecondWeighted = centroidAgg.shapeData.secondWeightedSum;
159+
combinedWeight = centroidAgg.shapeData.totalWeight;
160+
shapeCount = centroidAgg.count;
161+
combinedShapeType = centroidAgg.shapeData.shapeType;
162+
hasShapeValues = true;
163+
} else if (cmp == 0) {
164+
// Same dimension — accumulate
165+
combinedFirstWeighted += centroidAgg.shapeData.firstWeightedSum;
166+
combinedSecondWeighted += centroidAgg.shapeData.secondWeightedSum;
167+
combinedWeight += centroidAgg.shapeData.totalWeight;
168+
shapeCount += centroidAgg.count;
169+
}
170+
// cmp < 0: lower dimension — ignore
171+
} else if (centroidAgg.centroid != null) {
172+
// Count-weighted path (geo_point or BWC from old node)
173+
if (hasShapeValues) {
174+
// BWC: approximate old-node shape result as same dimension, count as weight
175+
combinedFirstWeighted += centroidAgg.count * extractFirst(centroidAgg.centroid);
176+
combinedSecondWeighted += centroidAgg.count * extractSecond(centroidAgg.centroid);
177+
combinedWeight += centroidAgg.count;
178+
shapeCount += centroidAgg.count;
179+
} else {
180+
totalCount += centroidAgg.count;
181+
if (Double.isNaN(firstSum)) {
182+
firstSum = centroidAgg.count * extractFirst(centroidAgg.centroid);
183+
secondSum = centroidAgg.count * extractSecond(centroidAgg.centroid);
184+
} else {
185+
firstSum += centroidAgg.count * extractFirst(centroidAgg.centroid);
186+
secondSum += centroidAgg.count * extractSecond(centroidAgg.centroid);
187+
}
188+
}
104189
}
105190
}
106191
}
107192

108193
@Override
109194
public InternalAggregation get() {
195+
if (hasShapeValues) {
196+
return copyWithShapeFields(
197+
new ShapeData(combinedFirstWeighted, combinedSecondWeighted, combinedWeight, combinedShapeType),
198+
shapeCount
199+
);
200+
}
110201
return copyWith(firstSum, secondSum, totalCount);
111202
}
112203
};

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ public InternalGeoCentroid(String name, SpatialPoint centroid, long count, Map<S
2828
super(name, centroid, count, metadata);
2929
}
3030

31+
/**
32+
* Constructor for shape centroid results that carry raw weighted sums for correct cross-shard reduction.
33+
*/
34+
public InternalGeoCentroid(String name, SpatialPoint centroid, long count, ShapeData shapeData, Map<String, Object> metadata) {
35+
super(name, centroid, count, shapeData, metadata);
36+
}
37+
3138
/**
3239
* Read from a stream.
3340
*/
@@ -75,6 +82,14 @@ protected InternalGeoCentroid copyWith(double firstSum, double secondSum, long t
7582
return copyWith(result, totalCount);
7683
}
7784

85+
@Override
86+
protected InternalGeoCentroid copyWithShapeFields(ShapeData shapeData, long count) {
87+
final GeoPoint result = shapeData.totalWeight() > 0
88+
? new GeoPoint(shapeData.firstWeightedSum() / shapeData.totalWeight(), shapeData.secondWeightedSum() / shapeData.totalWeight())
89+
: null;
90+
return new InternalGeoCentroid(name, result, count, shapeData, getMetadata());
91+
}
92+
7893
@Override
7994
protected String nameFirst() {
8095
return "lat";
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9325000,9250010,9185023,8841087
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
initial_8.19.13,8841086
1+
geo_centroid_shape_weighted_sums,8841087
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
search_context_missing_nodes_exception,9185021
1+
geo_centroid_shape_weighted_sums,9185023
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
search_context_missing_nodes_exception,9250007
1+
geo_centroid_shape_weighted_sums,9250010
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
dfs_search_timed_out,9294000
1+
geo_centroid_shape_weighted_sums,9325000

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/metrics/CartesianShapeCentroidAggregator.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.search.aggregations.LeafBucketCollector;
1919
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
2020
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
21-
import org.elasticsearch.search.aggregations.metrics.InternalGeoCentroid;
2221
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
2322
import org.elasticsearch.search.aggregations.support.AggregationContext;
2423
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
@@ -126,17 +125,21 @@ public InternalAggregation buildAggregation(long bucket) {
126125
if (bucket >= counts.size()) {
127126
return buildEmptyAggregation();
128127
}
129-
final long bucketCount = counts.get(bucket);
128+
final double bucketXSum = lonSum.get(bucket); // x-coordinate sum (named "lon" for historical reasons)
129+
final double bucketYSum = latSum.get(bucket); // y-coordinate sum (named "lat" for historical reasons)
130130
final double bucketWeight = weightSum.get(bucket);
131-
final CartesianPoint bucketCentroid = (bucketWeight > 0)
132-
? new CartesianPoint(lonSum.get(bucket) / bucketWeight, latSum.get(bucket) / bucketWeight)
131+
final long bucketCount = counts.get(bucket);
132+
final DimensionalShapeType bucketShapeType = DimensionalShapeType.fromOrdinalByte(dimensionalShapeTypes.get(bucket));
133+
final CartesianPoint bucketCentroid = bucketWeight > 0
134+
? new CartesianPoint(bucketXSum / bucketWeight, bucketYSum / bucketWeight)
133135
: null;
134-
return new InternalCartesianCentroid(name, bucketCentroid, bucketCount, metadata());
136+
var shapeData = new InternalCartesianCentroid.ShapeData(bucketXSum, bucketYSum, bucketWeight, bucketShapeType);
137+
return new InternalCartesianCentroid(name, bucketCentroid, bucketCount, shapeData, metadata());
135138
}
136139

137140
@Override
138141
public InternalAggregation buildEmptyAggregation() {
139-
return InternalGeoCentroid.empty(name, metadata());
142+
return InternalCartesianCentroid.empty(name, metadata());
140143
}
141144

142145
@Override

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/metrics/GeoShapeCentroidAggregator.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,14 @@ public InternalAggregation buildAggregation(long bucket) {
126126
if (bucket >= counts.size()) {
127127
return buildEmptyAggregation();
128128
}
129-
final long bucketCount = counts.get(bucket);
129+
final double bucketLatSum = latSum.get(bucket);
130+
final double bucketLonSum = lonSum.get(bucket);
130131
final double bucketWeight = weightSum.get(bucket);
131-
final GeoPoint bucketCentroid = (bucketWeight > 0)
132-
? new GeoPoint(latSum.get(bucket) / bucketWeight, lonSum.get(bucket) / bucketWeight)
133-
: null;
134-
return new InternalGeoCentroid(name, bucketCentroid, bucketCount, metadata());
132+
final long bucketCount = counts.get(bucket);
133+
final DimensionalShapeType bucketShapeType = DimensionalShapeType.fromOrdinalByte(dimensionalShapeTypes.get(bucket));
134+
final GeoPoint bucketCentroid = bucketWeight > 0 ? new GeoPoint(bucketLatSum / bucketWeight, bucketLonSum / bucketWeight) : null;
135+
var shapeData = new InternalGeoCentroid.ShapeData(bucketLatSum, bucketLonSum, bucketWeight, bucketShapeType);
136+
return new InternalGeoCentroid(name, bucketCentroid, bucketCount, shapeData, metadata());
135137
}
136138

137139
@Override

0 commit comments

Comments
 (0)