Skip to content

Commit 62b21a5

Browse files
committed
Improve resiliency of UpdateTimeSeriesRangeService
If updating the `index.time_series.end_time` fails for one data stream, then UpdateTimeSeriesRangeService should continue updating this setting for other data streams. The following error was observed in the wild: ``` [2025-04-07T08:50:39,698][WARN ][o.e.d.UpdateTimeSeriesRangeService] [node-01] failed to update tsdb data stream end times java.lang.IllegalArgumentException: [index.time_series.end_time] requires [index.mode=time_series] at org.elasticsearch.index.IndexSettings$1.validate(IndexSettings.java:636) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.index.IndexSettings$1.validate(IndexSettings.java:619) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.common.settings.Setting.get(Setting.java:563) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.common.settings.Setting.get(Setting.java:535) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.datastreams.UpdateTimeSeriesRangeService.updateTimeSeriesTemporalRange(UpdateTimeSeriesRangeService.java:111) ~[?:?] at org.elasticsearch.datastreams.UpdateTimeSeriesRangeService$UpdateTimeSeriesExecutor.execute(UpdateTimeSeriesRangeService.java:210) ~[?:?] at org.elasticsearch.cluster.service.MasterService.innerExecuteTasks(MasterService.java:1075) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:1038) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService.executeAndPublishBatch(MasterService.java:245) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService$BatchingTaskQueue$Processor.lambda$run$2(MasterService.java:1691) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.action.ActionListener.run(ActionListener.java:452) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService$BatchingTaskQueue$Processor.run(MasterService.java:1688) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService$5.lambda$doRun$0(MasterService.java:1283) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.action.ActionListener.run(ActionListener.java:452) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService$5.doRun(MasterService.java:1262) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1023) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27) ~[elasticsearch-8.17.3.jar:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?] at java.lang.Thread.run(Thread.java:1575) ~[?:?] ``` Which resulted in a situation, that causes the `index.time_series.end_time` index setting not being updated for any data stream. This then caused data loss as metrics couldn't be indexed, because no suitable backing index could be resolved: ``` the document timestamp [2025-03-26T15:26:10.000Z] is outside of ranges of currently writable indices [[2025-01-31T07:22:43.000Z,2025-02-15T07:24:06.000Z][2025-02-15T07:24:06.000Z,2025-03-02T07:34:07.000Z][2025-03-02T07:34:07.000Z,2025-03-10T12:45:37.000Z][2025-03-10T12:45:37.000Z,2025-03-10T14:30:37.000Z][2025-03-10T14:30:37.000Z,2025-03-25T12:50:40.000Z][2025-03-25T12:50:40.000Z,2025-03-25T14:35:40.000Z ```
1 parent 75f54a4 commit 62b21a5

File tree

3 files changed

+135
-19
lines changed

3 files changed

+135
-19
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,14 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr
127127

128128
// getWriteIndex() selects the latest added index:
129129
Index head = dataStream.getWriteIndex();
130-
IndexMetadata im = project.getIndexSafe(head);
131-
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
132-
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
133-
Instant newEnd = DataStream.getCanonicalTimestampBound(
134-
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
135-
);
136-
if (newEnd.isAfter(currentEnd)) {
137-
try {
130+
try {
131+
IndexMetadata im = project.getIndexSafe(head);
132+
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
133+
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
134+
Instant newEnd = DataStream.getCanonicalTimestampBound(
135+
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
136+
);
137+
if (newEnd.isAfter(currentEnd)) {
138138
Settings settings = Settings.builder()
139139
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd))
140140
.build();
@@ -151,17 +151,17 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr
151151
mBuilder.updateSettings(settings, head.getName());
152152
// Verify that all temporal ranges of each backing index is still valid:
153153
dataStream.validate(mBuilder::get);
154-
} catch (Exception e) {
155-
LOGGER.error(
156-
() -> format(
157-
"unable to update [%s] for data stream [%s] and backing index [%s]",
158-
IndexSettings.TIME_SERIES_END_TIME.getKey(),
159-
dataStream.getName(),
160-
head.getName()
161-
),
162-
e
163-
);
164154
}
155+
} catch (Exception e) {
156+
LOGGER.error(
157+
() -> format(
158+
"unable to update [%s] for data stream [%s] and backing index [%s]",
159+
IndexSettings.TIME_SERIES_END_TIME.getKey(),
160+
dataStream.getName(),
161+
head.getName()
162+
),
163+
e
164+
);
165165
}
166166
}
167167
return mBuilder;

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,20 @@
88
*/
99
package org.elasticsearch.datastreams;
1010

11+
import org.apache.logging.log4j.Level;
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.apache.logging.log4j.core.LogEvent;
15+
import org.apache.logging.log4j.core.appender.AbstractAppender;
16+
import org.apache.logging.log4j.core.filter.RegexFilter;
17+
import org.apache.logging.log4j.message.Message;
1118
import org.elasticsearch.cluster.ClusterState;
1219
import org.elasticsearch.cluster.metadata.DataStream;
1320
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
1421
import org.elasticsearch.cluster.metadata.ProjectId;
1522
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1623
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.common.logging.Loggers;
1725
import org.elasticsearch.common.settings.ClusterSettings;
1826
import org.elasticsearch.common.settings.Settings;
1927
import org.elasticsearch.core.TimeValue;
@@ -23,15 +31,22 @@
2331
import org.elasticsearch.threadpool.TestThreadPool;
2432
import org.elasticsearch.threadpool.ThreadPool;
2533
import org.junit.After;
34+
import org.junit.AfterClass;
2635
import org.junit.Before;
36+
import org.junit.BeforeClass;
2737

2838
import java.time.Duration;
2939
import java.time.Instant;
3040
import java.time.temporal.ChronoUnit;
3141
import java.time.temporal.TemporalAmount;
42+
import java.util.ArrayList;
43+
import java.util.HashMap;
3244
import java.util.List;
45+
import java.util.Map;
3346
import java.util.Set;
3447

48+
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
49+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata;
3550
import static org.hamcrest.Matchers.containsString;
3651
import static org.hamcrest.Matchers.equalTo;
3752
import static org.hamcrest.Matchers.not;
@@ -42,6 +57,22 @@
4257

4358
public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
4459

60+
static MockAppender appender;
61+
static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class);
62+
63+
@BeforeClass
64+
public static void classInit() throws IllegalAccessException {
65+
appender = new MockAppender("mock_appender");
66+
appender.start();
67+
Loggers.addAppender(testLogger1, appender);
68+
}
69+
70+
@AfterClass
71+
public static void classCleanup() {
72+
Loggers.removeAppender(testLogger1, appender);
73+
appender.stop();
74+
}
75+
4576
private ThreadPool threadPool;
4677
private UpdateTimeSeriesRangeService instance;
4778

@@ -199,6 +230,68 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() {
199230
assertThat(getEndTime(project, dataStreamName3, 0), equalTo(start));
200231
}
201232

233+
public void testUpdateTimeSeriesTemporalOneBadDataStream() {
234+
String dataStreamName1 = "logs-app1";
235+
String dataStreamName2 = "logs-app2-broken";
236+
String dataStreamName3 = "logs-app3";
237+
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
238+
239+
Instant start = now.minus(90, ChronoUnit.MINUTES);
240+
Instant end = start.plus(30, ChronoUnit.MINUTES);
241+
final var projectId = randomProjectIdOrDefault();
242+
ProjectMetadata.Builder mbBuilder = ProjectMetadata.builder(projectId);
243+
for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) {
244+
DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end)));
245+
}
246+
247+
Settings settings = Settings.builder().put("index.mode", "logsdb").build();
248+
var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0);
249+
mbBuilder.put(im, true);
250+
var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2);
251+
var ds2Indices = new ArrayList<>(ds2.getIndices());
252+
ds2Indices.add(im.getIndex());
253+
var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams());
254+
copy.put(
255+
dataStreamName2,
256+
new DataStream(
257+
ds2.getName(),
258+
ds2Indices,
259+
2,
260+
ds2.getMetadata(),
261+
ds2.isHidden(),
262+
ds2.isReplicated(),
263+
ds2.isSystem(),
264+
ds2.isAllowCustomRouting(),
265+
ds2.getIndexMode(),
266+
ds2.getDataLifecycle(),
267+
ds2.getDataStreamOptions(),
268+
ds2.getFailureIndices(),
269+
ds2.rolloverOnWrite(),
270+
ds2.getAutoShardingEvent()
271+
)
272+
);
273+
mbBuilder.dataStreams(copy, Map.of());
274+
275+
now = now.minus(45, ChronoUnit.MINUTES);
276+
ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(mbBuilder).build();
277+
ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
278+
assertThat(result, not(sameInstance(before)));
279+
final var project = result.getMetadata().getProject(projectId);
280+
final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
281+
assertThat(getEndTime(project, dataStreamName1, 0), equalTo(expectedEndTime));
282+
assertThat(getEndTime(project, dataStreamName2, 0), equalTo(end)); // failed to update end_time, because broken data stream
283+
assertThat(getEndTime(project, dataStreamName3, 0), equalTo(expectedEndTime));
284+
285+
String message = appender.getLastEventAndReset().getMessage().getFormattedMessage();
286+
assertThat(
287+
message,
288+
equalTo(
289+
"unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and "
290+
+ "backing index [.ds-logs-app2-broken-2025.04.10-000002]"
291+
)
292+
);
293+
}
294+
202295
public void testUpdateTimeSeriesTemporalRange_multipleProjects() {
203296
String dataStreamName = "logs-app1";
204297
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
@@ -253,4 +346,27 @@ static Instant getStartTime(ProjectMetadata project, String dataStreamName, int
253346
return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings);
254347
}
255348

349+
static class MockAppender extends AbstractAppender {
350+
public LogEvent lastEvent;
351+
352+
MockAppender(final String name) throws IllegalAccessException {
353+
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
354+
}
355+
356+
@Override
357+
public void append(LogEvent event) {
358+
lastEvent = event.toImmutable();
359+
}
360+
361+
Message lastMessage() {
362+
return lastEvent.getMessage();
363+
}
364+
365+
public LogEvent getLastEventAndReset() {
366+
LogEvent toReturn = lastEvent;
367+
lastEvent = null;
368+
return toReturn;
369+
}
370+
}
371+
256372
}

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ public static void getClusterStateWithDataStream(
617617
builder.put(dataStreamBuilder.build());
618618
}
619619

620-
private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
620+
public static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
621621
Settings.Builder b = Settings.builder()
622622
.put(settings)
623623
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())

0 commit comments

Comments
 (0)