Skip to content

Commit 4dfee46

Browse files
authored
[8.x] Improve resiliency of UpdateTimeSeriesRangeService (elastic#126679)
* [8.x] Improve resiliency of UpdateTimeSeriesRangeService Backporting elastic#126637 to 8.x branch. If updating the `index.time_series.end_time` fails for one data stream, then UpdateTimeSeriesRangeService should continue updating this setting for other data streams. The following error was observed in the wild: ``` [2025-04-07T08:50:39,698][WARN ][o.e.d.UpdateTimeSeriesRangeService] [node-01] failed to update tsdb data stream end times java.lang.IllegalArgumentException: [index.time_series.end_time] requires [index.mode=time_series] at org.elasticsearch.index.IndexSettings$1.validate(IndexSettings.java:636) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.index.IndexSettings$1.validate(IndexSettings.java:619) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.common.settings.Setting.get(Setting.java:563) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.common.settings.Setting.get(Setting.java:535) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.datastreams.UpdateTimeSeriesRangeService.updateTimeSeriesTemporalRange(UpdateTimeSeriesRangeService.java:111) ~[?:?] at org.elasticsearch.datastreams.UpdateTimeSeriesRangeService$UpdateTimeSeriesExecutor.execute(UpdateTimeSeriesRangeService.java:210) ~[?:?] at org.elasticsearch.cluster.service.MasterService.innerExecuteTasks(MasterService.java:1075) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:1038) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService.executeAndPublishBatch(MasterService.java:245) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService$BatchingTaskQueue$Processor.lambda$run$2(MasterService.java:1691) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.action.ActionListener.run(ActionListener.java:452) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService$BatchingTaskQueue$Processor.run(MasterService.java:1688) ~[elasticsearch-8.17.3.jar:?] 
at org.elasticsearch.cluster.service.MasterService$5.lambda$doRun$0(MasterService.java:1283) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.action.ActionListener.run(ActionListener.java:452) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.cluster.service.MasterService$5.doRun(MasterService.java:1262) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1023) ~[elasticsearch-8.17.3.jar:?] at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27) ~[elasticsearch-8.17.3.jar:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?] at java.lang.Thread.run(Thread.java:1575) ~[?:?] ``` As a result, the `index.time_series.end_time` index setting was not updated for any data stream. This in turn caused data loss, as metrics couldn't be indexed because no suitable backing index could be resolved: ``` the document timestamp [2025-03-26T15:26:10.000Z] is outside of ranges of currently writable indices [[2025-01-31T07:22:43.000Z,2025-02-15T07:24:06.000Z][2025-02-15T07:24:06.000Z,2025-03-02T07:34:07.000Z][2025-03-02T07:34:07.000Z,2025-03-10T12:45:37.000Z][2025-03-10T12:45:37.000Z,2025-03-10T14:30:37.000Z][2025-03-10T14:30:37.000Z,2025-03-25T12:50:40.000Z][2025-03-25T12:50:40.000Z,2025-03-25T14:35:40.000Z ``` * fix test compile error
1 parent cf74d28 commit 4dfee46

File tree

4 files changed

+139
-19
lines changed

4 files changed

+139
-19
lines changed

docs/changelog/126637.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126637
2+
summary: Improve resiliency of `UpdateTimeSeriesRangeService`
3+
area: TSDB
4+
type: bug
5+
issues: []

modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,14 @@ ClusterState updateTimeSeriesTemporalRange(ClusterState current, Instant now) {
107107

108108
// getWriteIndex() selects the latest added index:
109109
Index head = dataStream.getWriteIndex();
110-
IndexMetadata im = current.metadata().getIndexSafe(head);
111-
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
112-
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
113-
Instant newEnd = DataStream.getCanonicalTimestampBound(
114-
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
115-
);
116-
if (newEnd.isAfter(currentEnd)) {
117-
try {
110+
try {
111+
IndexMetadata im = current.metadata().getIndexSafe(head);
112+
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
113+
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
114+
Instant newEnd = DataStream.getCanonicalTimestampBound(
115+
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
116+
);
117+
if (newEnd.isAfter(currentEnd)) {
118118
Settings settings = Settings.builder()
119119
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd))
120120
.build();
@@ -131,17 +131,17 @@ ClusterState updateTimeSeriesTemporalRange(ClusterState current, Instant now) {
131131
mBuilder.updateSettings(settings, head.getName());
132132
// Verify that all temporal ranges of each backing index is still valid:
133133
dataStream.validate(mBuilder::get);
134-
} catch (Exception e) {
135-
LOGGER.error(
136-
() -> format(
137-
"unable to update [%s] for data stream [%s] and backing index [%s]",
138-
IndexSettings.TIME_SERIES_END_TIME.getKey(),
139-
dataStream.getName(),
140-
head.getName()
141-
),
142-
e
143-
);
144134
}
135+
} catch (Exception e) {
136+
LOGGER.error(
137+
() -> format(
138+
"unable to update [%s] for data stream [%s] and backing index [%s]",
139+
IndexSettings.TIME_SERIES_END_TIME.getKey(),
140+
dataStream.getName(),
141+
head.getName()
142+
),
143+
e
144+
);
145145
}
146146
}
147147

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,18 @@
88
*/
99
package org.elasticsearch.datastreams;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.core.LogEvent;
14+
import org.apache.logging.log4j.core.appender.AbstractAppender;
15+
import org.apache.logging.log4j.core.filter.RegexFilter;
16+
import org.apache.logging.log4j.message.Message;
1117
import org.elasticsearch.cluster.ClusterState;
1218
import org.elasticsearch.cluster.metadata.DataStream;
1319
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
1420
import org.elasticsearch.cluster.metadata.Metadata;
1521
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.common.logging.Loggers;
1623
import org.elasticsearch.common.settings.ClusterSettings;
1724
import org.elasticsearch.common.settings.Settings;
1825
import org.elasticsearch.core.TimeValue;
@@ -22,15 +29,22 @@
2229
import org.elasticsearch.threadpool.TestThreadPool;
2330
import org.elasticsearch.threadpool.ThreadPool;
2431
import org.junit.After;
32+
import org.junit.AfterClass;
2533
import org.junit.Before;
34+
import org.junit.BeforeClass;
2635

2736
import java.time.Duration;
2837
import java.time.Instant;
2938
import java.time.temporal.ChronoUnit;
3039
import java.time.temporal.TemporalAmount;
40+
import java.util.ArrayList;
41+
import java.util.HashMap;
3142
import java.util.List;
43+
import java.util.Map;
3244
import java.util.Set;
3345

46+
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
47+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata;
3448
import static org.hamcrest.Matchers.containsString;
3549
import static org.hamcrest.Matchers.equalTo;
3650
import static org.hamcrest.Matchers.not;
@@ -41,6 +55,22 @@
4155

4256
public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
4357

58+
static MockAppender appender;
59+
static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class);
60+
61+
@BeforeClass
62+
public static void classInit() throws IllegalAccessException {
63+
appender = new MockAppender("mock_appender");
64+
appender.start();
65+
Loggers.addAppender(testLogger1, appender);
66+
}
67+
68+
@AfterClass
69+
public static void classCleanup() {
70+
Loggers.removeAppender(testLogger1, appender);
71+
appender.stop();
72+
}
73+
4474
private ThreadPool threadPool;
4575
private UpdateTimeSeriesRangeService instance;
4676

@@ -191,6 +221,68 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() {
191221
assertThat(getEndTime(result, dataStreamName3, 0), equalTo(start));
192222
}
193223

224+
public void testUpdateTimeSeriesTemporalOneBadDataStream() {
225+
String dataStreamName1 = "logs-app1";
226+
String dataStreamName2 = "logs-app2-broken";
227+
String dataStreamName3 = "logs-app3";
228+
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
229+
230+
Instant start = now.minus(90, ChronoUnit.MINUTES);
231+
Instant end = start.plus(30, ChronoUnit.MINUTES);
232+
Metadata.Builder mbBuilder = new Metadata.Builder();
233+
for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) {
234+
DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end)));
235+
}
236+
237+
Settings settings = Settings.builder().put("index.mode", "logsdb").build();
238+
var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0);
239+
mbBuilder.put(im, true);
240+
var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2);
241+
var ds2Indices = new ArrayList<>(ds2.getIndices());
242+
ds2Indices.add(im.getIndex());
243+
var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams());
244+
copy.put(
245+
dataStreamName2,
246+
new DataStream(
247+
ds2.getName(),
248+
ds2Indices,
249+
2,
250+
ds2.getMetadata(),
251+
ds2.isHidden(),
252+
ds2.isReplicated(),
253+
ds2.isSystem(),
254+
ds2.isAllowCustomRouting(),
255+
ds2.getIndexMode(),
256+
ds2.getDataLifecycle(),
257+
ds2.getDataStreamOptions(),
258+
ds2.getFailureIndices(),
259+
ds2.rolloverOnWrite(),
260+
ds2.getAutoShardingEvent()
261+
)
262+
);
263+
mbBuilder.dataStreams(copy, Map.of());
264+
265+
now = now.minus(45, ChronoUnit.MINUTES);
266+
ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(mbBuilder).build();
267+
ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
268+
assertThat(result, not(sameInstance(before)));
269+
final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
270+
assertThat(getEndTime(result, dataStreamName1, 0), equalTo(expectedEndTime));
271+
assertThat(getEndTime(result, dataStreamName2, 0), equalTo(end)); // failed to update end_time, because broken data stream
272+
assertThat(getEndTime(result, dataStreamName3, 0), equalTo(expectedEndTime));
273+
274+
String message = appender.getLastEventAndReset().getMessage().getFormattedMessage();
275+
assertThat(
276+
message,
277+
equalTo(
278+
"unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and "
279+
+ "backing index ["
280+
+ im.getIndex().getName()
281+
+ "]"
282+
)
283+
);
284+
}
285+
194286
public void testUpdatePollInterval() {
195287
instance.scheduleTask();
196288
assertThat(instance.pollInterval, equalTo(TimeValue.timeValueMinutes(5)));
@@ -220,4 +312,27 @@ static Instant getStartTime(ClusterState state, String dataStreamName, int index
220312
return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings);
221313
}
222314

315+
static class MockAppender extends AbstractAppender {
316+
public LogEvent lastEvent;
317+
318+
MockAppender(final String name) throws IllegalAccessException {
319+
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
320+
}
321+
322+
@Override
323+
public void append(LogEvent event) {
324+
lastEvent = event.toImmutable();
325+
}
326+
327+
Message lastMessage() {
328+
return lastEvent.getMessage();
329+
}
330+
331+
public LogEvent getLastEventAndReset() {
332+
LogEvent toReturn = lastEvent;
333+
lastEvent = null;
334+
return toReturn;
335+
}
336+
}
337+
223338
}

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ public static void getClusterStateWithDataStream(
599599
builder.put(dataStreamBuilder.build());
600600
}
601601

602-
private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
602+
public static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
603603
Settings.Builder b = Settings.builder()
604604
.put(settings)
605605
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())

0 commit comments

Comments
 (0)