diff --git a/docs/changelog/126637.yaml b/docs/changelog/126637.yaml new file mode 100644 index 0000000000000..6b51566457bfc --- /dev/null +++ b/docs/changelog/126637.yaml @@ -0,0 +1,5 @@ +pr: 126637 +summary: Improve resiliency of `UpdateTimeSeriesRangeService` +area: TSDB +type: bug +issues: [] diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java index 93793124c7fcf..f967fc51890f3 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java @@ -127,14 +127,14 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr // getWriteIndex() selects the latest added index: Index head = dataStream.getWriteIndex(); - IndexMetadata im = project.getIndexSafe(head); - Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); - TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings()); - Instant newEnd = DataStream.getCanonicalTimestampBound( - now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS) - ); - if (newEnd.isAfter(currentEnd)) { - try { + try { + IndexMetadata im = project.getIndexSafe(head); + Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); + TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings()); + Instant newEnd = DataStream.getCanonicalTimestampBound( + now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS) + ); + if (newEnd.isAfter(currentEnd)) { Settings settings = Settings.builder() .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd)) .build(); @@ -151,17 +151,17 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr mBuilder.updateSettings(settings, head.getName()); // Verify that all temporal ranges of each backing index is still valid: dataStream.validate(mBuilder::get); - } catch (Exception e) { - LOGGER.error( - () -> format( - "unable to update [%s] for data stream [%s] and backing index [%s]", - IndexSettings.TIME_SERIES_END_TIME.getKey(), - dataStream.getName(), - head.getName() - ), - e - ); } + } catch (Exception e) { + LOGGER.error( + () -> format( + "unable to update [%s] for data stream [%s] and backing index [%s]", + IndexSettings.TIME_SERIES_END_TIME.getKey(), + dataStream.getName(), + head.getName() + ), + e + ); } } return mBuilder; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java index f7328fd3f99ef..03546c126818e 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java @@ -8,12 +8,19 @@ */ package org.elasticsearch.datastreams; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.filter.RegexFilter; +import org.apache.logging.log4j.message.Message; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; @@ -23,15 +30,22 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAmount; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; +import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -42,6 +56,22 @@ public class UpdateTimeSeriesRangeServiceTests extends ESTestCase { + static MockAppender appender; + static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class); + + @BeforeClass + public static void classInit() throws IllegalAccessException { + appender = new MockAppender("mock_appender"); + appender.start(); + Loggers.addAppender(testLogger1, appender); + } + + @AfterClass + public static void classCleanup() { + Loggers.removeAppender(testLogger1, appender); + appender.stop(); + } + private ThreadPool threadPool; private UpdateTimeSeriesRangeService instance; @@ -199,6 +229,70 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() { assertThat(getEndTime(project, dataStreamName3, 0), equalTo(start)); } + public void testUpdateTimeSeriesTemporalOneBadDataStream() { + String dataStreamName1 = "logs-app1"; + String dataStreamName2 = "logs-app2-broken"; + String dataStreamName3 = "logs-app3"; + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + + Instant start = now.minus(90, ChronoUnit.MINUTES); + Instant end = start.plus(30, ChronoUnit.MINUTES); + final var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder mbBuilder = ProjectMetadata.builder(projectId); + for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) { + DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end))); + } + + Settings settings = Settings.builder().put("index.mode", "logsdb").build(); + var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0); + mbBuilder.put(im, true); + var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2); + var ds2Indices = new ArrayList<>(ds2.getIndices()); + ds2Indices.add(im.getIndex()); + var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams()); + copy.put( + dataStreamName2, + new DataStream( + ds2.getName(), + ds2Indices, + 2, + ds2.getMetadata(), + ds2.isHidden(), + ds2.isReplicated(), + ds2.isSystem(), + ds2.isAllowCustomRouting(), + ds2.getIndexMode(), + ds2.getDataLifecycle(), + ds2.getDataStreamOptions(), + ds2.getFailureIndices(), + ds2.rolloverOnWrite(), + ds2.getAutoShardingEvent() + ) + ); + mbBuilder.dataStreams(copy, Map.of()); + + now = now.minus(45, ChronoUnit.MINUTES); + ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(mbBuilder).build(); + ClusterState result = instance.updateTimeSeriesTemporalRange(before, now); + assertThat(result, not(sameInstance(before))); + final var project = result.getMetadata().getProject(projectId); + final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS); + assertThat(getEndTime(project, dataStreamName1, 0), equalTo(expectedEndTime)); + assertThat(getEndTime(project, dataStreamName2, 0), equalTo(end)); // failed to update end_time, because broken data stream + assertThat(getEndTime(project, dataStreamName3, 0), equalTo(expectedEndTime)); + + String message = appender.getLastEventAndReset().getMessage().getFormattedMessage(); + assertThat( + message, + equalTo( + "unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and " + + "backing index [" + + im.getIndex().getName() + + "]" + ) + ); + } + public void testUpdateTimeSeriesTemporalRange_multipleProjects() { String dataStreamName = "logs-app1"; Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); @@ -253,4 +347,27 @@ static Instant getStartTime(ProjectMetadata project, String dataStreamName, int return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings); } + static class MockAppender extends AbstractAppender { + public LogEvent lastEvent; + + MockAppender(final String name) throws IllegalAccessException { + super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false); + } + + @Override + public void append(LogEvent event) { + lastEvent = event.toImmutable(); + } + + Message lastMessage() { + return lastEvent.getMessage(); + } + + public LogEvent getLastEventAndReset() { + LogEvent toReturn = lastEvent; + lastEvent = null; + return toReturn; + } + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 233826b168379..9c52b273d057a 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -617,7 +617,7 @@ public static void getClusterStateWithDataStream( builder.put(dataStreamBuilder.build()); } - private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) { + public static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) { Settings.Builder b = Settings.builder() .put(settings) .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())