-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Random sampling TTL prototype #136430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Random sampling TTL prototype #136430
Changes from 50 commits
Commits
Show all changes
54 commits
Select commit
Hold shift + click to select a range
a1bed05
Deleting sample when sampling configuration is deleted or changed
masseyke 2d37869
Merge branch 'main' into random-sampling-cluster-changed
masseyke e9c5479
minor performance improvements
masseyke 7aead54
Merge branch 'main' into random-sampling-cluster-changed
masseyke 018717b
Add put sample config action
seanzatzdev ae4a977
Merge branch 'main' into putsamplingconfig
seanzatzdev 9647e52
Update server/src/main/java/org/elasticsearch/ingest/SamplingService.…
seanzatzdev 0d77476
Update server/src/main/java/org/elasticsearch/action/admin/indices/sa…
seanzatzdev ccaf2d7
respond to PR comments
seanzatzdev 95fcb1e
[CI] Auto commit changes from spotless
c376005
Update docs/changelog/136148.yaml
seanzatzdev 7ea3e8d
Add upper bound on # of sampling configs & nit fixes
seanzatzdev f8c436f
Update server/src/test/java/org/elasticsearch/ingest/UpdateSamplingCo…
seanzatzdev c6af926
nit fix
seanzatzdev 652b6ca
Make MAX_CONFIGURATIONS_SETTING
seanzatzdev 711ee48
[CI] Auto commit changes from spotless
346ed35
update max sampling config test with further mocking
seanzatzdev acdff98
[CI] Auto commit changes from spotless
2a61a06
naming changes
seanzatzdev 8bd9c67
remove extraneous logging
seanzatzdev 47aa460
[CI] Auto commit changes from spotless
484ab42
merging main
masseyke 5af5119
Delete docs/changelog/136148.yaml
seanzatzdev fd61f44
respond to PR comments
seanzatzdev f72baf8
[CI] Auto commit changes from spotless
2cd6b62
fix failing test
seanzatzdev 3d1da2d
Merge branch 'main' into random-sampling-cluster-changed
masseyke 61e2fc0
copilot feedback
masseyke c42e69d
Merge branch 'random-sampling-cluster-changed' into random-sampling-e…
masseyke 52b6a56
merging putsamplingconfig
masseyke 582f251
Random Sampling TTL prototype
masseyke 6013d7f
[CI] Auto commit changes from spotless
db33687
merging main
masseyke f75a2e4
Add creationDate to SamplingConfiguration
seanzatzdev 44d1341
Update server/src/main/java/org/elasticsearch/action/admin/indices/sa…
seanzatzdev f4c9939
nit fixes
seanzatzdev 89355b9
Add creationTime tests
seanzatzdev 1e942da
[CI] Auto commit changes from spotless
afa554d
Merge branch 'main' into ttl-in-config
seanzatzdev 68e6efe
nit fixes
seanzatzdev 6bcd7ea
add to logging
seanzatzdev 312921b
fix typo
seanzatzdev 8c7f2e7
merging main
masseyke 9c39e86
merging ttl-in-config branch
masseyke ea53317
fixing failed merge
masseyke bf14861
[CI] Auto commit changes from spotless
c9f5baa
merging main
masseyke db39ae1
Merge branch 'random-sampling-everything' of github.com:masseyke/elas…
masseyke b5a9cd6
adding lifecycle parts
masseyke fe134c4
adding an integration test
masseyke b1a37b8
[CI] Auto commit changes from spotless
dcc6e20
cleanup
masseyke fce9dcc
Merge branch 'random-sampling-everything' of github.com:masseyke/elas…
masseyke b842304
merging main
masseyke File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
84 changes: 84 additions & 0 deletions
84
server/src/internalClusterTest/java/org/elasticsearch/ingest/SamplingServiceIT.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.ingest; | ||
|
||
import org.elasticsearch.ResourceNotFoundException; | ||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; | ||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; | ||
import org.elasticsearch.action.admin.indices.sampling.GetSampleAction; | ||
import org.elasticsearch.action.admin.indices.sampling.PutSampleConfigurationAction; | ||
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; | ||
import org.elasticsearch.action.bulk.BulkRequest; | ||
import org.elasticsearch.action.bulk.BulkResponse; | ||
import org.elasticsearch.action.bulk.TransportBulkAction; | ||
import org.elasticsearch.action.index.IndexRequest; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
import org.junit.After; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
import static org.elasticsearch.ingest.SamplingService.TTL_POLL_INTERVAL_SETTING; | ||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
import static org.hamcrest.Matchers.equalTo; | ||
|
||
public class SamplingServiceIT extends ESIntegTestCase { | ||
public void testTTL() throws Exception { | ||
assertAcked( | ||
clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) | ||
.setPersistentSettings(Map.of(TTL_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))) | ||
); | ||
String indexName = randomIdentifier(); | ||
client().execute(TransportCreateIndexAction.TYPE, new CreateIndexRequest(indexName)).actionGet(); | ||
ensureYellow(indexName); | ||
PutSampleConfigurationAction.Request putSampleConfigRequest = new PutSampleConfigurationAction.Request( | ||
new SamplingConfiguration(1.0d, 10, null, TimeValue.timeValueSeconds(1), null), | ||
TimeValue.THIRTY_SECONDS, | ||
TimeValue.THIRTY_SECONDS | ||
).indices(indexName); | ||
client().execute(PutSampleConfigurationAction.INSTANCE, putSampleConfigRequest).actionGet(); | ||
BulkRequest bulkRequest = new BulkRequest(); | ||
for (int i = 0; i < 20; i++) { | ||
IndexRequest indexRequest = new IndexRequest(indexName); | ||
indexRequest.source( | ||
Map.of( | ||
"@timestamp", | ||
randomTimeValue().toHumanReadableString(2), | ||
"foo", | ||
randomBoolean() ? 3L : randomLong(), | ||
"bar", | ||
randomBoolean() ? true : randomBoolean() | ||
) | ||
); | ||
bulkRequest.add(indexRequest); | ||
} | ||
BulkResponse bulkResponse = client().execute(TransportBulkAction.TYPE, bulkRequest).actionGet(); | ||
assertThat(bulkResponse.hasFailures(), equalTo(false)); | ||
GetSampleAction.Response getSampleResponse = client().execute(GetSampleAction.INSTANCE, new GetSampleAction.Request(indexName)) | ||
.actionGet(); | ||
assertThat(getSampleResponse.getSample().size(), equalTo(10)); | ||
assertBusy(() -> { | ||
assertThrows( | ||
ResourceNotFoundException.class, | ||
() -> client().execute(GetSampleAction.INSTANCE, new GetSampleAction.Request(indexName)).actionGet() | ||
); | ||
}); | ||
} | ||
|
||
@After | ||
public void cleanup() { | ||
Map<String, Object> clearedSettings = new HashMap<>(); | ||
clearedSettings.put(TTL_POLL_INTERVAL_SETTING.getKey(), null); | ||
assertAcked( | ||
clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(clearedSettings) | ||
); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -9,6 +9,7 @@ | |||||
|
||||||
package org.elasticsearch.ingest; | ||||||
|
||||||
import org.apache.lucene.util.SetOnce; | ||||||
import org.elasticsearch.action.ActionListener; | ||||||
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; | ||||||
import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; | ||||||
|
@@ -19,6 +20,7 @@ | |||||
import org.elasticsearch.cluster.ClusterState; | ||||||
import org.elasticsearch.cluster.ClusterStateAckListener; | ||||||
import org.elasticsearch.cluster.ClusterStateListener; | ||||||
import org.elasticsearch.cluster.ClusterStateUpdateTask; | ||||||
import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; | ||||||
import org.elasticsearch.cluster.metadata.Metadata; | ||||||
import org.elasticsearch.cluster.metadata.ProjectId; | ||||||
|
@@ -28,10 +30,17 @@ | |||||
import org.elasticsearch.cluster.service.MasterServiceTaskQueue; | ||||||
import org.elasticsearch.common.Priority; | ||||||
import org.elasticsearch.common.bytes.BytesReference; | ||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent; | ||||||
import org.elasticsearch.common.component.Lifecycle; | ||||||
import org.elasticsearch.common.component.LifecycleListener; | ||||||
import org.elasticsearch.common.io.stream.StreamInput; | ||||||
import org.elasticsearch.common.io.stream.StreamOutput; | ||||||
import org.elasticsearch.common.io.stream.Writeable; | ||||||
import org.elasticsearch.common.scheduler.SchedulerEngine; | ||||||
import org.elasticsearch.common.scheduler.TimeValueSchedule; | ||||||
import org.elasticsearch.common.settings.ClusterSettings; | ||||||
import org.elasticsearch.common.settings.Setting; | ||||||
import org.elasticsearch.common.settings.Settings; | ||||||
import org.elasticsearch.common.util.FeatureFlag; | ||||||
import org.elasticsearch.common.util.set.Sets; | ||||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; | ||||||
|
@@ -52,6 +61,10 @@ | |||||
|
||||||
import java.io.IOException; | ||||||
import java.lang.ref.SoftReference; | ||||||
import java.time.Clock; | ||||||
import java.time.Instant; | ||||||
import java.time.ZoneOffset; | ||||||
import java.time.ZonedDateTime; | ||||||
import java.util.Arrays; | ||||||
import java.util.HashMap; | ||||||
import java.util.HashSet; | ||||||
|
@@ -66,13 +79,20 @@ | |||||
import java.util.function.LongSupplier; | ||||||
import java.util.function.Supplier; | ||||||
|
||||||
public class SamplingService implements ClusterStateListener { | ||||||
public class SamplingService extends AbstractLifecycleComponent implements ClusterStateListener, SchedulerEngine.Listener { | ||||||
public static final boolean RANDOM_SAMPLING_FEATURE_FLAG = new FeatureFlag("random_sampling").isEnabled(); | ||||||
private static final Logger logger = LogManager.getLogger(SamplingService.class); | ||||||
private static final String TTL_JOB_ID = "sampling_ttl"; | ||||||
public static final Setting<TimeValue> TTL_POLL_INTERVAL_SETTING = Setting.timeSetting( | ||||||
"random.sampling.ttl.poll.interval", | ||||||
TimeValue.timeValueMinutes(30), | ||||||
TimeValue.timeValueSeconds(1), | ||||||
Setting.Property.Dynamic, | ||||||
Setting.Property.NodeScope | ||||||
); | ||||||
private final ScriptService scriptService; | ||||||
private final ClusterService clusterService; | ||||||
private final ProjectResolver projectResolver; | ||||||
private final LongSupplier relativeMillisTimeSupplier; | ||||||
private final LongSupplier statsTimeSupplier = System::nanoTime; | ||||||
private final MasterServiceTaskQueue<UpdateSamplingConfigurationTask> updateSamplingConfigurationTaskQueue; | ||||||
|
||||||
|
@@ -83,29 +103,43 @@ public class SamplingService implements ClusterStateListener { | |||||
Setting.Property.NodeScope, | ||||||
Setting.Property.Dynamic | ||||||
); | ||||||
|
||||||
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>(); | ||||||
private SchedulerEngine.Job scheduledJob; | ||||||
private volatile TimeValue pollInterval = TimeValue.timeValueSeconds(1); | ||||||
|
private volatile TimeValue pollInterval = TimeValue.timeValueSeconds(1); | |
private volatile TimeValue pollInterval = TTL_POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY); |
Copilot uses AI. Check for mistakes.
Positive FeedbackNegative Feedback
masseyke marked this conversation as resolved.
Show resolved
Hide resolved
masseyke marked this conversation as resolved.
Show resolved
Hide resolved
masseyke marked this conversation as resolved.
Show resolved
Hide resolved
masseyke marked this conversation as resolved.
Show resolved
Hide resolved
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using randomTimeValue().toHumanReadableString(2) for @timestamp field is problematic. The toHumanReadableString() method returns human-readable format like '1.5s' which is not a valid timestamp format. This should be an ISO timestamp string or epoch milliseconds.
Copilot uses AI. Check for mistakes.