Skip to content

Commit 81041b4

Browse files
[TEST] Assert DSL merge policy respects end date (#113038)
[TEST] Assert DSL merge policy respects end date Backing indexes with an end date in the future may still get writes, so DSL should not apply the merge policy (first configuring the settings on the index, then doing the force merge) until that time has passed. The implementation already does this, because `DataStreamLifecycleService.run()` calls `timeSeriesIndicesStillWithinTimeBounds` and adds the resulting indices to `indicesToExcludeForRemainingRun` before calling `maybeExecuteForceMerge`. This change simply adds a unit test to ensure that this behaviour does not regress. Closes #109030
1 parent 63e0897 commit 81041b4

File tree

1 file changed

+50
-1
lines changed

1 file changed

+50
-1
lines changed

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.TARGET_MERGE_FACTOR_VALUE;
120120
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
121121
import static org.elasticsearch.test.ClusterServiceUtils.setState;
122+
import static org.hamcrest.Matchers.arrayContaining;
122123
import static org.hamcrest.Matchers.contains;
123124
import static org.hamcrest.Matchers.containsInAnyOrder;
124125
import static org.hamcrest.Matchers.containsString;
@@ -326,7 +327,55 @@ public void testRetentionNotExecutedForTSIndicesWithinTimeBounds() {
326327
TransportRequest deleteIndexRequest = clientSeenRequests.get(1);
327328
assertThat(deleteIndexRequest, instanceOf(DeleteIndexRequest.class));
328329
// only the first generation index should be eligible for retention
329-
assertThat(((DeleteIndexRequest) deleteIndexRequest).indices(), is(new String[] { dataStream.getIndices().get(0).getName() }));
330+
assertThat(((DeleteIndexRequest) deleteIndexRequest).indices(), arrayContaining(dataStream.getIndices().getFirst().getName()));
331+
}
332+
333+
public void testMergePolicyNotExecutedForTSIndicesWithinTimeBounds() {
334+
Instant currentTime = Instant.ofEpochMilli(now).truncatedTo(ChronoUnit.MILLIS);
335+
// These ranges are on the edge of each other temporal boundaries.
336+
Instant start1 = currentTime.minus(6, ChronoUnit.HOURS);
337+
Instant end1 = currentTime.minus(4, ChronoUnit.HOURS);
338+
Instant start2 = currentTime.minus(4, ChronoUnit.HOURS);
339+
Instant end2 = currentTime.plus(2, ChronoUnit.HOURS);
340+
Instant start3 = currentTime.plus(2, ChronoUnit.HOURS);
341+
Instant end3 = currentTime.plus(4, ChronoUnit.HOURS);
342+
343+
String dataStreamName = "logs_my-app_prod";
344+
var clusterState = DataStreamTestHelper.getClusterStateWithDataStream(
345+
dataStreamName,
346+
List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2), Tuple.tuple(start3, end3))
347+
);
348+
Metadata.Builder builder = Metadata.builder(clusterState.metadata());
349+
DataStream dataStream = builder.dataStream(dataStreamName);
350+
// Overwrite the data stream in the cluster state to set the lifecycle policy, with no retention policy (i.e. infinite retention).
351+
builder.put(
352+
dataStream.copy()
353+
.setName(dataStreamName)
354+
.setGeneration(dataStream.getGeneration() + 1)
355+
.setLifecycle(DataStreamLifecycle.newBuilder().build())
356+
.build()
357+
);
358+
clusterState = ClusterState.builder(clusterState).metadata(builder).build();
359+
360+
dataStreamLifecycleService.run(clusterState);
361+
// There should be two client requests: one rollover, and one to update the merge policy settings. N.B. The merge policy settings
362+
// will always be updated before the force merge is done, see testMergePolicySettingsAreConfiguredBeforeForcemerge.
363+
assertThat(clientSeenRequests.size(), is(2));
364+
assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
365+
TransportRequest updateSettingsRequest = clientSeenRequests.get(1);
366+
assertThat(updateSettingsRequest, instanceOf(UpdateSettingsRequest.class));
367+
// Only the first generation index should be eligible for merging. The other have end dates in the future.
368+
assertThat(
369+
((UpdateSettingsRequest) updateSettingsRequest).indices(),
370+
arrayContaining(dataStream.getIndices().getFirst().getName())
371+
);
372+
assertThat(
373+
((UpdateSettingsRequest) updateSettingsRequest).settings().keySet(),
374+
containsInAnyOrder(
375+
MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(),
376+
MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()
377+
)
378+
);
330379
}
331380

332381
public void testRetentionSkippedWhilstDownsamplingInProgress() {

0 commit comments

Comments
 (0)