Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
PutSampleConfigurationAction.Request putRequest;

XContentParser parser = request.contentParser();
SamplingConfiguration samplingConfig = SamplingConfiguration.fromXContent(parser);
SamplingConfiguration samplingConfig = SamplingConfiguration.fromXContentUserData(parser);
putRequest = new PutSampleConfigurationAction.Request(samplingConfig, getMasterNodeTimeout(request), getAckTimeout(request));

// Set the target index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,24 @@
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Configuration for sampling raw documents in an index.
*/
public record SamplingConfiguration(double rate, Integer maxSamples, ByteSizeValue maxSize, TimeValue timeToLive, String condition)
implements
ToXContentObject,
SimpleDiffable<SamplingConfiguration> {
public record SamplingConfiguration(
double rate,
Integer maxSamples,
ByteSizeValue maxSize,
TimeValue timeToLive,
String condition,
Long creationTime
) implements ToXContentObject, SimpleDiffable<SamplingConfiguration> {

public static final String TYPE = "sampling_configuration";
private static final String RATE_FIELD_NAME = "rate";
Expand All @@ -43,6 +50,10 @@ public record SamplingConfiguration(double rate, Integer maxSamples, ByteSizeVal
private static final String TIME_TO_LIVE_IN_MILLIS_FIELD_NAME = "time_to_live_in_millis";
private static final String TIME_TO_LIVE_FIELD_NAME = "time_to_live";
private static final String CONDITION_FIELD_NAME = "if";
private static final String CREATION_TIME_IN_MILLIS_FIELD_NAME = "creation_time_in_millis";
private static final String CREATION_TIME_FIELD_NAME = "creation_time";

private static final String IS_USER_DATA_CONTEXT_KEY = "is_user_data";

// Constants for validation and defaults
public static final int MAX_SAMPLES_LIMIT = 10_000;
Expand All @@ -64,27 +75,37 @@ public record SamplingConfiguration(double rate, Integer maxSamples, ByteSizeVal
+ " days";
public static final String INVALID_CONDITION_MESSAGE = "condition script, if provided, must not be empty";

private static final ConstructingObjectParser<SamplingConfiguration, Void> PARSER = new ConstructingObjectParser<>(
TYPE,
false,
args -> {
private static final ConstructingObjectParser<SamplingConfiguration, Map<String, Boolean>> PARSER = new ConstructingObjectParser<
SamplingConfiguration,
Map<String, Boolean>>(TYPE, false, (args, context) -> {
Double rate = (Double) args[0];
Integer maxSamples = (Integer) args[1];
ByteSizeValue humanReadableMaxSize = (ByteSizeValue) args[2];
ByteSizeValue rawMaxSize = (ByteSizeValue) args[3];
TimeValue humanReadableTimeToLive = (TimeValue) args[4];
TimeValue rawTimeToLive = (TimeValue) args[5];
String condition = (String) args[6];
Long rawCreationTime = (Long) args[8];

if (context.get(IS_USER_DATA_CONTEXT_KEY)) {
validateInputs(
rate,
maxSamples,
determineValue(humanReadableMaxSize, rawMaxSize),
determineValue(humanReadableTimeToLive, rawTimeToLive),
condition
);
}

return new SamplingConfiguration(
rate,
maxSamples,
determineValue(humanReadableMaxSize, rawMaxSize),
determineValue(humanReadableTimeToLive, rawTimeToLive),
condition
condition,
rawCreationTime
);
}
);
});

static {
PARSER.declareDouble(constructorArg(), new ParseField(RATE_FIELD_NAME));
Expand Down Expand Up @@ -116,10 +137,18 @@ public record SamplingConfiguration(double rate, Integer maxSamples, ByteSizeVal
ObjectParser.ValueType.LONG
);
PARSER.declareString(optionalConstructorArg(), new ParseField(CONDITION_FIELD_NAME));
PARSER.declareField(optionalConstructorArg(), (p, c) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just delete this (and adjust the array index of rawCreationTime above), right? It doesn't seem to actually be used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see: it does make for a more meaningful user error message when they try to set creation_time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but also, since we're using builder.timestampFieldsFromUnixEpochMillis(), it can lead to some weird errors if you don't add parsing instructions for both the raw and human-readable fields. So I just added parsing instructions for both and ignored the human-readable one up above.

validateUserDataContext(c, CREATION_TIME_FIELD_NAME);
return Instant.parse(p.text()).toEpochMilli();
}, new ParseField(CREATION_TIME_FIELD_NAME), ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), (p, c) -> {
validateUserDataContext(c, CREATION_TIME_IN_MILLIS_FIELD_NAME);
return p.longValue();
}, new ParseField(CREATION_TIME_IN_MILLIS_FIELD_NAME), ObjectParser.ValueType.LONG);
}

/**
* Constructor with validation and defaulting for optional fields.
* Constructor with defaulting for optional fields.
*
* @param rate The fraction of documents to sample (must be between 0 and 1)
* @param maxSamples The maximum number of documents to sample (optional, defaults to {@link #DEFAULT_MAX_SAMPLES})
Expand All @@ -129,15 +158,25 @@ public record SamplingConfiguration(double rate, Integer maxSamples, ByteSizeVal
* @param condition An optional condition script that sampled documents must satisfy (optional, can be null)
* @throws IllegalArgumentException If any of the parameters are invalid, according to the validation rules
*/
public SamplingConfiguration(double rate, Integer maxSamples, ByteSizeValue maxSize, TimeValue timeToLive, String condition) {
validateInputs(rate, maxSamples, maxSize, timeToLive, condition);

// Initialize record fields
public SamplingConfiguration(
    double rate,
    Integer maxSamples,
    ByteSizeValue maxSize,
    TimeValue timeToLive,
    String condition,
    Long creationTime
) {
    // rate and condition are stored exactly as supplied; condition may be null.
    this.rate = rate;
    this.condition = condition;

    // Each optional component falls back to its documented default when null.
    if (maxSamples != null) {
        this.maxSamples = maxSamples;
    } else {
        this.maxSamples = DEFAULT_MAX_SAMPLES;
    }

    if (maxSize != null) {
        this.maxSize = maxSize;
    } else {
        this.maxSize = ByteSizeValue.ofGb(DEFAULT_MAX_SIZE_GIGABYTES);
    }

    if (timeToLive != null) {
        this.timeToLive = timeToLive;
    } else {
        this.timeToLive = TimeValue.timeValueDays(DEFAULT_TIME_TO_LIVE_DAYS);
    }

    // A missing creation time is stamped with the current time in epoch milliseconds.
    if (creationTime != null) {
        this.creationTime = creationTime;
    } else {
        this.creationTime = Instant.now().toEpochMilli();
    }
}

/**
 * Creates a configuration without an explicit creation time.
 * <p>
 * Delegates to the six-argument constructor with a {@code null} creation time, which that
 * constructor replaces with the current time in epoch milliseconds.
 *
 * @param rate       The fraction of documents to sample
 * @param maxSamples The maximum number of documents to sample (optional)
 * @param maxSize    The maximum total size of sampled documents (optional)
 * @param timeToLive The duration for which the sampled documents should be retained (optional)
 * @param condition  An optional condition script that sampled documents must satisfy (can be null)
 */
public SamplingConfiguration(
    double rate,
    Integer maxSamples,
    ByteSizeValue maxSize,
    TimeValue timeToLive,
    String condition
) {
    this(rate, maxSamples, maxSize, timeToLive, condition, null);
}

/**
Expand All @@ -147,7 +186,7 @@ public SamplingConfiguration(double rate, Integer maxSamples, ByteSizeValue maxS
* @throws IOException If an I/O error occurs during deserialization
*/
public SamplingConfiguration(StreamInput in) throws IOException {
    // Arguments are evaluated left to right, so the stream is consumed in exactly this order.
    // creationTime is read last, matching the trailing writeLong(creationTime) in writeTo.
    // NOTE(review): the extra long is read unconditionally; confirm whether a transport-version
    // gate is needed so nodes on an older wire format can still be read.
    this(
        in.readDouble(),
        in.readInt(),
        ByteSizeValue.readFrom(in),
        in.readTimeValue(),
        in.readOptionalString(),
        in.readLong()
    );
}

// Write to StreamOutput
Expand All @@ -158,6 +197,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeWriteable(this.maxSize);
out.writeTimeValue(this.timeToLive);
out.writeOptionalString(this.condition);
out.writeLong(this.creationTime);
}

// Serialize to XContent (JSON)
Expand All @@ -171,6 +211,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (condition != null) {
builder.field(CONDITION_FIELD_NAME, condition);
}
builder.timestampFieldsFromUnixEpochMillis(CREATION_TIME_IN_MILLIS_FIELD_NAME, CREATION_TIME_FIELD_NAME, creationTime);
builder.endObject();
return builder;
}
Expand All @@ -183,7 +224,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @throws IOException If parsing fails due to invalid JSON or I/O errors
*/
public static SamplingConfiguration fromXContent(XContentParser parser) throws IOException {
    // The parser reads the user-data flag from its context, so a context must always be supplied
    // (a null context would fail on lookup). Here the flag is false: input is not treated as user
    // data, so the creation-time fields are accepted and the user-data validation is not applied.
    Map<String, Boolean> context = new HashMap<>();
    context.put(IS_USER_DATA_CONTEXT_KEY, false);
    return PARSER.parse(parser, context);
}

/**
 * Parses a sampling configuration supplied by a user.
 * <p>
 * The parser is invoked with the user-data flag set to {@code true}. In that mode the parsed
 * values are passed through {@code validateInputs} and the creation-time fields are rejected.
 *
 * @param parser The XContentParser positioned at the start of the configuration object
 * @return The parsed SamplingConfiguration
 * @throws IOException If parsing fails due to invalid content or I/O errors
 */
public static SamplingConfiguration fromXContentUserData(XContentParser parser) throws IOException {
    final Map<String, Boolean> userDataContext = Map.of(IS_USER_DATA_CONTEXT_KEY, Boolean.TRUE);
    return PARSER.parse(parser, userDataContext);
}

/**
Expand Down Expand Up @@ -249,4 +296,10 @@ private static <T> T determineValue(T humanReadableValue, T rawValue) {
return humanReadableValue != null ? humanReadableValue : rawValue;

}

/**
 * Rejects a creation-time field when the parser context marks the input as user data.
 *
 * @param context   Parser context holding the user-data flag under {@code IS_USER_DATA_CONTEXT_KEY}
 * @param fieldName Name of the field being parsed, included in the error message
 * @throws IllegalArgumentException If the context flags the input as user data
 */
private static void validateUserDataContext(Map<String, Boolean> context, String fieldName) {
    // Compare by value: == on a boxed Boolean only matches the canonical Boolean.TRUE instance,
    // so a separately allocated Boolean holding true would bypass the check.
    if (Boolean.TRUE.equals(context.get(IS_USER_DATA_CONTEXT_KEY))) {
        throw new IllegalArgumentException("Creation time cannot be set by user (field: " + fieldName + ")");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -932,13 +932,14 @@ public Tuple<ClusterState, ClusterStateAckListener> executeTask(
) {
logger.debug(
"Updating sampling configuration for index [{}] with rate [{}],"
+ " maxSamples [{}], maxSize [{}], timeToLive [{}], condition[{}]",
+ " maxSamples [{}], maxSize [{}], timeToLive [{}], condition [{}], creationTime [{}]",
updateSamplingConfigurationTask.indexName,
updateSamplingConfigurationTask.samplingConfiguration.rate(),
updateSamplingConfigurationTask.samplingConfiguration.maxSamples(),
updateSamplingConfigurationTask.samplingConfiguration.maxSize(),
updateSamplingConfigurationTask.samplingConfiguration.timeToLive(),
updateSamplingConfigurationTask.samplingConfiguration.condition()
updateSamplingConfigurationTask.samplingConfiguration.condition(),
updateSamplingConfigurationTask.samplingConfiguration.creationTime()
);

// Get sampling metadata
Expand Down
Loading