diff --git a/docs/changelog/126637.yaml b/docs/changelog/126637.yaml new file mode 100644 index 0000000000000..6b51566457bfc --- /dev/null +++ b/docs/changelog/126637.yaml @@ -0,0 +1,5 @@ +pr: 126637 +summary: Improve resiliency of `UpdateTimeSeriesRangeService` +area: TSDB +type: bug +issues: [] diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java index 44a64254f8da9..34262b334de14 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java @@ -107,14 +107,14 @@ ClusterState updateTimeSeriesTemporalRange(ClusterState current, Instant now) { // getWriteIndex() selects the latest added index: Index head = dataStream.getWriteIndex(); - IndexMetadata im = current.metadata().getIndexSafe(head); - Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); - TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings()); - Instant newEnd = DataStream.getCanonicalTimestampBound( - now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS) - ); - if (newEnd.isAfter(currentEnd)) { - try { + try { + IndexMetadata im = current.metadata().getIndexSafe(head); + Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); + TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings()); + Instant newEnd = DataStream.getCanonicalTimestampBound( + now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS) + ); + if (newEnd.isAfter(currentEnd)) { Settings settings = Settings.builder() .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd)) .build(); @@ -131,17 +131,17 @@ ClusterState updateTimeSeriesTemporalRange(ClusterState current, Instant now) { mBuilder.updateSettings(settings, head.getName()); // Verify that all temporal ranges of each backing index is still valid: dataStream.validate(mBuilder::get); - } catch (Exception e) { - LOGGER.error( - () -> format( - "unable to update [%s] for data stream [%s] and backing index [%s]", - IndexSettings.TIME_SERIES_END_TIME.getKey(), - dataStream.getName(), - head.getName() - ), - e - ); } + } catch (Exception e) { + LOGGER.error( + () -> format( + "unable to update [%s] for data stream [%s] and backing index [%s]", + IndexSettings.TIME_SERIES_END_TIME.getKey(), + dataStream.getName(), + head.getName() + ), + e + ); } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java index 8378526e6bdae..9b069b31a50ce 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java @@ -8,11 +8,18 @@ */ package org.elasticsearch.datastreams; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.filter.RegexFilter; +import org.apache.logging.log4j.message.Message; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; @@ -22,15 +29,22 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAmount; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; +import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -41,6 +55,22 @@ public class UpdateTimeSeriesRangeServiceTests extends ESTestCase { + static MockAppender appender; + static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class); + + @BeforeClass + public static void classInit() throws IllegalAccessException { + appender = new MockAppender("mock_appender"); + appender.start(); + Loggers.addAppender(testLogger1, appender); + } + + @AfterClass + public static void classCleanup() { + Loggers.removeAppender(testLogger1, appender); + appender.stop(); + } + private ThreadPool threadPool; private UpdateTimeSeriesRangeService instance; @@ -191,6 +221,68 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() { assertThat(getEndTime(result, dataStreamName3, 0), equalTo(start)); } + public void testUpdateTimeSeriesTemporalOneBadDataStream() { + String dataStreamName1 = "logs-app1"; + String dataStreamName2 = "logs-app2-broken"; + String dataStreamName3 = "logs-app3"; + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + + Instant start = now.minus(90, ChronoUnit.MINUTES); + Instant end = start.plus(30, ChronoUnit.MINUTES); + Metadata.Builder mbBuilder = new Metadata.Builder(); + for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) { + DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end))); + } + + Settings settings = Settings.builder().put("index.mode", "logsdb").build(); + var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0); + mbBuilder.put(im, true); + var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2); + var ds2Indices = new ArrayList<>(ds2.getIndices()); + ds2Indices.add(im.getIndex()); + var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams()); + copy.put( + dataStreamName2, + new DataStream( + ds2.getName(), + ds2Indices, + 2, + ds2.getMetadata(), + ds2.isHidden(), + ds2.isReplicated(), + ds2.isSystem(), + ds2.isAllowCustomRouting(), + ds2.getIndexMode(), + ds2.getLifecycle(), + ds2.getDataStreamOptions(), + ds2.getFailureIndices(), + ds2.rolloverOnWrite(), + ds2.getAutoShardingEvent() + ) + ); + mbBuilder.dataStreams(copy, Map.of()); + + now = now.minus(45, ChronoUnit.MINUTES); + ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(mbBuilder).build(); + ClusterState result = instance.updateTimeSeriesTemporalRange(before, now); + assertThat(result, not(sameInstance(before))); + final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS); + assertThat(getEndTime(result, dataStreamName1, 0), equalTo(expectedEndTime)); + assertThat(getEndTime(result, dataStreamName2, 0), equalTo(end)); // failed to update end_time, because broken data stream + assertThat(getEndTime(result, dataStreamName3, 0), equalTo(expectedEndTime)); + + String message = appender.getLastEventAndReset().getMessage().getFormattedMessage(); + assertThat( + message, + equalTo( + "unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and " + + "backing index [" + + im.getIndex().getName() + + "]" + ) + ); + } + public void testUpdatePollInterval() { instance.scheduleTask(); assertThat(instance.pollInterval, equalTo(TimeValue.timeValueMinutes(5))); @@ -220,4 +312,27 @@ static Instant getStartTime(ClusterState state, String dataStreamName, int index return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings); } + static class MockAppender extends AbstractAppender { + public LogEvent lastEvent; + + MockAppender(final String name) throws IllegalAccessException { + super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false); + } + + @Override + public void append(LogEvent event) { + lastEvent = event.toImmutable(); + } + + Message lastMessage() { + return lastEvent.getMessage(); + } + + public LogEvent getLastEventAndReset() { + LogEvent toReturn = lastEvent; + lastEvent = null; + return toReturn; + } + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 845536792343d..fee1bec38682d 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -588,7 +588,7 @@ public static void getClusterStateWithDataStream( builder.put(dataStreamBuilder.build()); } - private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) { + public static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) { Settings.Builder b = Settings.builder() .put(settings) .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())