diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 3f2b1c0e1af85..ecf3d9db149f0 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -331,6 +331,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = def(9_163_0_00); public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED = def(9_164_0_00); public static final TransportVersion SEARCH_SOURCE_EXCLUDE_INFERENCE_FIELDS_PARAM = def(9_165_0_00); + public static final TransportVersion RANDOM_SAMPLING = def(9_166_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index e8cf3d782e22a..42616240fcd6d 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -405,6 +405,15 @@ import org.elasticsearch.rest.action.synonyms.RestGetSynonymsSetsAction; import org.elasticsearch.rest.action.synonyms.RestPutSynonymRuleAction; import org.elasticsearch.rest.action.synonyms.RestPutSynonymsAction; +import org.elasticsearch.sample.GetSampleAction; +import org.elasticsearch.sample.GetSampleStatsAction; +import org.elasticsearch.sample.PutSampleConfigAction; +import org.elasticsearch.sample.RestGetSampleAction; +import org.elasticsearch.sample.RestGetSampleStatsAction; +import org.elasticsearch.sample.RestPutSampleConfigAction; +import org.elasticsearch.sample.TransportGetSampleAction; +import org.elasticsearch.sample.TransportGetSampleStatsAction; +import org.elasticsearch.sample.TransportPutSampleConfigAction; import org.elasticsearch.snapshots.TransportUpdateSnapshotStatusAction; import org.elasticsearch.tasks.Task; import org.elasticsearch.telemetry.TelemetryProvider; @@ -813,6 +822,10 @@ public void reg actions.register(GetSynonymRuleAction.INSTANCE, TransportGetSynonymRuleAction.class); actions.register(DeleteSynonymRuleAction.INSTANCE, TransportDeleteSynonymRuleAction.class); + actions.register(PutSampleConfigAction.INSTANCE, TransportPutSampleConfigAction.class); + actions.register(GetSampleAction.INSTANCE, TransportGetSampleAction.class); + actions.register(GetSampleStatsAction.INSTANCE, TransportGetSampleStatsAction.class); + return unmodifiableMap(actions.getRegistry()); } @@ -1040,6 +1053,10 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< registerHandler.accept(new RestPutSynonymRuleAction()); registerHandler.accept(new RestGetSynonymRuleAction()); registerHandler.accept(new RestDeleteSynonymRuleAction()); + + registerHandler.accept(new RestPutSampleConfigAction()); + registerHandler.accept(new RestGetSampleAction(projectIdResolver)); + registerHandler.accept(new RestGetSampleStatsAction(projectIdResolver)); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index f7460dd3de47d..56ef38c85fac4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -64,7 +64,6 @@ */ public abstract class TransportAbstractBulkAction extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportAbstractBulkAction.class); - public static final Set STREAMS_ALLOWED_PARAMS = new HashSet<>(9) { { add("error_trace"); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 1c1b5eed6fe95..c90b863fcb5f3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -90,6 +90,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksNodeService; import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.sample.TransportPutSampleConfigAction; import org.elasticsearch.script.ScriptMetadata; import org.elasticsearch.snapshots.RegisteredPolicySnapshots; import org.elasticsearch.snapshots.SnapshotsInfoService; @@ -277,6 +278,12 @@ public static List getNamedWriteables() { PersistentTasksCustomMetadata::new, PersistentTasksCustomMetadata::readDiffFrom ); + registerProjectCustom( + entries, + TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME, + TransportPutSampleConfigAction.SamplingConfigCustomMetadata::new, + TransportPutSampleConfigAction.SamplingConfigCustomMetadata::readDiffFrom + ); // Cluster scoped persistent tasks registerMetadataCustom( entries, @@ -357,6 +364,13 @@ public static List getNamedXWriteables() { ClusterPersistentTasksCustomMetadata::fromXContent ) ); + entries.add( + new NamedXContentRegistry.Entry( + Metadata.ProjectCustom.class, + new ParseField(TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME), + TransportPutSampleConfigAction.SamplingConfigCustomMetadata::fromXContent + ) + ); entries.add( new NamedXContentRegistry.Entry( Metadata.ProjectCustom.class, diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 2265a0343576c..d0726276fda73 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -52,7 +52,6 @@ import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; import org.elasticsearch.common.TriConsumer; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; @@ -1390,45 +1389,10 @@ private void attemptToSampleData( * We need both the original document and the fully updated document for sampling, so we make a copy of the original * before overwriting it here. We can discard it after sampling. */ - samplingService.maybeSample(projectMetadata, indexRequest.index(), () -> { - IndexRequest original = copyIndexRequestForSampling(indexRequest); - updateIndexRequestMetadata(original, originalDocumentMetadata); - return original; - }, ingestDocument); - + samplingService.maybeSample(projectMetadata, originalDocumentMetadata.getIndex(), indexRequest, ingestDocument); } } - /** - * Creates a copy of an IndexRequest to be used by random sampling. - * @param original The IndexRequest to be copied - * @return A copy of the IndexRequest - */ - private IndexRequest copyIndexRequestForSampling(IndexRequest original) { - IndexRequest clonedRequest = new IndexRequest(original.index()); - clonedRequest.id(original.id()); - clonedRequest.routing(original.routing()); - clonedRequest.version(original.version()); - clonedRequest.versionType(original.versionType()); - clonedRequest.setPipeline(original.getPipeline()); - clonedRequest.setFinalPipeline(original.getFinalPipeline()); - clonedRequest.setIfSeqNo(original.ifSeqNo()); - clonedRequest.setIfPrimaryTerm(original.ifPrimaryTerm()); - clonedRequest.setRefreshPolicy(original.getRefreshPolicy()); - clonedRequest.waitForActiveShards(original.waitForActiveShards()); - clonedRequest.timeout(original.timeout()); - clonedRequest.opType(original.opType()); - clonedRequest.setParentTask(original.getParentTask()); - clonedRequest.setRequireDataStream(original.isRequireDataStream()); - clonedRequest.setRequireAlias(original.isRequireAlias()); - clonedRequest.setIncludeSourceOnError(original.getIncludeSourceOnError()); - BytesReference source = original.source(); - if (source != null) { - clonedRequest.source(source, original.getContentType()); - } - return clonedRequest; - } - private static void executePipeline( final IngestDocument ingestDocument, final Pipeline pipeline, diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 477ef12a5c042..1bd170f73503d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -9,27 +9,178 @@ package org.elasticsearch.ingest; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.AckedBatchedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.scheduler.SchedulerEngine; +import org.elasticsearch.common.scheduler.TimeValueSchedule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.sample.TransportPutSampleConfigAction; +import org.elasticsearch.script.IngestConditionalScript; +import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParseException; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.IOException; +import java.lang.ref.SoftReference; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.LongSupplier; import java.util.function.Supplier; -public class SamplingService implements ClusterStateListener { +public class SamplingService implements ClusterStateListener, SchedulerEngine.Listener { private static final Logger logger = LogManager.getLogger(SamplingService.class); private final ScriptService scriptService; private final ClusterService clusterService; + private final LongSupplier relativeNanoTimeSupplier; + private final MasterServiceTaskQueue updateSamplingConfigTaskQueue; + private final MasterServiceTaskQueue deleteSamplingConfigTaskQueue; + private final Map> samples = new HashMap<>(); + private volatile boolean isMaster = false; + private final SetOnce scheduler = new SetOnce<>(); + private SchedulerEngine.Job scheduledJob; + private final Clock clock; + private final Settings settings; + private volatile TimeValue pollInterval = TimeValue.timeValueMinutes(30); - public SamplingService(ScriptService scriptService, ClusterService clusterService) { + public SamplingService( + ScriptService scriptService, + ClusterService clusterService, + LongSupplier relativeNanoTimeSupplier, + Clock clock, + Settings settings + ) { this.scriptService = scriptService; this.clusterService = clusterService; + this.relativeNanoTimeSupplier = relativeNanoTimeSupplier; + this.clock = clock; + this.settings = settings; + ClusterStateTaskExecutor updateSampleConfigExecutor = new SimpleBatchedAckListenerTaskExecutor<>() { + + @Override + public Tuple executeTask( + UpdateSampleConfigTask updateSamplingConfigTask, + ClusterState clusterState + ) { + ProjectMetadata projectMetadata = clusterState.metadata().getProject(updateSamplingConfigTask.projectId); + TransportPutSampleConfigAction.SamplingConfigCustomMetadata samplingConfig = projectMetadata.custom( + TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME + ); + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); + projectMetadataBuilder.putCustom( + TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME, + new TransportPutSampleConfigAction.SamplingConfigCustomMetadata( + updateSamplingConfigTask.indexName, + updateSamplingConfigTask.rate, + updateSamplingConfigTask.maxSamples, + updateSamplingConfigTask.maxSize, + updateSamplingConfigTask.timeToLive, + updateSamplingConfigTask.condition + ) + ); + ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build(); + return new Tuple<>(updatedClusterState, updateSamplingConfigTask); + } + }; + this.updateSamplingConfigTaskQueue = clusterService.createTaskQueue( + "update-data-stream-mappings", + Priority.NORMAL, + updateSampleConfigExecutor + ); + ClusterStateTaskExecutor deleteSampleConfigExecutor = new SimpleBatchedAckListenerTaskExecutor<>() { + + @Override + public Tuple executeTask( + DeleteSampleConfigTask deleteSamplingConfigTask, + ClusterState clusterState + ) { + ProjectMetadata projectMetadata = clusterState.metadata().getProject(deleteSamplingConfigTask.projectId); + TransportPutSampleConfigAction.SamplingConfigCustomMetadata samplingConfig = projectMetadata.custom( + TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME + ); + if (samplingConfig != null) { + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); + projectMetadataBuilder.removeCustom(TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME); + ClusterState updatedClusterState = ClusterState.builder(clusterState) + .putProjectMetadata(projectMetadataBuilder) + .build(); + logger.info("Removing sampling config " + samplingConfig.indexName + " from cluster state"); + return new Tuple<>(updatedClusterState, deleteSamplingConfigTask); + } else { + return null; // someone beat us to it. This seems like a bad plan TODO + } + } + }; + this.deleteSamplingConfigTaskQueue = clusterService.createTaskQueue( + "delete-data-stream-mappings", + Priority.NORMAL, + deleteSampleConfigExecutor + ); + } + + public void updateSampleConfiguration( + ProjectId projectId, + String index, + double rate, + @Nullable Integer maxSamples, + @Nullable ByteSizeValue maxSize, + @Nullable TimeValue timeToLive, + @Nullable String condition, + @Nullable TimeValue masterNodeTimeout, + @Nullable TimeValue ackTimeout, + ActionListener listener + ) { + updateSamplingConfigTaskQueue.submitTask( + "updating sampling config", + new UpdateSampleConfigTask(projectId, index, rate, maxSamples, maxSize, timeToLive, condition, ackTimeout, listener), + masterNodeTimeout + ); + } + + public void deleteSampleConfiguration(ProjectId projectId, String index) { + logger.info("Calling deleteSampleConfiguration"); + // to be called by the master node + deleteSamplingConfigTaskQueue.submitTask( + "deleting sampling config", + new DeleteSampleConfigTask(projectId, index, TimeValue.THIRTY_SECONDS, ActionListener.noop()), + TimeValue.THIRTY_SECONDS + ); } /** @@ -38,7 +189,7 @@ public SamplingService(ScriptService scriptService, ClusterService clusterServic * @param indexRequest The raw request to potentially sample */ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest) { - maybeSample(projectMetadata, indexRequest.index(), () -> indexRequest, () -> { + maybeSample(projectMetadata, indexRequest.index(), indexRequest, () -> { Map sourceAsMap; try { sourceAsMap = indexRequest.sourceAsMap(); @@ -57,37 +208,449 @@ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexReque }); } - /** - * - * @param projectMetadata Used to get the sampling configuration - * @param indexRequestSupplier A supplier for the raw request to potentially sample - * @param ingestDocument The IngestDocument used for evaluating any conditionals that are part of the sample configuration - */ - public void maybeSample( + private void maybeSample( ProjectMetadata projectMetadata, String indexName, - Supplier indexRequestSupplier, - IngestDocument ingestDocument + IndexRequest indexRequest, + Supplier ingestDocumentSupplier ) { - maybeSample(projectMetadata, indexName, indexRequestSupplier, () -> ingestDocument); + long startTime = relativeNanoTimeSupplier.getAsLong(); + TransportPutSampleConfigAction.SamplingConfigCustomMetadata samplingConfig = projectMetadata.custom( + TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME + ); + ProjectId projectId = projectMetadata.id(); + if (samplingConfig != null) { + String samplingIndex = samplingConfig.indexName; + if (samplingIndex.equals(indexName)) { + SoftReference sampleInfoReference = samples.compute( + new ProjectIndex(projectId, samplingIndex), + (k, v) -> v == null || v.get() == null + ? new SoftReference<>(new SampleInfo(samplingConfig.timeToLive, relativeNanoTimeSupplier.getAsLong())) + : v + ); + SampleInfo sampleInfo = sampleInfoReference.get(); + if (sampleInfo != null) { + SampleStats stats = sampleInfo.stats; + stats.potentialSamples.increment(); + try { + if (sampleInfo.getSamples().size() < samplingConfig.maxSamples) { + if (Math.random() < samplingConfig.rate) { + String condition = samplingConfig.condition; + if (condition != null) { + if (sampleInfo.script == null || sampleInfo.factory == null) { + // We don't want to pay for synchronization because worst case, we compile the script twice + long compileScriptStartTime = relativeNanoTimeSupplier.getAsLong(); + try { + if (sampleInfo.compilationFailed) { + // we don't want to waste time + stats.samplesRejectedForException.increment(); + return; + } else { + Script script = getScript(condition); + sampleInfo.setScript( + script, + scriptService.compile(script, IngestConditionalScript.CONTEXT) + ); + } + } catch (Exception e) { + sampleInfo.compilationFailed = true; + throw e; + } finally { + stats.timeCompilingCondition.add( + (relativeNanoTimeSupplier.getAsLong() - compileScriptStartTime) + ); + } + } + } + long conditionStartTime = relativeNanoTimeSupplier.getAsLong(); + if (condition == null + || evaluateCondition( + ingestDocumentSupplier.get(), + sampleInfo.script, + sampleInfo.factory, + sampleInfo.stats + )) { + stats.timeEvaluatingCondition.add((relativeNanoTimeSupplier.getAsLong() - conditionStartTime)); + Sample sample = getSampleForIndexRequest(projectId, indexName, indexRequest); + sampleInfo.getSamples().add(sample); + stats.samples.increment(); + logger.info("Sampling " + indexRequest); + } else { + stats.samplesRejectedForCondition.increment(); + } + } else { + stats.samplesRejectedForRate.increment(); + } + } else { + stats.samplesRejectedForSize.increment(); + } + } catch (Exception e) { + stats.samplesRejectedForException.increment(); + stats.lastException = e; + logger.info("Error performing sampling for " + samplingIndex, e); + } finally { + stats.timeSampling.add((relativeNanoTimeSupplier.getAsLong() - startTime)); + logger.info("********* Stats: " + stats); + } + } + } + } + // checkTTLs(); // TODO make this happen less often? } - private void maybeSample( - ProjectMetadata projectMetadata, - String indexName, - Supplier indexRequest, - Supplier ingestDocumentSupplier + public List getSamples(ProjectId projectId, String index) { + SoftReference sampleInfoReference = samples.get(new ProjectIndex(projectId, index)); + SampleInfo sampleInfo = sampleInfoReference.get(); + return sampleInfo == null ? List.of() : sampleInfo.getSamples(); + } + + public SampleStats getSampleStats(ProjectId projectId, String index) { + SoftReference sampleInfoReference = samples.get(new ProjectIndex(projectId, index)); + SampleInfo sampleInfo = sampleInfoReference.get(); + return sampleInfo == null ? new SampleStats() : sampleInfo.stats; + } + + public TransportPutSampleConfigAction.SamplingConfigCustomMetadata getSampleConfig(ProjectMetadata projectMetadata, String index) { + TransportPutSampleConfigAction.SamplingConfigCustomMetadata sampleConfig = projectMetadata.custom( + TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME + ); + if (sampleConfig != null && sampleConfig.indexName.equals(index)) { + return sampleConfig; + } + return null; + } + + private boolean evaluateCondition( + IngestDocument ingestDocument, + Script script, + IngestConditionalScript.Factory factory, + SampleStats stats ) { - // TODO Sampling logic to go here in the near future + return factory.newInstance(script.getParams(), ingestDocument.getUnmodifiableSourceAndMetadata()).execute(); + } + + private static Script getScript(String conditional) throws IOException { + logger.info("Parsing script for conditional " + conditional); + try ( + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(Map.of("source", conditional)); + XContentParser parser = XContentHelper.createParserNotCompressed( + LoggingDeprecationHandler.XCONTENT_PARSER_CONFIG, + BytesReference.bytes(builder), + XContentType.JSON + ) + ) { + return Script.parse(parser); + } + } + + private synchronized void maybeScheduleJob() { + if (this.isMaster) { + if (scheduler.get() == null) { + // don't create scheduler if the node is shutting down + if (isClusterServiceStoppedOrClosed() == false) { + scheduler.set(new SchedulerEngine(settings, clock)); + scheduler.get().register(this); + } + } + + // scheduler could be null if the node might be shutting down + if (scheduler.get() != null) { + scheduledJob = new SchedulerEngine.Job("sampling_ttl", new TimeValueSchedule(pollInterval)); + scheduler.get().add(scheduledJob); + } + } + } + + private void cancelJob() { + if (scheduler.get() != null) { + scheduler.get().remove("sampling_ttl"); + scheduledJob = null; + } + } + + private boolean isClusterServiceStoppedOrClosed() { + final Lifecycle.State state = clusterService.lifecycleState(); + return state == Lifecycle.State.STOPPED || state == Lifecycle.State.CLOSED; + } + + /** + * + * @param projectMetadata Used to get the sampling configuration + * @param indexRequest The raw request to potentially sample + * @param ingestDocument The IngestDocument used for evaluating any conditionals that are part of the sample configuration + */ + public void maybeSample(ProjectMetadata projectMetadata, String indexName, IndexRequest indexRequest, IngestDocument ingestDocument) { + maybeSample(projectMetadata, indexName, indexRequest, () -> ingestDocument); } public boolean atLeastOneSampleConfigured() { - return false; // TODO Return true if there is at least one sample in the cluster state + return true; // TODO Return true if there is at least one sample in the cluster state } @Override public void clusterChanged(ClusterChangedEvent event) { - // TODO: React to sampling config changes + final boolean prevIsMaster = this.isMaster; + if (prevIsMaster != event.localNodeMaster()) { + this.isMaster = event.localNodeMaster(); + if (this.isMaster) { + // we weren't the master, and now we are + maybeScheduleJob(); + } else { + // we were the master, and now we aren't + cancelJob(); + } + } + if (event.metadataChanged()) { + for (ProjectMetadata projectMetadata : event.state().metadata().projects().values()) { + ProjectId projectId = projectMetadata.id(); + if (event.customMetadataChanged(projectId, TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME)) { + TransportPutSampleConfigAction.SamplingConfigCustomMetadata oldSamplingConfig = event.previousState() + .projectState(projectId) + .metadata() + .custom(TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME); + TransportPutSampleConfigAction.SamplingConfigCustomMetadata newSamplingConfig = event.state() + .projectState(projectId) + .metadata() + .custom(TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME); + if (newSamplingConfig == null && oldSamplingConfig != null) { + logger.info("Removing sampling config info from buffer because it has been deleted from cluster state"); + samples.remove(new ProjectIndex(projectId, oldSamplingConfig.indexName)); + } else if (newSamplingConfig != null && newSamplingConfig.equals(oldSamplingConfig) == false) { + samples.put( + new ProjectIndex(projectId, newSamplingConfig.indexName), + new SoftReference<>(new SampleInfo(newSamplingConfig.timeToLive, relativeNanoTimeSupplier.getAsLong())) + ); + } + } + } + } + } + + private void checkTTLs() { + long now = relativeNanoTimeSupplier.getAsLong(); + Set projectIndices = samples.keySet(); + for (ProjectIndex projectIndex : projectIndices) { + SoftReference sampleInfoReference = samples.get(projectIndex); + SampleInfo sampleInfo = sampleInfoReference.get(); + if (sampleInfo != null && sampleInfo.expiration < now) { + deleteSampleConfiguration(projectIndex.projectId, projectIndex.indexName); + // samples.remove(new ProjectIndex(projectIndex.projectId, projectIndex.indexName)); + } + } } + @Override + public void triggered(SchedulerEngine.Event event) { + logger.info("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime()); + checkTTLs(); + } + + private static final class SampleInfo { + private final List samples; + private final SampleStats stats; + private final long expiration; + private volatile Script script; + private volatile IngestConditionalScript.Factory factory; + private volatile boolean compilationFailed = false; + + SampleInfo(TimeValue timeToLive, long relativeNowNanos) { + this.samples = new ArrayList<>(); + this.stats = new SampleStats(); + this.expiration = (timeToLive == null ? TimeValue.timeValueDays(5).nanos() : timeToLive.nanos()) + relativeNowNanos; + } + + public List getSamples() { + return samples; + } + + void setScript(Script script, IngestConditionalScript.Factory factory) { + this.script = script; + this.factory = factory; + } + } + + public static final class SampleStats implements Writeable, ToXContent { + LongAdder potentialSamples = new LongAdder(); + public LongAdder samplesRejectedForSize = new LongAdder(); + LongAdder samplesRejectedForCondition = new LongAdder(); + LongAdder samplesRejectedForRate = new LongAdder(); + LongAdder samplesRejectedForException = new LongAdder(); + public LongAdder samples = new LongAdder(); + LongAdder timeSampling = new LongAdder(); + LongAdder timeEvaluatingCondition = new LongAdder(); + LongAdder timeCompilingCondition = new LongAdder(); + Exception lastException = null; + + public SampleStats() {} + + public SampleStats(StreamInput in) throws IOException { + potentialSamples.add(in.readLong()); + samplesRejectedForSize.add(in.readLong()); + samplesRejectedForCondition.add(in.readLong()); + samplesRejectedForRate.add(in.readLong()); + samplesRejectedForException.add(in.readLong()); + samples.add(in.readLong()); + timeSampling.add(in.readLong()); + timeEvaluatingCondition.add(in.readLong()); + timeCompilingCondition.add(in.readLong()); + if (in.readBoolean()) { + lastException = in.readException(); + } else { + lastException = null; + } + } + + @Override + public String toString() { + return "potential_samples: " + + potentialSamples + + ", samples_rejected_for_size: " + + samplesRejectedForSize + + ", samples_rejected_for_condition: " + + samplesRejectedForCondition + + ", samples_rejected_for_rate: " + + samplesRejectedForRate + + ", samples_rejected_for_exception: " + + samplesRejectedForException + + ", samples_accepted: " + + samples + + ", time_sampling: " + + (timeSampling.longValue() / 1000000) + + ", time_evaluating_condition: " + + (timeEvaluatingCondition.longValue() / 1000000) + + ", time_compiling_condition: " + + (timeCompilingCondition.longValue() / 1000000); + } + + public SampleStats combine(SampleStats other) { + SampleStats result = new SampleStats(); + result.potentialSamples.add(this.potentialSamples.longValue()); + result.potentialSamples.add(other.potentialSamples.longValue()); + result.samplesRejectedForSize.add(this.samplesRejectedForSize.longValue()); + result.samplesRejectedForSize.add(other.samplesRejectedForSize.longValue()); + result.samplesRejectedForCondition.add(this.samplesRejectedForCondition.longValue()); + result.samplesRejectedForCondition.add(other.samplesRejectedForCondition.longValue()); + result.samplesRejectedForRate.add(this.samplesRejectedForRate.longValue()); + result.samplesRejectedForRate.add(other.samplesRejectedForRate.longValue()); + result.samplesRejectedForException.add(this.samplesRejectedForException.longValue()); + result.samplesRejectedForException.add(other.samplesRejectedForException.longValue()); + result.samples.add(this.samples.longValue()); + result.samples.add(other.samples.longValue()); + result.timeSampling.add(this.timeSampling.longValue()); + result.timeSampling.add(other.timeSampling.longValue()); + result.timeEvaluatingCondition.add(this.timeEvaluatingCondition.longValue()); + result.timeEvaluatingCondition.add(other.timeEvaluatingCondition.longValue()); + result.timeCompilingCondition.add(this.timeCompilingCondition.longValue()); + result.timeCompilingCondition.add(other.timeCompilingCondition.longValue()); + result.lastException = this.lastException != null ? this.lastException : other.lastException; + return result; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("potential_samples", potentialSamples.longValue()); + builder.field("samples_rejected_for_size", samplesRejectedForSize.longValue()); + builder.field("samples_rejected_for_condition", samplesRejectedForCondition.longValue()); + builder.field("samples_rejected_for_rate", samplesRejectedForRate.longValue()); + builder.field("samples_rejected_for_exception", samplesRejectedForException.longValue()); + builder.field("samples_accepted", samples.longValue()); + builder.field("time_sampling", (timeSampling.longValue() / 1000000)); + builder.field("time_evaluating_condition", (timeEvaluatingCondition.longValue() / 1000000)); + builder.field("time_compiling_condition", (timeCompilingCondition.longValue() / 1000000)); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(potentialSamples.longValue()); + out.writeLong(samplesRejectedForSize.longValue()); + out.writeLong(samplesRejectedForCondition.longValue()); + out.writeLong(samplesRejectedForRate.longValue()); + out.writeLong(samplesRejectedForException.longValue()); + out.writeLong(samples.longValue()); + out.writeLong(timeSampling.longValue()); + out.writeLong(timeEvaluatingCondition.longValue()); + out.writeLong(timeCompilingCondition.longValue()); + if (lastException == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeException(lastException); + } + } + } + + static class UpdateSampleConfigTask extends AckedBatchedClusterStateUpdateTask { + final ProjectId projectId; + private final String indexName; + private final double rate; + private final Integer maxSamples; + private final ByteSizeValue maxSize; + private final TimeValue timeToLive; + private final String condition; + + UpdateSampleConfigTask( + ProjectId projectId, + String indexName, + double rate, + Integer maxSamples, + ByteSizeValue maxSize, + TimeValue timeToLive, + String condition, + TimeValue ackTimeout, + ActionListener listener + ) { + super(ackTimeout, listener); + this.projectId = projectId; + this.indexName = indexName; + this.rate = rate; + this.maxSamples = maxSamples; + this.maxSize = maxSize; + this.timeToLive = timeToLive; + this.condition = condition; + } + } + + static class DeleteSampleConfigTask extends AckedBatchedClusterStateUpdateTask { + final ProjectId projectId; + final String indexName; + + DeleteSampleConfigTask(ProjectId projectId, String indexName, TimeValue ackTimeout, ActionListener listener) { + super(ackTimeout, listener); + this.projectId = projectId; + this.indexName = indexName; + } + } + + record ProjectIndex(ProjectId projectId, String indexName) {}; + + public record Sample(ProjectId projectId, String indexName, byte[] source, XContentType contentType) implements Writeable { + + public Sample(StreamInput in) throws IOException { + this(ProjectId.readFrom(in), in.readString(), in.readByteArray(), in.readEnum(XContentType.class)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + projectId.writeTo(out); + out.writeString(indexName); + out.writeBytes(source); + out.writeEnum(contentType); + } + } + + private Sample getSampleForIndexRequest(ProjectId projectId, String indexName, IndexRequest indexRequest) { + BytesReference sourceReference = indexRequest.source(); + final byte[] sourceCopy; + if (sourceReference == null) { + sourceCopy = null; + } else { + byte[] source = sourceReference.array(); + sourceCopy = new byte[sourceReference.length()]; + System.arraycopy(source, sourceReference.arrayOffset(), sourceCopy, 0, sourceReference.length()); + } + return new Sample(projectId, indexName, sourceCopy, indexRequest.getContentType()); + } } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 57f3dda579819..e29588ccb257f 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -235,6 +235,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -716,8 +717,7 @@ private void construct( modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class)); - - SamplingService samplingService = new SamplingService(scriptService, clusterService); + SamplingService samplingService = new SamplingService(scriptService, clusterService, System::nanoTime, Clock.systemUTC(), settings); modules.bindToInstance(SamplingService.class, samplingService); clusterService.addListener(samplingService); diff --git a/server/src/main/java/org/elasticsearch/sample/GetSampleAction.java b/server/src/main/java/org/elasticsearch/sample/GetSampleAction.java new file mode 100644 index 0000000000000..cd7d36b2537ea --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/GetSampleAction.java @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.xcontent.ToXContent; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.common.collect.Iterators.single; +import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk; + +public class GetSampleAction extends ActionType { + + public static final GetSampleAction INSTANCE = new GetSampleAction(); + public static final String NAME = "indices:admin/sample"; + + private GetSampleAction() { + super(NAME); + } + + public static class Response extends BaseNodesResponse implements Writeable, ChunkedToXContent { + private final int maxSize; + + public Response(StreamInput in) throws IOException { + super(in); + maxSize = in.readInt(); + } + + public Response(ClusterName clusterName, List nodes, List failures, int maxSize) { + super(clusterName, nodes, failures); + this.maxSize = maxSize; + } + + public List getSamples() { + return getNodes().stream().map(n -> n.samples).filter(Objects::nonNull).flatMap(Collection::stream).limit(maxSize).toList(); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(NodeResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeCollection(nodes); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat( + chunk((builder, p) -> builder.startObject().startArray("samples")), + Iterators.flatMap(getSamples().iterator(), sample -> single((builder, params1) -> { + Map sourceAsMap = XContentHelper.convertToMap( + sample.contentType().xContent(), + sample.source(), + 0, + sample.source().length, + false + ); + builder.value(sourceAsMap); + return builder; + })), + chunk((builder, p) -> builder.endArray().endObject()) + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response that = (Response) o; + return Objects.equals(getNodes(), that.getNodes()) && Objects.equals(failures(), that.failures()); + } + + @Override + public int hashCode() { + return Objects.hash(getNodes(), failures()); + } + + } + + public static class NodeResponse extends BaseNodeResponse { + private final List samples; + + protected NodeResponse(StreamInput in) throws IOException { + super(in); + samples = in.readCollectionAsList(SamplingService.Sample::new); + } + + protected NodeResponse(DiscoveryNode node, List samples) { + super(node); + this.samples = samples; + } + + public List getSamples() { + return samples; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeCollection(samples); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeResponse that = (NodeResponse) o; + return samples.equals(that.samples); + } + + @Override + public int hashCode() { + return Objects.hash(samples); + } + } + + public static class Request extends BaseNodesRequest implements IndicesRequest.Replaceable { + private final ProjectId projectId; + private String[] names; + + public Request(ProjectId projectId, String[] names) { + super((String[]) null); + this.projectId = projectId; + this.names = names; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public ActionRequestValidationException validate() { + if (this.indices().length != 1) { + return new ActionRequestValidationException(); + } + return null; + } + + public ProjectId getProjectId() { + return projectId; + } + + @Override + public IndicesRequest indices(String... indices) { + this.names = indices; + return this; + } + + @Override + public String[] indices() { + return names; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.DEFAULT; + } + } + + public static class NodeRequest extends AbstractTransportRequest { + private final ProjectId projectId; + private final String index; + + public NodeRequest(ProjectId projectId, String index) { + this.projectId = projectId; + this.index = index; + } + + public NodeRequest(StreamInput in) throws IOException { + super(in); + this.projectId = ProjectId.readFrom(in); + this.index = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + projectId.writeTo(out); + out.writeString(index); + } + + public ProjectId getProjectId() { + return projectId; + } + + public String getIndex() { + return index; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/sample/GetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/sample/GetSampleStatsAction.java new file mode 100644 index 0000000000000..151dc0c8d6e6c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/GetSampleStatsAction.java @@ -0,0 +1,226 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class GetSampleStatsAction extends ActionType { + + public static final GetSampleStatsAction INSTANCE = new GetSampleStatsAction(); + public static final String NAME = "indices:admin/sample/stats"; + + private GetSampleStatsAction() { + super(NAME); + } + + public static class Response extends BaseNodesResponse implements Writeable, ToXContentObject { + private final int maxSize; + + public Response(StreamInput in) throws IOException { + super(in); + maxSize = in.readInt(); + } + + public Response( + ClusterName clusterName, + List nodes, + List failures, + int maxSize + ) { + super(clusterName, nodes, failures); + this.maxSize = maxSize; + } + + public SamplingService.SampleStats getSampleStats() { + SamplingService.SampleStats rawStats = getRawSampleStats(); + if (rawStats.samples.longValue() > maxSize) { + SamplingService.SampleStats filteredStats = new SamplingService.SampleStats().combine(rawStats); + filteredStats.samples.add(maxSize - rawStats.samples.longValue()); + filteredStats.samplesRejectedForSize.add(rawStats.samples.longValue() - maxSize); + return filteredStats; + } else { + return rawStats; + } + } + + private SamplingService.SampleStats getRawSampleStats() { + return getNodes().stream() + .map(n -> n.sampleStats) + .filter(Objects::nonNull) + .reduce(SamplingService.SampleStats::combine) + .orElse(new SamplingService.SampleStats()); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(GetSampleStatsAction.NodeResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeCollection(nodes); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetSampleStatsAction.Response that = (GetSampleStatsAction.Response) o; + return Objects.equals(getNodes(), that.getNodes()) && Objects.equals(failures(), that.failures()); + } + + @Override + public int hashCode() { + return Objects.hash(getNodes(), failures()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return getSampleStats().toXContent(builder, params); + } + } + + public static class NodeResponse extends BaseNodeResponse { + private final SamplingService.SampleStats sampleStats; + + protected NodeResponse(StreamInput in) throws IOException { + super(in); + sampleStats = new SamplingService.SampleStats(in); + } + + protected NodeResponse(DiscoveryNode node, SamplingService.SampleStats sampleStats) { + super(node); + this.sampleStats = sampleStats; + } + + public SamplingService.SampleStats getSampleStats() { + return sampleStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sampleStats.writeTo(out); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetSampleStatsAction.NodeResponse that = (GetSampleStatsAction.NodeResponse) o; + return sampleStats.equals(that.sampleStats); // TODO + } + + @Override + public int hashCode() { + return Objects.hash(sampleStats); // TODO + } + } + + public static class Request extends BaseNodesRequest implements IndicesRequest.Replaceable { + private final ProjectId projectId; + private String[] names; + + public Request(ProjectId projectId, String[] names) { + super((String[]) null); + this.projectId = projectId; + this.names = names; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public ActionRequestValidationException validate() { + if (this.indices().length != 1) { + return new ActionRequestValidationException(); + } + return null; + } + + public ProjectId getProjectId() { + return projectId; + } + + @Override + public IndicesRequest indices(String... indices) { + this.names = indices; + return this; + } + + @Override + public String[] indices() { + return names; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.DEFAULT; + } + } + + public static class NodeRequest extends AbstractTransportRequest { + private final ProjectId projectId; + private final String index; + + public NodeRequest(ProjectId projectId, String index) { + this.projectId = projectId; + this.index = index; + } + + public NodeRequest(StreamInput in) throws IOException { + super(in); + this.projectId = ProjectId.readFrom(in); + this.index = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + projectId.writeTo(out); + out.writeString(index); + } + + public ProjectId getProjectId() { + return projectId; + } + + public String getIndex() { + return index; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/sample/PutSampleConfigAction.java b/server/src/main/java/org/elasticsearch/sample/PutSampleConfigAction.java new file mode 100644 index 0000000000000..5467e615fe59c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/PutSampleConfigAction.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +public class PutSampleConfigAction extends ActionType { + public static final String NAME = "indices:admin/sample/config/update"; + public static final PutSampleConfigAction INSTANCE = new PutSampleConfigAction(); + + public PutSampleConfigAction() { + super(NAME); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest.Replaceable { + private final double rate; + private final Integer maxSamples; + private final ByteSizeValue maxSize; + private final TimeValue timeToLive; + private final String condition; + private String[] indices = Strings.EMPTY_ARRAY; + + public Request( + double rate, + @Nullable Integer maxSamples, + @Nullable ByteSizeValue maxSize, + @Nullable TimeValue timeToLive, + @Nullable String condition, + @Nullable TimeValue masterNodeTimeout, + @Nullable TimeValue ackTimeout + ) { + super(masterNodeTimeout, ackTimeout); + this.rate = rate; + this.maxSamples = maxSamples; + this.maxSize = maxSize; + this.timeToLive = timeToLive; + this.condition = condition; + } + + public double getRate() { + return rate; + } + + public Integer getMaxSamples() { + return maxSamples; + } + + public ByteSizeValue getMaxSize() { + return maxSize; + } + + public TimeValue getTimeToLive() { + return timeToLive; + } + + public String getCondition() { + return condition; + } + + @Override + public PutSampleConfigAction.Request indices(String... dataStreamNames) { + this.indices = dataStreamNames; + return this; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.indices = in.readStringArray(); + this.rate = in.readDouble(); + this.maxSamples = in.readOptionalInt(); + this.maxSize = in.readOptionalWriteable(ByteSizeValue::readFrom); + this.timeToLive = in.readOptionalTimeValue(); + this.condition = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + out.writeDouble(rate); + out.writeOptionalInt(maxSamples); + out.writeOptionalWriteable(maxSize); + out.writeOptionalTimeValue(timeToLive); + out.writeOptionalString(condition); + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PutSampleConfigAction.Request request = (PutSampleConfigAction.Request) o; + return Arrays.equals(indices, request.indices()) + && rate == request.rate + && Objects.equals(maxSamples, request.maxSamples) + && Objects.equals(maxSize, request.maxSize) + && Objects.equals(timeToLive, request.timeToLive) + && Objects.equals(condition, request.condition); + } + + @Override + public int hashCode() { + return Objects.hash( + Arrays.hashCode(indices), + rate, + maxSamples, + maxSize, + timeToLive, + condition, + masterNodeTimeout(), + ackTimeout() + ); + } + + } +} diff --git a/server/src/main/java/org/elasticsearch/sample/RestGetSampleAction.java b/server/src/main/java/org/elasticsearch/sample/RestGetSampleAction.java new file mode 100644 index 0000000000000..ecfd783874f8e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/RestGetSampleAction.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.project.ProjectIdResolver; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestGetSampleAction extends BaseRestHandler { + private final ProjectIdResolver projectIdResolver; + + public RestGetSampleAction(ProjectIdResolver projectIdResolver) { + this.projectIdResolver = projectIdResolver; + } + + @Override + public String getName() { + return "get_sample"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_sample/{name}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + GetSampleAction.Request getSampleRequest = new GetSampleAction.Request( + projectIdResolver.getProjectId(), + new String[] { request.param("name") } + ); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + GetSampleAction.INSTANCE, + getSampleRequest, + new RestRefCountedChunkedToXContentListener<>(channel) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/sample/RestGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/sample/RestGetSampleStatsAction.java new file mode 100644 index 0000000000000..e432622340618 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/RestGetSampleStatsAction.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.project.ProjectIdResolver; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestGetSampleStatsAction extends BaseRestHandler { + private final ProjectIdResolver projectIdResolver; + + public RestGetSampleStatsAction(ProjectIdResolver projectIdResolver) { + this.projectIdResolver = projectIdResolver; + } + + @Override + public String getName() { + return "get_sample_stats"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_sample/{name}/stats")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + GetSampleStatsAction.Request getSampleRequest = new GetSampleStatsAction.Request( + projectIdResolver.getProjectId(), + new String[] { request.param("name") } + ); + return channel -> client.execute(GetSampleStatsAction.INSTANCE, getSampleRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/sample/RestPutSampleConfigAction.java b/server/src/main/java/org/elasticsearch/sample/RestPutSampleConfigAction.java new file mode 100644 index 0000000000000..2adb036362304 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/RestPutSampleConfigAction.java @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.PUT; + +@ServerlessScope(Scope.PUBLIC) +public class RestPutSampleConfigAction extends BaseRestHandler { + @Override + public String getName() { + return "put_sample_config"; + } + + @Override + public List routes() { + return List.of(new Route(PUT, "/_sample/{name}/_config")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + TransportPutSampleConfigAction.SamplingConfigCustomMetadata samplingConfig; + try (XContentParser parser = request.contentParser()) { + samplingConfig = TransportPutSampleConfigAction.SamplingConfigCustomMetadata.fromXContent(parser); + } + PutSampleConfigAction.Request putSampleConfigRequest = new PutSampleConfigAction.Request( + samplingConfig.rate, + samplingConfig.maxSamples, + samplingConfig.maxSize, + samplingConfig.timeToLive, + samplingConfig.condition, + RestUtils.getMasterNodeTimeout(request), + RestUtils.getAckTimeout(request) + ).indices(Strings.splitStringByCommaToArray(request.param("name"))); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + PutSampleConfigAction.INSTANCE, + putSampleConfigRequest, + new RestToXContentListener(channel) + ); + } + + static class RestToXContentListener extends RestBuilderListener { + + RestToXContentListener(RestChannel channel) { + super(channel); + } + + @Override + public RestResponse buildResponse(AcknowledgedResponse response, XContentBuilder builder) throws Exception { + response.toXContent(builder, channel.request()); + return new RestResponse(RestStatus.OK, builder); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/sample/TransportGetSampleAction.java b/server/src/main/java/org/elasticsearch/sample/TransportGetSampleAction.java new file mode 100644 index 0000000000000..580e0df7d41fb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/TransportGetSampleAction.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.sample.GetSampleAction.NodeRequest; +import static org.elasticsearch.sample.GetSampleAction.NodeResponse; +import static org.elasticsearch.sample.GetSampleAction.Request; +import static org.elasticsearch.sample.GetSampleAction.Response; + +public class TransportGetSampleAction extends TransportNodesAction { + private final SamplingService samplingService; + + @Inject + public TransportGetSampleAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + SamplingService samplingService + ) { + super( + GetSampleAction.NAME, + clusterService, + transportService, + actionFilters, + NodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.samplingService = samplingService; + } + + @SuppressWarnings("checkstyle:LineLength") + @Override + protected Response newResponse(Request request, List nodeResponses, List failures) { + TransportPutSampleConfigAction.SamplingConfigCustomMetadata samplingConfig = samplingService.getSampleConfig( + clusterService.state().projectState(request.getProjectId()).metadata(), + request.indices()[0] + ); + int maxSamples = samplingConfig == null ? 0 : samplingConfig.maxSamples; + return new Response(clusterService.getClusterName(), nodeResponses, failures, maxSamples); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request.getProjectId(), request.indices()[0]); + } + + @Override + protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new NodeResponse(in); + } + + @Override + protected NodeResponse nodeOperation(NodeRequest request, Task task) { + ProjectId projectId = request.getProjectId(); + String index = request.getIndex(); + List samples = samplingService.getSamples(projectId, index); + return new NodeResponse(transportService.getLocalNode(), samples == null ? List.of() : samples); + } +} diff --git a/server/src/main/java/org/elasticsearch/sample/TransportGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/sample/TransportGetSampleStatsAction.java new file mode 100644 index 0000000000000..1c3c2abc7445f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/TransportGetSampleStatsAction.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.sample.GetSampleStatsAction.NodeRequest; +import static org.elasticsearch.sample.GetSampleStatsAction.NodeResponse; +import static org.elasticsearch.sample.GetSampleStatsAction.Request; +import static org.elasticsearch.sample.GetSampleStatsAction.Response; + +public class TransportGetSampleStatsAction extends TransportNodesAction { + private final SamplingService samplingService; + + @Inject + public TransportGetSampleStatsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + SamplingService samplingService + ) { + super( + GetSampleStatsAction.NAME, + clusterService, + transportService, + actionFilters, + NodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.samplingService = samplingService; + } + + @SuppressWarnings("checkstyle:LineLength") + @Override + protected Response newResponse(Request request, List nodeResponses, List failures) { + TransportPutSampleConfigAction.SamplingConfigCustomMetadata samplingConfig = samplingService.getSampleConfig( + clusterService.state().projectState(request.getProjectId()).metadata(), + request.indices()[0] + ); + int maxSamples = samplingConfig == null ? 0 : samplingConfig.maxSamples; + return new Response(clusterService.getClusterName(), nodeResponses, failures, maxSamples); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request.getProjectId(), request.indices()[0]); + } + + @Override + protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new NodeResponse(in); + } + + @Override + protected NodeResponse nodeOperation(NodeRequest request, Task task) { + SamplingService.SampleStats sampleStats = samplingService.getSampleStats(request.getProjectId(), request.getIndex()); + return new NodeResponse(transportService.getLocalNode(), sampleStats); + } +} diff --git a/server/src/main/java/org/elasticsearch/sample/TransportPutSampleConfigAction.java b/server/src/main/java/org/elasticsearch/sample/TransportPutSampleConfigAction.java new file mode 100644 index 0000000000000..ab92d13abe905 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/sample/TransportPutSampleConfigAction.java @@ -0,0 +1,267 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.sample; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Objects; + +import static org.elasticsearch.cluster.metadata.Metadata.ALL_CONTEXTS; +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class TransportPutSampleConfigAction extends AcknowledgedTransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportPutSampleConfigAction.class); + private final ProjectResolver projectResolver; + private final SamplingService samplingService; + + @Inject + public TransportPutSampleConfigAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + SamplingService samplingService + ) { + super( + PutSampleConfigAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + PutSampleConfigAction.Request::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.projectResolver = projectResolver; + this.samplingService = samplingService; + } + + @Override + protected void masterOperation( + Task task, + PutSampleConfigAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + ProjectId projectId = projectResolver.getProjectId(); + samplingService.updateSampleConfiguration( + projectId, + request.indices()[0], + request.getRate(), + request.getMaxSamples(), + request.getMaxSize(), + request.getTimeToLive(), + request.getCondition(), + request.masterNodeTimeout(), + request.ackTimeout(), + listener + ); + state.projectState(projectId).metadata().custom("sample_config"); + } + + @Override + protected ClusterBlockException checkBlock(PutSampleConfigAction.Request request, ClusterState state) { + return null; + } + + public static final class SamplingConfigCustomMetadata extends AbstractNamedDiffable + implements + Metadata.ProjectCustom { + public final String indexName; + public final double rate; + public final Integer maxSamples; + public final ByteSizeValue maxSize; + public final TimeValue timeToLive; + public final String condition; + + public static final String NAME = "sampling_config"; + public static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); + public static final ParseField RATE_FIELD = new ParseField("rate"); + public static final ParseField MAX_SAMPLES_FIELD = new ParseField("max_samples"); + public static final ParseField MAX_SIZE_FIELD = new ParseField("max_size"); + public static final ParseField TIME_TO_LIVE_FIELD = new ParseField("time_to_live"); + public static final ParseField CONDITION_FIELD = new ParseField("condition"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, + args -> new SamplingConfigCustomMetadata( + (String) args[0], + (double) args[1], + (Integer) args[2], + (Long) args[3], + args[4] == null ? null : TimeValue.timeValueMillis((long) args[4]), + (String) args[5] + ) + ); + + static { + PARSER.declareString(constructorArg(), INDEX_NAME_FIELD); + PARSER.declareDouble(constructorArg(), RATE_FIELD); + PARSER.declareInt(optionalConstructorArg(), MAX_SAMPLES_FIELD); + PARSER.declareLong(optionalConstructorArg(), MAX_SIZE_FIELD); + PARSER.declareLong(optionalConstructorArg(), TIME_TO_LIVE_FIELD); + PARSER.declareString(optionalConstructorArg(), CONDITION_FIELD); + } + + public SamplingConfigCustomMetadata( + String indexName, + double rate, + Integer maxSamples, + ByteSizeValue maxSize, + TimeValue timeToLive, + String condition + ) { + this.indexName = indexName; + this.rate = rate; + this.maxSamples = maxSamples; + this.maxSize = maxSize; + this.timeToLive = timeToLive; + this.condition = condition; + } + + public SamplingConfigCustomMetadata( + String indexName, + double rate, + Integer maxSamples, + Long maxSizeInBytes, + TimeValue timeToLive, + String condition + ) { + this( + indexName, + rate, + maxSamples, + maxSizeInBytes == null ? null : ByteSizeValue.of(maxSizeInBytes, ByteSizeUnit.BYTES), + timeToLive, + condition + ); + } + + public SamplingConfigCustomMetadata(StreamInput in) throws IOException { + this( + in.readString(), + in.readDouble(), + in.readOptionalInt(), + in.readOptionalLong(), + in.readOptionalTimeValue(), + in.readOptionalString() + ); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.ProjectCustom.class, NAME, in); + } + + @Override + public EnumSet context() { + return ALL_CONTEXTS; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.RANDOM_SAMPLING; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeDouble(rate); + out.writeOptionalInt(maxSamples); + out.writeOptionalLong(maxSize == null ? null : maxSize.getBytes()); + out.writeOptionalTimeValue(timeToLive); + out.writeOptionalString(condition); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.single((b, p) -> { + // b.startObject(); + b.field(INDEX_NAME_FIELD.getPreferredName(), indexName); + b.field(RATE_FIELD.getPreferredName(), rate); + if (maxSamples != null) { + b.field(MAX_SAMPLES_FIELD.getPreferredName(), maxSamples); + } + if (maxSize != null) { + b.field(MAX_SIZE_FIELD.getPreferredName(), maxSize.getBytes()); + } + if (timeToLive != null) { + b.field(TIME_TO_LIVE_FIELD.getPreferredName(), timeToLive.millis()); + } + if (condition != null) { + b.field(CONDITION_FIELD.getPreferredName(), condition); + } + return b; + }); + } + + public static SamplingConfigCustomMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + @Override + public boolean equals(Object other) { + if (other instanceof SamplingConfigCustomMetadata otherConfig) { + return Objects.equals(indexName, otherConfig.indexName) + && rate == otherConfig.rate + && Objects.equals(maxSamples, otherConfig.maxSamples) + && Objects.equals(maxSize, otherConfig.maxSize) + && Objects.equals(timeToLive, otherConfig.timeToLive) + && Objects.equals(condition, otherConfig.condition); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(indexName, rate, maxSamples, maxSize, timeToLive, condition); + } + } +} diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv index 4eb5140004ea6..266bfbbd3bf78 100644 --- a/server/src/main/resources/transport/upper_bounds/8.18.csv +++ b/server/src/main/resources/transport/upper_bounds/8.18.csv @@ -1 +1 @@ -initial_elasticsearch_8_18_6,8840008 +transform_check_for_dangling_tasks,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index 476468b203875..3600b3f8c633a 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_elasticsearch_8_19_3,8841067 +transform_check_for_dangling_tasks,8841070 diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv index f8f50cc6d7839..c11e6837bb813 100644 --- a/server/src/main/resources/transport/upper_bounds/9.0.csv +++ b/server/src/main/resources/transport/upper_bounds/9.0.csv @@ -1 +1 @@ -initial_elasticsearch_9_0_6,9000015 +transform_check_for_dangling_tasks,9000018 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 5a65f2e578156..80b97d85f7511 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_elasticsearch_9_1_4,9112007 +transform_check_for_dangling_tasks,9112009 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index e24f914a1d1ca..2147eab66c207 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -ml_inference_endpoint_cache,9157000 +initial_9.2.0,9185000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv new file mode 100644 index 0000000000000..2147eab66c207 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -0,0 +1 @@ +initial_9.2.0,9185000 diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index f2634a19d1068..69a6a7cbbf574 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -656,6 +656,9 @@ public class Constants { "indices:admin/index/create_from_source", "indices:admin/index/copy_lifecycle_index_metadata", "internal:admin/repository/verify", - "internal:admin/repository/verify/coordinate" + "internal:admin/repository/verify/coordinate", + "indices:admin/sample/stats", + "indices:admin/sample/config/update", + "indices:admin/sample" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet()); }