5 changes: 5 additions & 0 deletions docs/changelog/121552.yaml
@@ -0,0 +1,5 @@
pr: 121552
summary: Fix a bug in TOP
area: ES|QL
type: bug
issues: []
@@ -143,13 +143,13 @@ public void collect(BytesRef value, int bucket) {
*/
public void merge(int bucket, BytesRefBucketedSort other, int otherBucket) {
long otherRootIndex = other.common.rootIndex(otherBucket);
- if (otherRootIndex >= other.values.size()) {
+ long otherEnd = other.common.endIndex(otherRootIndex);
+ if (otherEnd >= other.values.size()) {
Contributor
@alex-spies alex-spies Feb 4, 2025
It's not obvious to me how this guards against oversizing of other.values. Without being too familiar with this code, couldn't it happen in theory that the oversizing is by more than a whole bucket size, so that otherEnd may still be < other.values.size() even though no value was ever collected?

Would it make sense to, alternatively, check if the value at the otherRootIndex is not null? Per the invariant check, if any value was ever collected into the bucket, then the root value must not be null, right? So the condition would be otherRootIndex >= other.values.size() || other.values.get(otherRootIndex) == null.

If I'm reading this correctly, the present code would still trigger a failed assertion at other.checkInvariant(otherBucket) if the other.values array was oversized by bucketSize or more, and no value was ever collected into that bucket.
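
A minimal sketch of that proposed guard, assuming a root slot that never received a value reads back as null (the author's reply below notes the implementation pre-fills newly allocated slots instead, so this is illustrative only):

    public void merge(int bucket, BytesRefBucketedSort other, int otherBucket) {
        long otherRootIndex = other.common.rootIndex(otherBucket);
        // Skip buckets that were never allocated, or that were allocated by oversizing
        // but never collected into (assumption: such root slots are null).
        if (otherRootIndex >= other.values.size() || other.values.get(otherRootIndex) == null) {
            return;
        }
        // ... proceed with the merge as in the diff above ...
    }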

Member Author

We always fill the newly allocated gather offsets with the appropriate encoded value. We wanted to make sure we didn't have to do that test.

I suppose ideally we would size in units of whole buckets. I think I could do that, actually. That'd be better. No reason to have this half-grown array.
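
A rough sketch of the whole-bucket sizing idea; the helper name and shape here are hypothetical, not the patch that was actually pushed:

    // Hypothetical helper: round a requested slot count up to a whole number of buckets,
    // so values.size() is always a multiple of bucketSize and any in-range root index
    // belongs to a fully allocated bucket.
    private static long wholeBucketCapacity(long neededSlots, long bucketSize) {
        long wholeBuckets = (neededSlots + bucketSize - 1) / bucketSize; // ceiling division
        return wholeBuckets * bucketSize;
    }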

Contributor

Wouldn't resizing in units of whole buckets still have the problem that you may have sized the array such that there's enough room for one more bucket, but the bucket itself is not yet in use/empty? Or do we only ever resize when we immediately put a value into the bucket AND never go over the size of a single bucket when resizing? If the latter is true, then I think we need to put a comment here as it's non-obvious IMHO.

Put differently, I understand that the current condition otherRootIndex >= other.values.size() is insufficient to detect all cases in which the other bucket is actually empty. It'd be great if the condition we end up using is reasonably easy to prove correct, because when it's not correct we get AssertionErrors, which cause whole test suites to fail and be muted.
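
For concreteness, a hypothetical set of numbers (not taken from the PR) showing how a root index can be in range while its bucket is empty:

    public class OversizedBucketExample {
        public static void main(String[] args) {
            long bucketSize = 4;
            long valuesSize = 12;                 // backing array oversized past the buckets in use
            long otherRootIndex = 2 * bucketSize; // root of bucket 2, which never collected anything
            // Prints false: a guard of "otherRootIndex >= other.values.size()" would not
            // return early here, so the merge would read slots 8..11 of an empty bucket.
            System.out.println(otherRootIndex >= valuesSize);
        }
    }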

Member Author

I suppose ideally we would size in units of whole buckets.

I've pushed a patch that does that. It's a little bigger but feels nicer.

// The value was never collected.
return;
}
other.checkInvariant(otherBucket);
long otherStart = other.startIndex(otherBucket, otherRootIndex);
- long otherEnd = other.common.endIndex(otherRootIndex);
// TODO: This can be improved for heapified buckets by making use of the heap structures
for (long i = otherStart; i < otherEnd; i++) {
collect(other.values.get(i).bytesRefView(), bucket);
@@ -409,6 +409,42 @@ public final void testMergeThisBigger() {
}
}

public final void testMergePastEnd() {
int buckets = 10000;
int bucketSize = between(1, 1000);
int target = between(0, buckets);
List<V> values = randomList(buckets, buckets, this::randomValue);
Collections.sort(values);
try (T sort = build(SortOrder.ASC, bucketSize)) {
// Add a single value to each bucket of the main sort.
for (int b = 0; b < buckets; b++) {
collect(sort, values.get(b), b);
}

try (T other = build(SortOrder.ASC, bucketSize)) {
// Add every value except the target's own to the target bucket of the secondary sort.
for (int i = 0; i < values.size(); i++) {
if (i != target) {
collect(other, values.get(i), target);
}
}

// Merge all buckets pairwise. Most of the secondary ones are empty.
for (int b = 0; b < buckets; b++) {
merge(sort, b, other, b);
}
}

for (int b = 0; b < buckets; b++) {
if (b == target) {
assertBlock(sort, b, values.subList(0, bucketSize));
} else {
assertBlock(sort, b, List.of(values.get(b)));
}
}
}
}

protected void assertBlock(T sort, int groupId, List<V> values) {
var blockFactory = TestBlockFactory.getNonBreakingInstance();
