Skip to content
Closed

Draft #144712

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions server/src/main/java/org/elasticsearch/common/time/DateUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,43 @@ public static long roundFloor(long utcMillis, final long unitMillis) {
}
}

/**
* Rounds the given utc milliseconds since the epoch down to the next offset unit millis.
*
* @param utcMillis the milliseconds since the epoch
* @param unitMillis the unit to round to
* @param offsetMillis the offset of the unit grid
* @return the rounded milliseconds since the epoch
*/
public static long roundFloor(long utcMillis, long unitMillis, long offsetMillis) {
return roundFloor(utcMillis - offsetMillis, unitMillis) + offsetMillis;
}

/**
* Returns the remainder r such that:
* <p>
* utcMillis = roundFloor(utcMillis, unitMillis) + r
* @param utcMillis the milliseconds since the epoch
* @param unitMillis the unit to round to
* @return offset from {@link #roundFloor(long, long)} within the unit
*/
public static long floorRemainder(long utcMillis, long unitMillis) {
return utcMillis - roundFloor(utcMillis, unitMillis);
}

/**
* Rounds the given utc milliseconds since the epoch up to the next offset unit millis.
*
* @param utcMillis the milliseconds since the epoch
* @param unitMillis the unit to round to
* @param offsetMillis the offset of the unit grid
* @return the rounded milliseconds since the epoch
*/
public static long roundCeil(long utcMillis, long unitMillis, long offsetMillis) {
long rounded = roundFloor(utcMillis, unitMillis, offsetMillis);
return rounded < utcMillis ? rounded + unitMillis : rounded;
}

/**
* Round down to the beginning of the quarter of the year of the specified time
* @param utcMillis the milliseconds since the epoch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,39 @@ public void testRoundFloor() {
assertThat(rounded, is(result.toInstant().toEpochMilli()));
}

public void testFloorRemainder() {
assertThat(DateUtils.floorRemainder(0L, 5_000L), is(0L));
assertThat(DateUtils.floorRemainder(12_002L, 5_000L), is(2_002L));

// -1 / 5000: floor is -5000, remainder is 4999
assertThat(DateUtils.floorRemainder(-1L, 5_000L), is(4_999L));
// -7 / 5: floor is -10, remainder is 3
assertThat(DateUtils.floorRemainder(-7L, 5L), is(3L));
// -10 / 5: exactly on boundary, remainder is 0
assertThat(DateUtils.floorRemainder(-10L, 5L), is(0L));

// 2024-01-01T12:00:15Z = 1704110415000 ms; 1704110415000 % 13000 = 7000
long anchorMillis = Instant.parse("2024-01-01T12:00:15Z").toEpochMilli();
assertThat(anchorMillis, is(1_704_110_415_000L));
assertThat(DateUtils.floorRemainder(anchorMillis, 13_000L), is(7_000L));
}

public void testRoundWithOffset() {
long anchorMillis = Instant.parse("2024-01-01T12:00:15Z").toEpochMilli();
long unitMillis = 13_000L;
long offsetMillis = DateUtils.floorRemainder(anchorMillis, unitMillis);

assertThat(offsetMillis, is(7_000L));
assertThat(DateUtils.roundFloor(anchorMillis, unitMillis, offsetMillis), is(anchorMillis));
assertThat(DateUtils.roundFloor(anchorMillis - 1, unitMillis, offsetMillis), is(anchorMillis - unitMillis));
assertThat(DateUtils.roundCeil(anchorMillis, unitMillis, offsetMillis), is(anchorMillis));
assertThat(DateUtils.roundCeil(anchorMillis - 1, unitMillis, offsetMillis), is(anchorMillis));
assertThat(DateUtils.roundCeil(anchorMillis + 1, unitMillis, offsetMillis), is(anchorMillis + unitMillis));

assertThat(DateUtils.roundFloor(-1L, 5L, 2L), is(-3L));
assertThat(DateUtils.roundCeil(-1L, 5L, 2L), is(2L));
}

public void testRoundQuarterOfYear() {
assertThat(DateUtils.roundQuarterOfYear(0), is(0L));
long lastQuarter1969 = ZonedDateTime.of(1969, 10, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.core.Releasables;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -95,10 +96,16 @@ private void evaluateFinalWithWindow(
try {
next.evaluateIntermediate(intermediateBlocks, 0, selected);
Page page = new Page(intermediateBlocks);
finalAgg.aggregatorFunction().addIntermediateInput(0, selected, page);
GroupingAggregatorFunction finalAggFunction = finalAgg.aggregatorFunction();
boolean hasNullIntermediateState = hasNullIntermediateState(page);
if (hasNullIntermediateState) {
addNonNullIntermediateInput(finalAggFunction, selected, page);
} else {
finalAggFunction.addIntermediateInput(0, selected, page);
}
for (int i = 0; i < selected.getPositionCount(); i++) {
int groupId = selected.getInt(i);
mergeBucketsFromWindow(groupId, backwards, page, finalAgg.aggregatorFunction(), evaluationContext);
mergeBucketsFromWindow(groupId, backwards, page, finalAggFunction, evaluationContext, hasNullIntermediateState);
}
} finally {
Releasables.close(intermediateBlocks);
Expand Down Expand Up @@ -148,21 +155,81 @@ private void mergeBucketsFromWindow(
int[] groupIdToPositions,
Page page,
GroupingAggregatorFunction fn,
TimeSeriesGroupingAggregatorEvaluationContext context
TimeSeriesGroupingAggregatorEvaluationContext context,
boolean hasNullIntermediateState
) {
var groupIds = context.groupIdsFromWindow(startingGroupId, window);
if (groupIds.size() > 1) {
try (IntVector oneGroup = context.driverContext().blockFactory().newConstantIntVector(startingGroupId, 1)) {
for (int g : groupIds) {
if (g != startingGroupId) {
int position = groupIdToPositions[g];
fn.addIntermediateInput(position, oneGroup, page);
if (hasNullIntermediateState == false) {
fn.addIntermediateInput(position, oneGroup, page);
continue;
}
if (hasIntermediateState(page, position) == false) {
continue;
}
try (Page singlePositionPage = filterPage(page, position)) {
fn.addIntermediateInput(0, oneGroup, singlePositionPage);
}
}
}
}
}
}

private static void addNonNullIntermediateInput(GroupingAggregatorFunction fn, IntVector groups, Page page) {
int[] positions = positionsWithIntermediateState(page);
if (positions.length == 0) {
return;
}
try (IntVector filteredGroups = groups.filter(false, positions); Page filteredPage = filterPage(page, positions)) {
fn.addIntermediateInput(0, filteredGroups, filteredPage);
}
}

private static int[] positionsWithIntermediateState(Page page) {
int[] positions = new int[page.getPositionCount()];
int count = 0;
for (int position = 0; position < page.getPositionCount(); position++) {
if (hasIntermediateState(page, position)) {
positions[count++] = position;
}
}
return Arrays.copyOf(positions, count);
}

private static boolean hasNullIntermediateState(Page page) {
return positionsWithIntermediateState(page).length != page.getPositionCount();
}

private static boolean hasIntermediateState(Page page, int position) {
for (int i = 0; i < page.getBlockCount(); i++) {
if (page.getBlock(i).isNull(position)) {
return false;
}
}
return true;
}

private static Page filterPage(Page page, int... positions) {
Block[] filteredBlocks = new Block[page.getBlockCount()];
boolean success = false;
try {
for (int i = 0; i < filteredBlocks.length; i++) {
filteredBlocks[i] = page.getBlock(i).filter(false, positions);
}
success = true;
return new Page(positions.length, filteredBlocks);
} finally {
if (success == false) {
Releasables.closeExpectNoException(filteredBlocks);
}
}
}

@Override
public int intermediateBlockCount() {
return next.intermediateBlockCount();
Expand Down
Loading
Loading