Skip to content

Commit bf80af5

Browse files
committed
Use the sampling method in the data stream lifecycle
1 parent 9107d76 commit bf80af5

File tree

3 files changed

+168
-34
lines changed

3 files changed

+168
-34
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,10 @@ Set<Index> maybeExecuteDownsampling(ProjectState projectState, DataStream dataSt
524524
// - has matching downsample rounds
525525
// - is read-only
526526
// So let's wait for an in-progress downsampling operation to succeed or trigger the last matching round
527-
affectedIndices.addAll(waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, project));
527+
var downsamplingMethod = dataStream.getDataLifecycle().downsamplingMethod();
528+
affectedIndices.addAll(
529+
waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, downsamplingMethod, project)
530+
);
528531
}
529532
}
530533

@@ -541,6 +544,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
541544
DataStream dataStream,
542545
IndexMetadata backingIndex,
543546
List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds,
547+
DownsampleConfig.SamplingMethod downsamplingMethod,
544548
ProjectMetadata project
545549
) {
546550
assert dataStream.getIndices().contains(backingIndex.getIndex())
@@ -568,7 +572,8 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
568572
INDEX_DOWNSAMPLE_STATUS.get(targetDownsampleIndexMeta.getSettings()),
569573
round,
570574
lastRound,
571-
index,
575+
downsamplingMethod,
576+
backingIndex,
572577
targetDownsampleIndexMeta.getIndex()
573578
);
574579
if (downsamplingNotComplete.isEmpty() == false) {
@@ -580,7 +585,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
580585
// no maintenance needed for previously started downsampling actions and we are on the last matching round so it's time
581586
// to kick off downsampling
582587
affectedIndices.add(index);
583-
downsampleIndexOnce(round, project.id(), indexName, downsampleIndexName);
588+
downsampleIndexOnce(round, downsamplingMethod, project.id(), backingIndex, downsampleIndexName);
584589
}
585590
}
586591
}
@@ -592,16 +597,22 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
592597
*/
593598
private void downsampleIndexOnce(
594599
DataStreamLifecycle.DownsamplingRound round,
600+
DownsampleConfig.SamplingMethod requestedDownsamplingMethod,
595601
ProjectId projectId,
596-
String sourceIndex,
602+
IndexMetadata sourceIndexMetadata,
597603
String downsampleIndexName
598604
) {
605+
var sourceIndexSamplingMethod = DownsampleConfig.SamplingMethod.fromIndexMetadata(sourceIndexMetadata);
606+
String sourceIndex = sourceIndexMetadata.getIndex().getName();
599607
DownsampleAction.Request request = new DownsampleAction.Request(
600608
TimeValue.THIRTY_SECONDS /* TODO should this be longer/configurable? */,
601609
sourceIndex,
602610
downsampleIndexName,
603611
null,
604-
new DownsampleConfig(round.fixedInterval(), null)
612+
new DownsampleConfig(
613+
round.fixedInterval(),
614+
sourceIndexSamplingMethod == null ? requestedDownsamplingMethod : sourceIndexSamplingMethod
615+
)
605616
);
606617
transportActionsDeduplicator.executeOnce(
607618
Tuple.tuple(projectId, request),
@@ -632,11 +643,12 @@ private Set<Index> evaluateDownsampleStatus(
632643
IndexMetadata.DownsampleTaskStatus downsampleStatus,
633644
DataStreamLifecycle.DownsamplingRound currentRound,
634645
DataStreamLifecycle.DownsamplingRound lastRound,
635-
Index backingIndex,
646+
DownsampleConfig.SamplingMethod downsamplingMethod,
647+
IndexMetadata backingIndex,
636648
Index downsampleIndex
637649
) {
638650
Set<Index> affectedIndices = new HashSet<>();
639-
String indexName = backingIndex.getName();
651+
String indexName = backingIndex.getIndex().getName();
640652
String downsampleIndexName = downsampleIndex.getName();
641653
return switch (downsampleStatus) {
642654
case UNKNOWN -> {
@@ -683,15 +695,15 @@ private Set<Index> evaluateDownsampleStatus(
683695
// NOTE that the downsample request is made through the deduplicator so it will only really be executed if
684696
// there isn't one already in-flight. This can happen if a previous request timed-out, failed, or there was a
685697
// master failover and data stream lifecycle needed to restart
686-
downsampleIndexOnce(currentRound, projectId, indexName, downsampleIndexName);
687-
affectedIndices.add(backingIndex);
698+
downsampleIndexOnce(currentRound, downsamplingMethod, projectId, backingIndex, downsampleIndexName);
699+
affectedIndices.add(backingIndex.getIndex());
688700
yield affectedIndices;
689701
}
690702
case SUCCESS -> {
691703
if (dataStream.getIndices().contains(downsampleIndex) == false) {
692704
// at this point the source index is part of the data stream and the downsample index is complete but not
693705
// part of the data stream. we need to replace the source index with the downsample index in the data stream
694-
affectedIndices.add(backingIndex);
706+
affectedIndices.add(backingIndex.getIndex());
695707
replaceBackingIndexWithDownsampleIndexOnce(projectId, dataStream, indexName, downsampleIndexName);
696708
}
697709
yield affectedIndices;

0 commit comments

Comments
 (0)