Skip to content

Commit eca05d2

Browse files
committed
Add logic to handle time bucket offset in window functions
1 parent cfd3d7a commit eca05d2

File tree

1 file changed

+11
-10
lines changed

1 file changed

+11
-10
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/WindowGroupingAggregatorFunction.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,14 @@ private void evaluateFinalWithWindow(
101101
next.evaluateIntermediate(intermediateBlocks, 0, selected);
102102
Page page = new Page(intermediateBlocks);
103103
GroupingAggregatorFunction finalAggFunction = finalAgg.aggregatorFunction();
104-
int[] positionsWithState = positionsWithIntermediateState(page);
105-
boolean hasNullIntermediateState = positionsWithState.length != page.getPositionCount();
104+
boolean hasNullIntermediateState = hasNullIntermediateState(page);
106105
if (hasNullIntermediateState) {
107-
addNonNullIntermediateInput(finalAggFunction, selected, page, positionsWithState);
106+
addNonNullIntermediateInput(finalAggFunction, selected, page);
108107
} else {
109108
finalAggFunction.addIntermediateInput(0, selected, page);
110109
}
111110
for (int i = 0; i < selected.getPositionCount(); i++) {
112-
groupId = selected.getInt(i);
111+
int groupId = selected.getInt(i);
113112
mergeBucketsFromWindow(groupId, backwards, page, finalAggFunction, evaluationContext, hasNullIntermediateState);
114113
}
115114
} finally {
@@ -186,14 +185,12 @@ private void mergeBucketsFromWindow(
186185
}
187186
}
188187

189-
private static void addNonNullIntermediateInput(GroupingAggregatorFunction fn, IntVector groups, Page page, int[] positionsWithState) {
190-
if (positionsWithState.length == 0) {
188+
private static void addNonNullIntermediateInput(GroupingAggregatorFunction fn, IntVector groups, Page page) {
189+
int[] positions = positionsWithIntermediateState(page);
190+
if (positions.length == 0) {
191191
return;
192192
}
193-
try (
194-
IntVector filteredGroups = groups.filter(false, positionsWithState);
195-
Page filteredPage = filterPage(page, positionsWithState)
196-
) {
193+
try (IntVector filteredGroups = groups.filter(false, positions); Page filteredPage = filterPage(page, positions)) {
197194
fn.addIntermediateInput(0, filteredGroups, filteredPage);
198195
}
199196
}
@@ -209,6 +206,10 @@ private static int[] positionsWithIntermediateState(Page page) {
209206
return Arrays.copyOf(positions, count);
210207
}
211208

209+
private static boolean hasNullIntermediateState(Page page) {
210+
return positionsWithIntermediateState(page).length != page.getPositionCount();
211+
}
212+
212213
private static boolean hasIntermediateState(Page page, int position) {
213214
for (int i = 0; i < page.getBlockCount(); i++) {
214215
if (page.getBlock(i).isNull(position)) {

0 commit comments

Comments
 (0)