Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/126637.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126637
summary: Improve resiliency of `UpdateTimeSeriesRangeService`
area: TSDB
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr

// getWriteIndex() selects the latest added index:
Index head = dataStream.getWriteIndex();
IndexMetadata im = project.getIndexSafe(head);
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
Instant newEnd = DataStream.getCanonicalTimestampBound(
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
);
if (newEnd.isAfter(currentEnd)) {
try {
try {
IndexMetadata im = project.getIndexSafe(head);
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
Instant newEnd = DataStream.getCanonicalTimestampBound(
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
);
if (newEnd.isAfter(currentEnd)) {
Settings settings = Settings.builder()
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd))
.build();
Expand All @@ -151,17 +151,17 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr
mBuilder.updateSettings(settings, head.getName());
// Verify that all temporal ranges of each backing index is still valid:
dataStream.validate(mBuilder::get);
} catch (Exception e) {
LOGGER.error(
() -> format(
"unable to update [%s] for data stream [%s] and backing index [%s]",
IndexSettings.TIME_SERIES_END_TIME.getKey(),
dataStream.getName(),
head.getName()
),
e
);
}
} catch (Exception e) {
LOGGER.error(
() -> format(
"unable to update [%s] for data stream [%s] and backing index [%s]",
IndexSettings.TIME_SERIES_END_TIME.getKey(),
dataStream.getName(),
head.getName()
),
e
);
}
}
return mBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@
*/
package org.elasticsearch.datastreams;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.apache.logging.log4j.message.Message;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -23,15 +30,22 @@
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
Expand All @@ -42,6 +56,22 @@

public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {

static MockAppender appender;
static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class);

@BeforeClass
public static void classInit() throws IllegalAccessException {
appender = new MockAppender("mock_appender");
appender.start();
Loggers.addAppender(testLogger1, appender);
}

@AfterClass
public static void classCleanup() {
Loggers.removeAppender(testLogger1, appender);
appender.stop();
}

private ThreadPool threadPool;
private UpdateTimeSeriesRangeService instance;

Expand Down Expand Up @@ -199,6 +229,68 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() {
assertThat(getEndTime(project, dataStreamName3, 0), equalTo(start));
}

public void testUpdateTimeSeriesTemporalOneBadDataStream() {
String dataStreamName1 = "logs-app1";
String dataStreamName2 = "logs-app2-broken";
String dataStreamName3 = "logs-app3";
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);

Instant start = now.minus(90, ChronoUnit.MINUTES);
Instant end = start.plus(30, ChronoUnit.MINUTES);
final var projectId = randomProjectIdOrDefault();
ProjectMetadata.Builder mbBuilder = ProjectMetadata.builder(projectId);
for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) {
DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end)));
}

Settings settings = Settings.builder().put("index.mode", "logsdb").build();
var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0);
mbBuilder.put(im, true);
var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2);
var ds2Indices = new ArrayList<>(ds2.getIndices());
ds2Indices.add(im.getIndex());
var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams());
copy.put(
dataStreamName2,
new DataStream(
ds2.getName(),
ds2Indices,
2,
ds2.getMetadata(),
ds2.isHidden(),
ds2.isReplicated(),
ds2.isSystem(),
ds2.isAllowCustomRouting(),
ds2.getIndexMode(),
ds2.getDataLifecycle(),
ds2.getDataStreamOptions(),
ds2.getFailureIndices(),
ds2.rolloverOnWrite(),
ds2.getAutoShardingEvent()
)
);
mbBuilder.dataStreams(copy, Map.of());

now = now.minus(45, ChronoUnit.MINUTES);
ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(mbBuilder).build();
ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
assertThat(result, not(sameInstance(before)));
final var project = result.getMetadata().getProject(projectId);
final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
assertThat(getEndTime(project, dataStreamName1, 0), equalTo(expectedEndTime));
assertThat(getEndTime(project, dataStreamName2, 0), equalTo(end)); // failed to update end_time, because broken data stream
assertThat(getEndTime(project, dataStreamName3, 0), equalTo(expectedEndTime));

String message = appender.getLastEventAndReset().getMessage().getFormattedMessage();
assertThat(
message,
equalTo(
"unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and "
+ "backing index [.ds-logs-app2-broken-2025.04.10-000002]"
)
);
}

public void testUpdateTimeSeriesTemporalRange_multipleProjects() {
String dataStreamName = "logs-app1";
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Expand Down Expand Up @@ -253,4 +345,27 @@ static Instant getStartTime(ProjectMetadata project, String dataStreamName, int
return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings);
}

static class MockAppender extends AbstractAppender {
public LogEvent lastEvent;

MockAppender(final String name) throws IllegalAccessException {
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
}

@Override
public void append(LogEvent event) {
lastEvent = event.toImmutable();
}

Message lastMessage() {
return lastEvent.getMessage();
}

public LogEvent getLastEventAndReset() {
LogEvent toReturn = lastEvent;
lastEvent = null;
return toReturn;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ public static void getClusterStateWithDataStream(
builder.put(dataStreamBuilder.build());
}

private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
public static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
Settings.Builder b = Settings.builder()
.put(settings)
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
Expand Down
Loading