Skip to content

Commit 11dd4f7

Browse files
authored
Truncate endTime to seconds in TimeSeries range updates (#95500) (#95530)
1 parent 23478ac commit 11dd4f7

File tree

6 files changed

+51
-13
lines changed

6 files changed

+51
-13
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,14 +194,28 @@ public void testTimeRanges() throws Exception {
194194
Instant newStartTime = IndexSettings.TIME_SERIES_START_TIME.get(getIndexResponse.getSettings().get(newBackingIndexName));
195195
Instant newEndTime = IndexSettings.TIME_SERIES_END_TIME.get(getIndexResponse.getSettings().get(newBackingIndexName));
196196

197-
// Check whether the document lands in the newest backing index:
197+
// Check whether documents land in the newest backing index, covering the [newStartTime, newEndtime) timestamp range:
198+
time = newStartTime;
199+
{
200+
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
201+
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
202+
var indexResponse = client().index(indexRequest).actionGet();
203+
assertThat(indexResponse.getIndex(), equalTo(newBackingIndexName));
204+
}
198205
time = Instant.ofEpochMilli(randomLongBetween(newStartTime.toEpochMilli(), newEndTime.toEpochMilli() - 1));
199206
{
200207
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
201208
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
202209
var indexResponse = client().index(indexRequest).actionGet();
203210
assertThat(indexResponse.getIndex(), equalTo(newBackingIndexName));
204211
}
212+
time = Instant.ofEpochMilli(newEndTime.toEpochMilli() - 1);
213+
{
214+
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
215+
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
216+
var indexResponse = client().index(indexRequest).actionGet();
217+
assertThat(indexResponse.getIndex(), equalTo(newBackingIndexName));
218+
}
205219

206220
// Double check indexing against previous backing index:
207221
time = newStartTime.minusMillis(1);

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.io.IOException;
3131
import java.io.UncheckedIOException;
3232
import java.time.Instant;
33-
import java.time.temporal.ChronoUnit;
3433
import java.util.ArrayList;
3534
import java.util.List;
3635
import java.util.Locale;
@@ -87,8 +86,8 @@ public Settings getAdditionalIndexSettings(
8786
final Instant start;
8887
final Instant end;
8988
if (dataStream == null || migrating) {
90-
start = resolvedAt.minusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
91-
end = resolvedAt.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
89+
start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookAheadTime.getMillis()));
90+
end = DataStream.getCanonicalTimestampBound(resolvedAt.plusMillis(lookAheadTime.getMillis()));
9291
} else {
9392
IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex());
9493
if (currentLatestBackingIndex.getSettings().hasValue(IndexSettings.TIME_SERIES_END_TIME.getKey()) == false) {
@@ -103,9 +102,9 @@ public Settings getAdditionalIndexSettings(
103102
}
104103
start = IndexSettings.TIME_SERIES_END_TIME.get(currentLatestBackingIndex.getSettings());
105104
if (start.isAfter(resolvedAt)) {
106-
end = start.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
105+
end = DataStream.getCanonicalTimestampBound(start.plusMillis(lookAheadTime.getMillis()));
107106
} else {
108-
end = resolvedAt.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
107+
end = DataStream.getCanonicalTimestampBound(resolvedAt.plusMillis(lookAheadTime.getMillis()));
109108
}
110109
}
111110
assert start.isBefore(end) : "data stream backing index's start time is not before end time";

modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ ClusterState updateTimeSeriesTemporalRange(ClusterState current, Instant now) {
107107
IndexMetadata im = current.metadata().getIndexSafe(head);
108108
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
109109
TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(im.getSettings());
110-
Instant newEnd = now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS);
110+
Instant newEnd = DataStream.getCanonicalTimestampBound(
111+
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
112+
);
111113
if (newEnd.isAfter(currentEnd)) {
112114
try {
113115
Settings settings = Settings.builder()

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ public void testUpdateTimeSeriesTemporalRange() {
8888
assertThat(getEndTime(result, dataStreamName, 0), equalTo(previousEndTime1));
8989
assertThat(getStartTime(result, dataStreamName, 1), equalTo(previousStartTime2));
9090
assertThat(getEndTime(result, dataStreamName, 1), not(equalTo(previousEndTime2)));
91-
assertThat(getEndTime(result, dataStreamName, 1), equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES)));
91+
assertThat(
92+
getEndTime(result, dataStreamName, 1),
93+
equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS))
94+
);
9295
}
9396

9497
public void testUpdateTimeSeriesTemporalRange_customLookAHeadTime() {
@@ -122,7 +125,10 @@ public void testUpdateTimeSeriesTemporalRange_customLookAHeadTime() {
122125
assertThat(getStartTime(result, dataStreamName, 0), equalTo(previousStartTime1));
123126
assertThat(getEndTime(result, dataStreamName, 0), equalTo(previousEndTime1));
124127
assertThat(getStartTime(result, dataStreamName, 1), equalTo(previousStartTime2));
125-
assertThat(getEndTime(result, dataStreamName, 1), equalTo(now.plus(lookAHeadTime).plus(timeSeriesPollInterval)));
128+
assertThat(
129+
getEndTime(result, dataStreamName, 1),
130+
equalTo(now.plus(lookAHeadTime).plus(timeSeriesPollInterval).truncatedTo(ChronoUnit.SECONDS))
131+
);
126132
}
127133

128134
public void testUpdateTimeSeriesTemporalRange_NoUpdateBecauseReplicated() {
@@ -187,8 +193,14 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() {
187193
ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(mbBuilder).build();
188194
ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
189195
assertThat(result, not(sameInstance(before)));
190-
assertThat(getEndTime(result, dataStreamName1, 0), equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES)));
191-
assertThat(getEndTime(result, dataStreamName2, 0), equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES)));
196+
assertThat(
197+
getEndTime(result, dataStreamName1, 0),
198+
equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS))
199+
);
200+
assertThat(
201+
getEndTime(result, dataStreamName2, 0),
202+
equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS))
203+
);
192204
assertThat(getEndTime(result, dataStreamName3, 0), equalTo(start));
193205
}
194206

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.io.IOException;
3434
import java.time.Instant;
35+
import java.time.temporal.ChronoUnit;
3536
import java.util.ArrayList;
3637
import java.util.Collection;
3738
import java.util.Comparator;
@@ -710,4 +711,15 @@ public int hashCode() {
710711
return Objects.hash(name);
711712
}
712713
}
714+
715+
/**
716+
* Modifies the passed Instant object to be used as a bound for a timestamp field in TimeSeries. It needs to be called in both backing
717+
* index construction (rollover) and index selection for doc insertion. Failure to do so may lead to errors due to document timestamps
718+
* exceeding the end time of the selected backing index for insertion.
719+
* @param time The initial Instant object that's used to generate the canonical time
720+
* @return A canonical Instant object to be used as a timestamp bound
721+
*/
722+
public static Instant getCanonicalTimestampBound(Instant time) {
723+
return time.truncatedTo(ChronoUnit.SECONDS);
724+
}
713725
}

server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.xcontent.XContentType;
2626

2727
import java.time.Instant;
28-
import java.time.temporal.ChronoUnit;
2928
import java.util.ArrayList;
3029
import java.util.List;
3130
import java.util.Locale;
@@ -369,7 +368,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) {
369368
} else {
370369
timestamp = getTimestampFromParser(request.source(), request.getContentType());
371370
}
372-
timestamp = timestamp.truncatedTo(ChronoUnit.SECONDS);
371+
timestamp = org.elasticsearch.cluster.metadata.DataStream.getCanonicalTimestampBound(timestamp);
373372
Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata);
374373
if (result == null) {
375374
String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp);

0 commit comments

Comments
 (0)