Skip to content

Commit 8b12924

Browse files
committed
Fixing unit tests
1 parent 10a7947 commit 8b12924

File tree

2 files changed

+72
-21
lines changed

2 files changed

+72
-21
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,13 @@ private void run(ProjectState projectState) {
360360
final var project = projectState.metadata();
361361
int affectedIndices = 0;
362362
int affectedDataStreams = 0;
363+
List<DataStreamLifecycleAction> actions = List.of(
364+
this::maybeExecuteRollover,
365+
this::timeSeriesIndicesStillWithinTimeBounds,
366+
this::maybeExecuteRetention,
367+
this::maybeExecuteForceMerge,
368+
this::maybeExecuteDownsampling
369+
);
363370
for (DataStream dataStream : project.dataStreams().values()) {
364371
clearErrorStoreForUnmanagedIndices(project, dataStream);
365372
var dataLifecycleEnabled = dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled();
@@ -368,14 +375,6 @@ private void run(ProjectState projectState) {
368375
if (dataLifecycleEnabled == false && failuresLifecycleEnabled == false) {
369376
continue;
370377
}
371-
List<DataStreamLifecycleAction> actions = List.of(
372-
this::maybeExecuteRollover,
373-
DataStreamLifecycleService::timeSeriesIndicesStillWithinTimeBounds,
374-
this::maybeExecuteRetention,
375-
this::maybeExecuteForceMerge,
376-
this::maybeExecuteDownsampling
377-
);
378-
379378
// the following indices should not be considered for the remainder of this service run, for various reasons.
380379
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
381380
for (DataStreamLifecycleAction action : actions) {
@@ -394,11 +393,20 @@ private void run(ProjectState projectState) {
394393
);
395394
}
396395

396+
private Set<Index> timeSeriesIndicesStillWithinTimeBounds(
397+
ProjectState projectState,
398+
DataStream dataStream,
399+
Set<Index> indicesToExcludeForRemainingRun
400+
) {
401+
return timeSeriesIndicesStillWithinTimeBounds(projectState, dataStream, indicesToExcludeForRemainingRun, nowSupplier);
402+
}
403+
397404
// visible for testing
398405
static Set<Index> timeSeriesIndicesStillWithinTimeBounds(
399406
ProjectState projectState,
400407
DataStream dataStream,
401-
Set<Index> indicesToExcludeForRemainingRun
408+
Set<Index> indicesToExcludeForRemainingRun,
409+
LongSupplier nowSupplier
402410
) {
403411
// tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
404412
// deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
@@ -412,6 +420,15 @@ static Set<Index> timeSeriesIndicesStillWithinTimeBounds(
412420
Instant configuredEndTime = IndexSettings.TIME_SERIES_END_TIME.get(backingIndex.getSettings());
413421
assert configuredEndTime != null
414422
: "a time series index must have an end time configured but [" + index.getName() + "] does not";
423+
if (nowSupplier.getAsLong() <= configuredEndTime.toEpochMilli()) {
424+
logger.trace(
425+
"Data stream lifecycle will not perform any operations in this run on time series index [{}] because "
426+
+ "its configured [{}] end time has not lapsed",
427+
index.getName(),
428+
configuredEndTime
429+
);
430+
tsIndicesWithinBounds.add(index);
431+
}
415432
}
416433
}
417434
return tsIndicesWithinBounds;

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,8 @@ public void testMergePolicyNotExecutedForTSIndicesWithinTimeBounds() {
357357
clusterState = ClusterState.builder(clusterState).putProjectMetadata(builder).build();
358358

359359
dataStreamLifecycleService.run(clusterState);
360-
// There should be two client requests: one rollover, and one to update the merge policy settings. N.B. The merge policy settings
360+
// There should be two client requests: one rollover, and one to update the merge policy settings. One of the non-write indices is
361+
// still within the time bounds to be written to, so it is not force merged. N.B. The merge policy settings
361362
// will always be updated before the force merge is done, see testMergePolicySettingsAreConfiguredBeforeForcemerge.
362363
assertThat(clientSeenRequests.size(), is(2));
363364
assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
@@ -961,7 +962,13 @@ public void onFailure(Exception e) {
961962
final var projectId = randomProjectIdOrDefault();
962963
String targetIndex = randomAlphaOfLength(20);
963964
DataStreamLifecycleService.UpdateDataStreamLifecycleCustomMetadataTask task =
964-
new DataStreamLifecycleService.UpdateDataStreamLifecycleCustomMetadataTask(listener, projectId, targetIndex, threadPool);
965+
new DataStreamLifecycleService.UpdateDataStreamLifecycleCustomMetadataTask(
966+
listener,
967+
projectId,
968+
targetIndex,
969+
FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY,
970+
Long.toString(threadPool.absoluteTimeInMillis())
971+
);
965972
{
966973
Exception exception = new RuntimeException("task failed");
967974
task.onFailure(exception);
@@ -1441,14 +1448,19 @@ public void testTimeSeriesIndicesStillWithinTimeBounds() {
14411448
);
14421449
final ProjectState project = clusterState.projectState(projectId);
14431450
DataStream dataStream = project.metadata().dataStreams().get(dataStreamName);
1451+
dataStream = dataStream.copy()
1452+
.setName(dataStreamName)
1453+
.setGeneration(dataStream.getGeneration() + 1)
1454+
.setLifecycle(DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build())
1455+
.build();
14441456
{
1445-
// test for an index for which `now` is outside its time bounds
1446-
Index firstGenIndex = dataStream.getIndices().get(0);
1457+
// test for an index for which `now` is outside its time bounds by excluding the two that are not
14471458
Set<Index> indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds(
14481459
// the end_time for the first generation has lapsed
14491460
project,
14501461
dataStream,
1451-
Set.of()
1462+
Set.of(dataStream.getIndices().get(1), dataStream.getIndices().get(2)),
1463+
currentTime::toEpochMilli
14521464
);
14531465
assertThat(indices.size(), is(0));
14541466
}
@@ -1458,21 +1470,43 @@ public void testTimeSeriesIndicesStillWithinTimeBounds() {
14581470
// the end_time for the first generation has lapsed, but the other 2 generations are still within bounds
14591471
project,
14601472
dataStream,
1461-
Set.of()
1473+
Set.of(),
1474+
currentTime::toEpochMilli
14621475
);
14631476
assertThat(indices.size(), is(2));
14641477
assertThat(indices, containsInAnyOrder(dataStream.getIndices().get(1), dataStream.getIndices().get(2)));
14651478
}
14661479

14671480
{
1481+
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(project.metadata());
14681482
// non time_series indices are not within time bounds (they don't have any)
1469-
IndexMetadata indexMeta = IndexMetadata.builder(randomAlphaOfLengthBetween(10, 30))
1470-
.settings(indexSettings(1, 1).put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), IndexVersion.current()))
1471-
.build();
1472-
1473-
ProjectState newProject = project.updateProject(ProjectMetadata.builder(project.metadata()).put(indexMeta, true).build());
1483+
for (Index index : dataStream.getIndices()) {
1484+
IndexMetadata indexMeta = IndexMetadata.builder(index.getName())
1485+
.settings(
1486+
indexSettings(1, 1).put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), IndexVersion.current())
1487+
.put(IndexSettings.MODE.getKey(), IndexMode.STANDARD)
1488+
)
1489+
.build();
1490+
projectMetadataBuilder.put(indexMeta, true);
1491+
}
14741492

1475-
Set<Index> indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds(newProject, dataStream, Set.of());
1493+
List<Index> standardIndices = dataStream.getIndices()
1494+
.stream()
1495+
.map(index -> new Index(index.getName(), index.getUUID()))
1496+
.toList();
1497+
ProjectState newProject = project.updateProject(projectMetadataBuilder.build());
1498+
dataStream = dataStream.copy()
1499+
.setName(dataStreamName)
1500+
.setGeneration(dataStream.getGeneration() + 1)
1501+
.setBackingIndices(DataStream.DataStreamIndices.backingIndicesBuilder(standardIndices).build())
1502+
.setIndexMode(IndexMode.STANDARD)
1503+
.build();
1504+
Set<Index> indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds(
1505+
newProject,
1506+
dataStream,
1507+
Set.of(),
1508+
currentTime::toEpochMilli
1509+
);
14761510
assertThat(indices.size(), is(0));
14771511
}
14781512
}

0 commit comments

Comments
 (0)