Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
a1bed05
Deleting sample when sampling configuration is deleted or changed
masseyke Oct 7, 2025
2d37869
Merge branch 'main' into random-sampling-cluster-changed
masseyke Oct 7, 2025
e9c5479
minor performance improvements
masseyke Oct 7, 2025
7aead54
Merge branch 'main' into random-sampling-cluster-changed
masseyke Oct 7, 2025
018717b
Add put sample config action
seanzatzdev Oct 8, 2025
ae4a977
Merge branch 'main' into putsamplingconfig
seanzatzdev Oct 8, 2025
9647e52
Update server/src/main/java/org/elasticsearch/ingest/SamplingService.…
seanzatzdev Oct 8, 2025
0d77476
Update server/src/main/java/org/elasticsearch/action/admin/indices/sa…
seanzatzdev Oct 8, 2025
ccaf2d7
respond to PR comments
seanzatzdev Oct 8, 2025
95fcb1e
[CI] Auto commit changes from spotless
Oct 8, 2025
c376005
Update docs/changelog/136148.yaml
seanzatzdev Oct 8, 2025
7ea3e8d
Add upper bound on # of sampling configs & nit fixes
seanzatzdev Oct 8, 2025
f8c436f
Update server/src/test/java/org/elasticsearch/ingest/UpdateSamplingCo…
seanzatzdev Oct 8, 2025
c6af926
nit fix
seanzatzdev Oct 8, 2025
652b6ca
Make MAX_CONFIGURATIONS_SETTING
seanzatzdev Oct 8, 2025
711ee48
[CI] Auto commit changes from spotless
Oct 8, 2025
346ed35
update max sampling config test with further mocking
seanzatzdev Oct 8, 2025
acdff98
[CI] Auto commit changes from spotless
Oct 8, 2025
2a61a06
naming changes
seanzatzdev Oct 8, 2025
8bd9c67
remove extraneous logging
seanzatzdev Oct 9, 2025
47aa460
[CI] Auto commit changes from spotless
Oct 9, 2025
484ab42
merging main
masseyke Oct 9, 2025
5af5119
Delete docs/changelog/136148.yaml
seanzatzdev Oct 9, 2025
fd61f44
respond to PR comments
seanzatzdev Oct 9, 2025
f72baf8
[CI] Auto commit changes from spotless
Oct 9, 2025
2cd6b62
fix failing test
seanzatzdev Oct 10, 2025
3d1da2d
Merge branch 'main' into random-sampling-cluster-changed
masseyke Oct 10, 2025
61e2fc0
copilot feedback
masseyke Oct 10, 2025
c42e69d
Merge branch 'random-sampling-cluster-changed' into random-sampling-e…
masseyke Oct 10, 2025
52b6a56
merging putsamplingconfig
masseyke Oct 10, 2025
582f251
Random Sampling TTL prototype
masseyke Oct 10, 2025
6013d7f
[CI] Auto commit changes from spotless
Oct 10, 2025
db33687
merging main
masseyke Oct 10, 2025
f75a2e4
Add creationDate to SamplingConfiguration
seanzatzdev Oct 11, 2025
44d1341
Update server/src/main/java/org/elasticsearch/action/admin/indices/sa…
seanzatzdev Oct 11, 2025
f4c9939
nit fixes
seanzatzdev Oct 11, 2025
89355b9
Add creationTime tests
seanzatzdev Oct 13, 2025
1e942da
[CI] Auto commit changes from spotless
Oct 13, 2025
afa554d
Merge branch 'main' into ttl-in-config
seanzatzdev Oct 13, 2025
68e6efe
nit fixes
seanzatzdev Oct 13, 2025
6bcd7ea
add to logging
seanzatzdev Oct 13, 2025
312921b
fix typo
seanzatzdev Oct 13, 2025
8c7f2e7
merging main
masseyke Oct 13, 2025
9c39e86
merging ttl-in-config branch
masseyke Oct 13, 2025
ea53317
fixing failed merge
masseyke Oct 13, 2025
bf14861
[CI] Auto commit changes from spotless
Oct 13, 2025
c9f5baa
merging main
masseyke Oct 14, 2025
db39ae1
Merge branch 'random-sampling-everything' of github.com:masseyke/elas…
masseyke Oct 14, 2025
b5a9cd6
adding lifecycle parts
masseyke Oct 14, 2025
fe134c4
adding an integration test
masseyke Oct 14, 2025
b1a37b8
[CI] Auto commit changes from spotless
Oct 14, 2025
dcc6e20
cleanup
masseyke Oct 14, 2025
fce9dcc
Merge branch 'random-sampling-everything' of github.com:masseyke/elas…
masseyke Oct 14, 2025
b842304
merging main
masseyke Oct 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.sampling.GetSampleAction;
import org.elasticsearch.action.admin.indices.sampling.PutSampleConfigurationAction;
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;

import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.ingest.SamplingService.TTL_POLL_INTERVAL_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class SamplingServiceIT extends ESIntegTestCase {
public void testTTL() throws Exception {
assertAcked(
clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Map.of(TTL_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)))
);
String indexName = randomIdentifier();
client().execute(TransportCreateIndexAction.TYPE, new CreateIndexRequest(indexName)).actionGet();
ensureYellow(indexName);
PutSampleConfigurationAction.Request putSampleConfigRequest = new PutSampleConfigurationAction.Request(
new SamplingConfiguration(1.0d, 10, null, TimeValue.timeValueSeconds(1), null),
TimeValue.THIRTY_SECONDS,
TimeValue.THIRTY_SECONDS
).indices(indexName);
client().execute(PutSampleConfigurationAction.INSTANCE, putSampleConfigRequest).actionGet();
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < 20; i++) {
IndexRequest indexRequest = new IndexRequest(indexName);
indexRequest.source(Map.of("foo", randomBoolean() ? 3L : randomLong(), "bar", randomBoolean()));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client().execute(TransportBulkAction.TYPE, bulkRequest).actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(false));
GetSampleAction.Response getSampleResponse = client().execute(GetSampleAction.INSTANCE, new GetSampleAction.Request(indexName))
.actionGet();
assertThat(getSampleResponse.getSample().size(), equalTo(10));
assertBusy(() -> {
assertThrows(
ResourceNotFoundException.class,
() -> client().execute(GetSampleAction.INSTANCE, new GetSampleAction.Request(indexName)).actionGet()
);
});
}

@After
public void cleanup() {
Map<String, Object> clearedSettings = new HashMap<>();
clearedSettings.put(TTL_POLL_INTERVAL_SETTING.getKey(), null);
assertAcked(
clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(clearedSettings)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestSettings;
import org.elasticsearch.ingest.SamplingService;
import org.elasticsearch.monitor.fs.FsHealthService;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
Expand Down Expand Up @@ -650,6 +651,7 @@ public void apply(Settings value, Settings current, Settings previous) {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
SamplingService.TTL_POLL_INTERVAL_SETTING
);
}
169 changes: 153 additions & 16 deletions server/src/main/java/org/elasticsearch/ingest/SamplingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration;
Expand All @@ -29,10 +30,17 @@
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.scheduler.SchedulerEngine;
import org.elasticsearch.common.scheduler.TimeValueSchedule;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -53,6 +61,10 @@

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -67,13 +79,20 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class SamplingService implements ClusterStateListener {
public class SamplingService extends AbstractLifecycleComponent implements ClusterStateListener, SchedulerEngine.Listener {
public static final boolean RANDOM_SAMPLING_FEATURE_FLAG = new FeatureFlag("random_sampling").isEnabled();
private static final Logger logger = LogManager.getLogger(SamplingService.class);
private static final String TTL_JOB_ID = "sampling_ttl";
public static final Setting<TimeValue> TTL_POLL_INTERVAL_SETTING = Setting.timeSetting(
"random.sampling.ttl.poll.interval",
TimeValue.timeValueMinutes(30),
TimeValue.timeValueSeconds(1),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private final ScriptService scriptService;
private final ClusterService clusterService;
private final ProjectResolver projectResolver;
private final LongSupplier relativeMillisTimeSupplier;
private final LongSupplier statsTimeSupplier = System::nanoTime;
private final MasterServiceTaskQueue<UpdateSamplingConfigurationTask> updateSamplingConfigurationTaskQueue;
private final MasterServiceTaskQueue<DeleteSampleConfigurationTask> deleteSamplingConfigurationTaskQueue;
Expand All @@ -85,24 +104,41 @@ public class SamplingService implements ClusterStateListener {
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
private SchedulerEngine.Job scheduledJob;
private volatile TimeValue pollInterval;
private final Settings settings;
private final Clock clock = Clock.systemUTC();
/*
* This Map contains the samples that exist on this node. They are not persisted to disk. They are stored as SoftReferences so that
* sampling does not contribute to a node running out of memory. The idea is that access to samples is desirable, but not critical. We
* make a best effort to keep them around, but do not worry about the complexity or cost of making them durable.
*/
private final Map<ProjectIndex, SoftReference<SampleInfo>> samples = new ConcurrentHashMap<>();

public SamplingService(
/*
* This creates a new SamplingService, and configures various listeners on it.
*/
public static SamplingService create(
ScriptService scriptService,
ClusterService clusterService,
ProjectResolver projectResolver,
LongSupplier relativeMillisTimeSupplier
Settings settings
) {
SamplingService samplingService = new SamplingService(scriptService, clusterService, projectResolver, settings);
samplingService.configureListeners();
return samplingService;
}

private SamplingService(
ScriptService scriptService,
ClusterService clusterService,
ProjectResolver projectResolver,
Settings settings
) {
this.scriptService = scriptService;
this.clusterService = clusterService;
this.projectResolver = projectResolver;
this.relativeMillisTimeSupplier = relativeMillisTimeSupplier;
this.updateSamplingConfigurationTaskQueue = clusterService.createTaskQueue(
"update-sampling-configuration",
Priority.NORMAL,
Expand All @@ -113,6 +149,24 @@ public SamplingService(
Priority.NORMAL,
new DeleteSampleConfigurationExecutor()
);
this.settings = settings;
this.pollInterval = TTL_POLL_INTERVAL_SETTING.get(settings);
}

private void configureListeners() {
ClusterSettings clusterSettings = clusterService.getClusterSettings();
clusterSettings.addSettingsUpdateConsumer(TTL_POLL_INTERVAL_SETTING, (v) -> {
pollInterval = v;
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
maybeScheduleJob();
}
});
this.addLifecycleListener(new LifecycleListener() {
@Override
public void afterStop() {
cancelJob();
}
});
}

/**
Expand Down Expand Up @@ -172,11 +226,7 @@ private void maybeSample(
}
SoftReference<SampleInfo> sampleInfoReference = samples.compute(
new ProjectIndex(projectMetadata.id(), indexName),
(k, v) -> v == null || v.get() == null
? new SoftReference<>(
new SampleInfo(samplingConfig.maxSamples(), samplingConfig.timeToLive(), relativeMillisTimeSupplier.getAsLong())
)
: v
(k, v) -> v == null || v.get() == null ? new SoftReference<>(new SampleInfo(samplingConfig.maxSamples())) : v
);
SampleInfo sampleInfo = sampleInfoReference.get();
if (sampleInfo == null) {
Expand Down Expand Up @@ -350,6 +400,17 @@ public void clusterChanged(ClusterChangedEvent event) {
if (RANDOM_SAMPLING_FEATURE_FLAG == false) {
return;
}
final boolean isMaster = event.localNodeMaster();
final boolean wasMaster = event.previousState().nodes().isLocalNodeElectedMaster();
if (wasMaster != isMaster) {
if (isMaster) {
// we weren't the master, and now we are
maybeScheduleJob();
} else {
// we were the master, and now we aren't
cancelJob();
}
}
if (samples.isEmpty()) {
return;
}
Expand Down Expand Up @@ -407,6 +468,33 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

private void maybeScheduleJob() {
if (scheduler.get() == null) {
// don't create scheduler if the node is shutting down
if (isClusterServiceStoppedOrClosed() == false) {
scheduler.set(new SchedulerEngine(settings, clock));
scheduler.get().register(this);
}
}
// scheduler could be null if the node is shutting down
if (scheduler.get() != null && (lifecycleState() == Lifecycle.State.STARTED || lifecycleState() == Lifecycle.State.INITIALIZED)) {
scheduledJob = new SchedulerEngine.Job(TTL_JOB_ID, new TimeValueSchedule(pollInterval));
scheduler.get().add(scheduledJob);
}
}

private void cancelJob() {
if (scheduler.get() != null) {
scheduler.get().remove(TTL_JOB_ID);
scheduledJob = null;
}
}

private boolean isClusterServiceStoppedOrClosed() {
final Lifecycle.State state = clusterService.lifecycleState();
return state == Lifecycle.State.STOPPED || state == Lifecycle.State.CLOSED;
}

private boolean evaluateCondition(
Supplier<IngestDocument> ingestDocumentSupplier,
Script script,
Expand Down Expand Up @@ -457,6 +545,59 @@ private static boolean checkMaxConfigLimitBreached(ProjectId projectId, String i
return false;
}

@Override
public void triggered(SchedulerEngine.Event event) {
logger.debug("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime());
checkTTLs();
}

@Override
protected void doStart() {}

@Override
protected void doStop() {
clusterService.removeListener(this);
logger.debug("Sampling service is stopping.");
}

@Override
protected void doClose() throws IOException {
logger.debug("Sampling service is closing.");
SchedulerEngine engine = scheduler.get();
if (engine != null) {
engine.stop();
}
}

private void checkTTLs() {
long now = Instant.now().toEpochMilli();
for (ProjectMetadata projectMetadata : clusterService.state().metadata().projects().values()) {
SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE);
if (samplingMetadata != null) {
for (Map.Entry<String, SamplingConfiguration> entry : samplingMetadata.getIndexToSamplingConfigMap().entrySet()) {
SamplingConfiguration samplingConfiguration = entry.getValue();
if (samplingConfiguration.creationTime() + samplingConfiguration.timeToLive().millis() < now) {
logger.debug(
"Configuration created at "
+ ZonedDateTime.ofInstant(Instant.ofEpochMilli(samplingConfiguration.creationTime()), ZoneOffset.UTC)
+ " is older than "
+ samplingConfiguration.timeToLive()
+ " because it is now "
+ ZonedDateTime.ofInstant(Instant.ofEpochMilli(now), ZoneOffset.UTC)
);
deleteSampleConfiguration(
projectMetadata.id(),
entry.getKey(),
TimeValue.THIRTY_SECONDS,
TimeValue.THIRTY_SECONDS,
ActionListener.noop()
);
}
}
}
}
}

/*
* This represents a raw document as the user sent it to us in an IndexRequest. It only holds onto the information needed for the
* sampling API, rather than holding all of the fields a user might send in an IndexRequest.
Expand Down Expand Up @@ -835,18 +976,14 @@ private static final class SampleInfo {
*/
private volatile Tuple<Integer, Long> sizeInBytesAtIndex = Tuple.tuple(-1, 0L);
private final SampleStats stats;
private final long expiration;
private final TimeValue timeToLive;
private volatile Script script;
private volatile IngestConditionalScript.Factory factory;
private volatile boolean compilationFailed = false;
private volatile boolean isFull = false;

SampleInfo(int maxSamples, TimeValue timeToLive, long relativeNowMillis) {
this.timeToLive = timeToLive;
SampleInfo(int maxSamples) {
this.rawDocuments = new RawDocument[maxSamples];
this.stats = new SampleStats();
this.expiration = (timeToLive == null ? TimeValue.timeValueDays(5).millis() : timeToLive.millis()) + relativeNowMillis;
}

/*
Expand Down
Loading