99
1010package org .elasticsearch .action .downsample ;
1111
12+ import org .elasticsearch .TransportVersion ;
1213import org .elasticsearch .cluster .metadata .IndexMetadata ;
1314import org .elasticsearch .common .Rounding ;
1415import org .elasticsearch .common .Strings ;
1516import org .elasticsearch .common .io .stream .NamedWriteable ;
1617import org .elasticsearch .common .io .stream .StreamInput ;
1718import org .elasticsearch .common .io .stream .StreamOutput ;
19+ import org .elasticsearch .common .io .stream .Writeable ;
20+ import org .elasticsearch .core .Nullable ;
1821import org .elasticsearch .core .TimeValue ;
1922import org .elasticsearch .index .mapper .DataStreamTimestampFieldMapper ;
2023import org .elasticsearch .search .aggregations .bucket .histogram .DateHistogramAggregationBuilder ;
2831
2932import java .io .IOException ;
3033import java .time .ZoneId ;
34+ import java .util .Arrays ;
35+ import java .util .Locale ;
3136import java .util .Objects ;
3237
3338import static org .elasticsearch .xcontent .ConstructingObjectParser .constructorArg ;
39+ import static org .elasticsearch .xcontent .ConstructingObjectParser .optionalConstructorArg ;
3440
3541/**
3642 * This class holds the configuration details of a DownsampleAction that downsamples time series
3743 * (TSDB) indices. We have made great effort to simplify the rollup configuration and currently
38- * only requires a fixed time interval. So, it has the following format:
44+ * only requires a fixed time interval and optionally the sampling method . So, it has the following format:
3945 *
4046 * {
41- * "fixed_interval" : "1d",
47+ * "fixed_interval": "1d",
48+ * "sampling_method": "aggregate"
4249 * }
4350 *
4451 * fixed_interval is one or multiples of SI units and has no calendar-awareness (e.g. doesn't account
5360 * future extensions.
5461 */
5562public class DownsampleConfig implements NamedWriteable , ToXContentObject {
63+ public static final TransportVersion ADD_LAST_VALUE_DOWNSAMPLE_API = TransportVersion .fromName ("add_last_value_downsample_api" );
5664
5765 private static final String NAME = "downsample/action/config" ;
5866 public static final String FIXED_INTERVAL = "fixed_interval" ;
67+ public static final String SAMPLING_METHOD = "sampling_method" ;
5968 public static final String TIME_ZONE = "time_zone" ;
6069 public static final String DEFAULT_TIMEZONE = ZoneId .of ("UTC" ).getId ();
6170
6271 private static final String timestampField = DataStreamTimestampFieldMapper .DEFAULT_PATH ;
6372 private final DateHistogramInterval fixedInterval ;
6473 private final String timeZone = DEFAULT_TIMEZONE ;
6574 private final String intervalType = FIXED_INTERVAL ;
75+ @ Nullable
76+ private final SamplingMethod samplingMethod ;
6677
6778 private static final ConstructingObjectParser <DownsampleConfig , Void > PARSER ;
79+
6880 static {
6981 PARSER = new ConstructingObjectParser <>(NAME , a -> {
7082 DateHistogramInterval fixedInterval = (DateHistogramInterval ) a [0 ];
7183 if (fixedInterval != null ) {
72- return new DownsampleConfig (fixedInterval );
84+ return new DownsampleConfig (fixedInterval , ( SamplingMethod ) a [ 1 ] );
7385 } else {
7486 throw new IllegalArgumentException ("Parameter [" + FIXED_INTERVAL + "] is required." );
7587 }
@@ -81,24 +93,48 @@ public class DownsampleConfig implements NamedWriteable, ToXContentObject {
8193 new ParseField (FIXED_INTERVAL ),
8294 ObjectParser .ValueType .STRING
8395 );
96+ PARSER .declareField (
97+ optionalConstructorArg (),
98+ p -> SamplingMethod .fromString (p .text ()),
99+ new ParseField (SAMPLING_METHOD ),
100+ ObjectParser .ValueType .STRING
101+ );
84102 }
85103
86104 /**
87105 * Create a new {@link DownsampleConfig} using the given configuration parameters.
88106 * @param fixedInterval the fixed interval to use for computing the date histogram for the rolled up documents (required).
107+ * @deprecated please use {@link DownsampleConfig#DownsampleConfig(DateHistogramInterval, SamplingMethod)}, this method is being kept
108+ * until the sampling method is completely integrated with ILM and DLM.
89109 */
110+ @ Deprecated
90111 public DownsampleConfig (final DateHistogramInterval fixedInterval ) {
112+ this (fixedInterval , null );
113+ }
114+
115+ /**
116+ * Create a new {@link DownsampleConfig} using the given configuration parameters.
117+ * @param fixedInterval the fixed interval to use for computing the date histogram for the rolled up documents (required).
118+ * @param samplingMethod the method used to downsample metrics, when null it default to {@link SamplingMethod#AGGREGATE}.
119+ */
120+ public DownsampleConfig (final DateHistogramInterval fixedInterval , @ Nullable SamplingMethod samplingMethod ) {
91121 if (fixedInterval == null ) {
92122 throw new IllegalArgumentException ("Parameter [" + FIXED_INTERVAL + "] is required." );
93123 }
94124 this .fixedInterval = fixedInterval ;
125+ this .samplingMethod = samplingMethod ;
95126
96127 // validate interval
97128 createRounding (this .fixedInterval .toString (), this .timeZone );
98129 }
99130
100131 public DownsampleConfig (final StreamInput in ) throws IOException {
101132 fixedInterval = new DateHistogramInterval (in );
133+ if (in .getTransportVersion ().supports (ADD_LAST_VALUE_DOWNSAMPLE_API )) {
134+ samplingMethod = in .readOptionalWriteable (SamplingMethod ::read );
135+ } else {
136+ samplingMethod = null ;
137+ }
102138 }
103139
104140 /**
@@ -135,6 +171,9 @@ public static void validateSourceAndTargetIntervals(DownsampleConfig source, Dow
135171 @ Override
136172 public void writeTo (final StreamOutput out ) throws IOException {
137173 fixedInterval .writeTo (out );
174+ if (out .getTransportVersion ().supports (ADD_LAST_VALUE_DOWNSAMPLE_API )) {
175+ out .writeOptionalWriteable (samplingMethod );
176+ }
138177 }
139178
140179 /**
@@ -180,6 +219,21 @@ public Rounding.Prepared createRounding() {
180219 return createRounding (fixedInterval .toString (), timeZone );
181220 }
182221
222+ /**
223+ * @return the user configured sampling method
224+ */
225+ @ Nullable
226+ public SamplingMethod getSamplingMethod () {
227+ return samplingMethod ;
228+ }
229+
230+ /**
231+ * @return the sampling method that will be used based on this configuration.
232+ */
233+ public SamplingMethod getSamplingMethodOrDefault () {
234+ return SamplingMethod .getOrDefault (samplingMethod );
235+ }
236+
183237 @ Override
184238 public String getWriteableName () {
185239 return NAME ;
@@ -195,7 +249,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
195249 }
196250
197251 public XContentBuilder toXContentFragment (final XContentBuilder builder ) throws IOException {
198- return builder .field (FIXED_INTERVAL , fixedInterval .toString ());
252+ builder .field (FIXED_INTERVAL , fixedInterval .toString ());
253+ if (samplingMethod != null ) {
254+ builder .field (SAMPLING_METHOD , samplingMethod .label );
255+ }
256+ return builder ;
199257 }
200258
201259 public static DownsampleConfig fromXContent (final XContentParser parser ) throws IOException {
@@ -213,12 +271,13 @@ public boolean equals(final Object other) {
213271 final DownsampleConfig that = (DownsampleConfig ) other ;
214272 return Objects .equals (fixedInterval , that .fixedInterval )
215273 && Objects .equals (intervalType , that .intervalType )
216- && ZoneId .of (timeZone , ZoneId .SHORT_IDS ).getRules ().equals (ZoneId .of (that .timeZone , ZoneId .SHORT_IDS ).getRules ());
274+ && ZoneId .of (timeZone , ZoneId .SHORT_IDS ).getRules ().equals (ZoneId .of (that .timeZone , ZoneId .SHORT_IDS ).getRules ())
275+ && Objects .equals (samplingMethod , that .samplingMethod );
217276 }
218277
219278 @ Override
220279 public int hashCode () {
221- return Objects .hash (fixedInterval , intervalType , ZoneId .of (timeZone ));
280+ return Objects .hash (fixedInterval , intervalType , ZoneId .of (timeZone ), samplingMethod );
222281 }
223282
224283 @ Override
@@ -264,4 +323,95 @@ public static String generateDownsampleIndexName(
264323 }
265324 return prefix + fixedInterval + "-" + sourceIndexName ;
266325 }
326+
327+ public enum SamplingMethod implements Writeable {
328+ AGGREGATE ((byte ) 0 , "aggregate" ),
329+ LAST_VALUE ((byte ) 1 , "last_value" );
330+
331+ private final byte id ;
332+ private final String label ;
333+
334+ SamplingMethod (byte id , String label ) {
335+ this .id = id ;
336+ this .label = label ;
337+ }
338+
339+ byte id () {
340+ return id ;
341+ }
342+
343+ public static SamplingMethod read (StreamInput in ) throws IOException {
344+ var id = in .readByte ();
345+ return switch (id ) {
346+ case 0 -> AGGREGATE ;
347+ case 1 -> LAST_VALUE ;
348+ default -> throw new IllegalArgumentException (
349+ "Sampling method id ["
350+ + id
351+ + "] is not one of the accepted ids "
352+ + Arrays .stream (values ()).map (SamplingMethod ::id ).toList ()
353+ + "."
354+ );
355+ };
356+ }
357+
358+ /**
359+ * Parses the configured sampling method from string (case-insensitive).
360+ * @return the used sampling method, or null when the label is null.
361+ */
362+ @ Nullable
363+ public static SamplingMethod fromString (@ Nullable String label ) {
364+ if (label == null ) {
365+ return null ;
366+ }
367+ return switch (label .toLowerCase (Locale .ROOT )) {
368+ case "aggregate" -> AGGREGATE ;
369+ case "last_value" -> LAST_VALUE ;
370+ default -> throw new IllegalArgumentException (
371+ "Sampling method ["
372+ + label
373+ + "] is not one of the accepted methods "
374+ + Arrays .stream (values ()).map (SamplingMethod ::toString ).toList ()
375+ + "."
376+ );
377+ };
378+ }
379+
380+ /**
381+ * Retrieves the configured sampling method from the index metadata. In case that it is null,
382+ * it checks if the index is downsampled and returns the `aggregate` that was the only sampling
383+ * method before we introduced last value.
384+ * @return the used sampling method, or null if the index is not downsampled.
385+ */
386+ @ Nullable
387+ public static SamplingMethod fromIndexMetadata (IndexMetadata indexMetadata ) {
388+ SamplingMethod method = fromString (indexMetadata .getSettings ().get (IndexMetadata .INDEX_DOWNSAMPLE_METHOD_KEY ));
389+ if (method != null ) {
390+ return method ;
391+ }
392+ // Indices downsampled before the sampling method was introduced, will not have the sampling method in their metadata.
393+ // For this reason, we first verify that they are indeed downsampled, and then we return `aggregate` because this was
394+ // the only option available at the time.
395+ boolean isIndexDownsampled = indexMetadata .getSettings ().get (IndexMetadata .INDEX_DOWNSAMPLE_INTERVAL_KEY ) != null ;
396+ return isIndexDownsampled ? AGGREGATE : null ;
397+ }
398+
399+ /**
400+ * @return the sampling method that will be used based on this configuration. Default to {@link SamplingMethod#AGGREGATE}
401+ * when the provided sampling method is null.
402+ */
403+ public static SamplingMethod getOrDefault (@ Nullable SamplingMethod samplingMethod ) {
404+ return samplingMethod == null ? SamplingMethod .AGGREGATE : samplingMethod ;
405+ }
406+
407+ @ Override
408+ public void writeTo (StreamOutput out ) throws IOException {
409+ out .writeByte (id );
410+ }
411+
412+ @ Override
413+ public String toString () {
414+ return label ;
415+ }
416+ }
267417}
0 commit comments