Skip to content

Commit c46e047

Browse files
authored
fix(stdlib): sort limit skips chunks with no rows (#5211)
The sort limit code did not skip chunks with no rows which caused it to attempt to compare invalid chunks with each other. This skips those rows but still ensures that the schema is created for them. This is similar to the same problem that was identified in #4701. That PR only addressed when the last or only chunk was zero while this one ensures that the aggregation step also handles this situation properly.
1 parent 9738cff commit c46e047

File tree

4 files changed

+35
-2
lines changed

4 files changed

+35
-2
lines changed

libflux/go/libflux/buildinfo.gen.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ var sourceHashes = map[string]string{
576576
"stdlib/universe/simple_max_test.flux": "a63b3f530e4d81451e3c71a1abeea50cff02e743c9313ab3ffd5bc3b3ce9ad2e",
577577
"stdlib/universe/skew_test.flux": "7782d41c563c77ba9f4176fa1b5f4f6107e418b7ea301e4896398dbcb514315a",
578578
"stdlib/universe/sort2_test.flux": "1d2043c0d0b38abb8dc61fc1baa6d6052fae63fea55cc6e67fd1600911513bdb",
579-
"stdlib/universe/sort_limit_test.flux": "5ae162efb25cb25c3f18b871ab30bfd5e827acb329fd92d8926011658df8bd82",
579+
"stdlib/universe/sort_limit_test.flux": "80a7a25384b86db401ffd6fd2ea58c8e4b0646f522fa121e92a00793c0a28039",
580580
"stdlib/universe/sort_rules_test.flux": "0770ae42e99b04167ca5bef8340a310b224baf1ba1928997273de9663b64684a",
581581
"stdlib/universe/sort_test.flux": "f69ebb5972762078e759af3c1cd3d852431a569dce74f3c379709c9e174bfa31",
582582
"stdlib/universe/spread_test.flux": "1ddf25e4d86b6365da254229fc8f77bd838c24a79e6a08c9c4c50330ace0a6a3",

stdlib/universe/sort.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ func (s *sortTableMergeHeap) ValueLen() int {
339339

340340
func (s *sortTableMergeHeap) Table(limit int, mem memory.Allocator) (flux.Table, error) {
341341
if s.ValueLen() == 0 {
342-
// Degenerate case where there are no rows to merge sort
342+
// Degenerate case where there are no rows to merge sort.
343343
for len(s.items) > 0 {
344344
s.Pop()
345345
}

stdlib/universe/sort_limit.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ func (s *sortLimitTransformation) Aggregate(chunk table.Chunk, state interface{}
8080

8181
}
8282

83+
// If the chunk is empty, ignore this chunk.
84+
// We still return the merge heap though as we still need to recognize that
85+
// the group key/schema exist.
86+
if chunk.Len() == 0 {
87+
return mh, true, nil
88+
}
89+
8390
if err := s.appendChunk(mh, chunk, mem); err != nil {
8491
return nil, false, err
8592
}

stdlib/universe/sort_limit_test.flux

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,29 @@ testcase sort_limit_multi_successor {
170170

171171
testing.diff(got: got, want: want)
172172
}
173+
174+
testcase sort_limit_empty_chunk {
175+
got =
176+
array.from(
177+
rows: [
178+
{_time: 2022-01-11T00:00:00Z, t0: "aa", _value: 12.0},
179+
{_time: 2022-01-11T00:00:00Z, t0: "ab", _value: 10.0},
180+
{_time: 2022-01-11T00:00:00Z, t0: "ba", _value: 18.0},
181+
{_time: 2022-01-11T00:00:00Z, t0: "bb", _value: 4.0},
182+
],
183+
)
184+
|> group(columns: ["t0"])
185+
|> filter(fn: (r) => r.t0 =~ /b/, onEmpty: "keep")
186+
|> group()
187+
|> top(n: 2)
188+
189+
want =
190+
array.from(
191+
rows: [
192+
{_time: 2022-01-11T00:00:00Z, t0: "ba", _value: 18.0},
193+
{_time: 2022-01-11T00:00:00Z, t0: "ab", _value: 10.0},
194+
],
195+
)
196+
197+
testing.diff(got: got, want: want)
198+
}

0 commit comments

Comments
 (0)