99
1010package org .elasticsearch .ingest ;
1111
12+ import org .apache .lucene .util .SetOnce ;
1213import org .elasticsearch .ResourceNotFoundException ;
1314import org .elasticsearch .action .ActionListener ;
1415import org .elasticsearch .action .admin .indices .sampling .SamplingConfiguration ;
2930import org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
3031import org .elasticsearch .common .Priority ;
3132import org .elasticsearch .common .bytes .BytesReference ;
33+ import org .elasticsearch .common .component .AbstractLifecycleComponent ;
34+ import org .elasticsearch .common .component .Lifecycle ;
35+ import org .elasticsearch .common .component .LifecycleListener ;
3236import org .elasticsearch .common .io .stream .StreamInput ;
3337import org .elasticsearch .common .io .stream .StreamOutput ;
3438import org .elasticsearch .common .io .stream .Writeable ;
39+ import org .elasticsearch .common .scheduler .SchedulerEngine ;
40+ import org .elasticsearch .common .scheduler .TimeValueSchedule ;
41+ import org .elasticsearch .common .settings .ClusterSettings ;
3542import org .elasticsearch .common .settings .Setting ;
43+ import org .elasticsearch .common .settings .Settings ;
3644import org .elasticsearch .common .util .FeatureFlag ;
3745import org .elasticsearch .common .util .set .Sets ;
3846import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
5361
5462import java .io .IOException ;
5563import java .lang .ref .SoftReference ;
64+ import java .time .Clock ;
65+ import java .time .Instant ;
66+ import java .time .ZoneOffset ;
67+ import java .time .ZonedDateTime ;
5668import java .util .Arrays ;
5769import java .util .HashMap ;
5870import java .util .HashSet ;
6779import java .util .function .LongSupplier ;
6880import java .util .function .Supplier ;
6981
70- public class SamplingService implements ClusterStateListener {
82+ public class SamplingService extends AbstractLifecycleComponent implements ClusterStateListener , SchedulerEngine . Listener {
7183 public static final boolean RANDOM_SAMPLING_FEATURE_FLAG = new FeatureFlag ("random_sampling" ).isEnabled ();
84+ public static final Setting <TimeValue > TTL_POLL_INTERVAL_SETTING = Setting .timeSetting (
85+ "random_sampling.ttl_poll_interval" ,
86+ TimeValue .timeValueMinutes (30 ),
87+ TimeValue .timeValueSeconds (1 ),
88+ Setting .Property .Dynamic ,
89+ Setting .Property .NodeScope
90+ );
7291 private static final Logger logger = LogManager .getLogger (SamplingService .class );
92+ private static final String TTL_JOB_ID = "sampling_ttl" ;
7393 private final ScriptService scriptService ;
7494 private final ClusterService clusterService ;
7595 private final ProjectResolver projectResolver ;
76- private final LongSupplier relativeMillisTimeSupplier ;
7796 private final LongSupplier statsTimeSupplier = System ::nanoTime ;
7897 private final MasterServiceTaskQueue <UpdateSamplingConfigurationTask > updateSamplingConfigurationTaskQueue ;
7998 private final MasterServiceTaskQueue <DeleteSampleConfigurationTask > deleteSamplingConfigurationTaskQueue ;
@@ -85,24 +104,41 @@ public class SamplingService implements ClusterStateListener {
85104 Setting .Property .NodeScope ,
86105 Setting .Property .Dynamic
87106 );
88-
107+ private final SetOnce <SchedulerEngine > scheduler = new SetOnce <>();
108+ private SchedulerEngine .Job scheduledJob ;
109+ private volatile TimeValue pollInterval ;
110+ private final Settings settings ;
111+ private final Clock clock = Clock .systemUTC ();
89112 /*
90113 * This Map contains the samples that exist on this node. They are not persisted to disk. They are stored as SoftReferences so that
91114 * sampling does not contribute to a node running out of memory. The idea is that access to samples is desirable, but not critical. We
92115 * make a best effort to keep them around, but do not worry about the complexity or cost of making them durable.
93116 */
94117 private final Map <ProjectIndex , SoftReference <SampleInfo >> samples = new ConcurrentHashMap <>();
95118
96- public SamplingService (
119+ /*
120+ * This creates a new SamplingService, and configures various listeners on it.
121+ */
122+ public static SamplingService create (
97123 ScriptService scriptService ,
98124 ClusterService clusterService ,
99125 ProjectResolver projectResolver ,
100- LongSupplier relativeMillisTimeSupplier
126+ Settings settings
127+ ) {
128+ SamplingService samplingService = new SamplingService (scriptService , clusterService , projectResolver , settings );
129+ samplingService .configureListeners ();
130+ return samplingService ;
131+ }
132+
133+ private SamplingService (
134+ ScriptService scriptService ,
135+ ClusterService clusterService ,
136+ ProjectResolver projectResolver ,
137+ Settings settings
101138 ) {
102139 this .scriptService = scriptService ;
103140 this .clusterService = clusterService ;
104141 this .projectResolver = projectResolver ;
105- this .relativeMillisTimeSupplier = relativeMillisTimeSupplier ;
106142 this .updateSamplingConfigurationTaskQueue = clusterService .createTaskQueue (
107143 "update-sampling-configuration" ,
108144 Priority .NORMAL ,
@@ -113,6 +149,24 @@ public SamplingService(
113149 Priority .NORMAL ,
114150 new DeleteSampleConfigurationExecutor ()
115151 );
152+ this .settings = settings ;
153+ this .pollInterval = TTL_POLL_INTERVAL_SETTING .get (settings );
154+ }
155+
156+ private void configureListeners () {
157+ ClusterSettings clusterSettings = clusterService .getClusterSettings ();
158+ clusterSettings .addSettingsUpdateConsumer (TTL_POLL_INTERVAL_SETTING , (v ) -> {
159+ pollInterval = v ;
160+ if (clusterService .state ().nodes ().isLocalNodeElectedMaster ()) {
161+ maybeScheduleJob ();
162+ }
163+ });
164+ this .addLifecycleListener (new LifecycleListener () {
165+ @ Override
166+ public void afterStop () {
167+ cancelJob ();
168+ }
169+ });
116170 }
117171
118172 /**
@@ -172,11 +226,7 @@ private void maybeSample(
172226 }
173227 SoftReference <SampleInfo > sampleInfoReference = samples .compute (
174228 new ProjectIndex (projectMetadata .id (), indexName ),
175- (k , v ) -> v == null || v .get () == null
176- ? new SoftReference <>(
177- new SampleInfo (samplingConfig .maxSamples (), samplingConfig .timeToLive (), relativeMillisTimeSupplier .getAsLong ())
178- )
179- : v
229+ (k , v ) -> v == null || v .get () == null ? new SoftReference <>(new SampleInfo (samplingConfig .maxSamples ())) : v
180230 );
181231 SampleInfo sampleInfo = sampleInfoReference .get ();
182232 if (sampleInfo == null ) {
@@ -345,11 +395,35 @@ public void deleteSampleConfiguration(
345395 );
346396 }
347397
398+ /*
399+ * This version is meant to be used by background processes, not user requests.
400+ */
401+ private void deleteSampleConfiguration (ProjectId projectId , String index ) {
402+ deleteSampleConfiguration (projectId , index , TimeValue .MAX_VALUE , TimeValue .MAX_VALUE , ActionListener .wrap (response -> {
403+ if (response .isAcknowledged ()) {
404+ logger .debug ("Deleted sampling configuration for {}" , index );
405+ } else {
406+ logger .warn ("Deletion of sampling configuration for {} not acknowledged" , index );
407+ }
408+ }, e -> logger .warn ("Failed to delete sample configuration for " + index , e )));
409+ }
410+
348411 @ Override
349412 public void clusterChanged (ClusterChangedEvent event ) {
350413 if (RANDOM_SAMPLING_FEATURE_FLAG == false ) {
351414 return ;
352415 }
416+ final boolean isMaster = event .localNodeMaster ();
417+ final boolean wasMaster = event .previousState ().nodes ().isLocalNodeElectedMaster ();
418+ if (wasMaster != isMaster ) {
419+ if (isMaster ) {
420+ // we weren't the master, and now we are
421+ maybeScheduleJob ();
422+ } else {
423+ // we were the master, and now we aren't
424+ cancelJob ();
425+ }
426+ }
353427 if (samples .isEmpty ()) {
354428 return ;
355429 }
@@ -407,6 +481,31 @@ public void clusterChanged(ClusterChangedEvent event) {
407481 }
408482 }
409483
484+ private void maybeScheduleJob () {
485+ if (isClusterServiceStoppedOrClosed ()) {
486+ // don't create scheduler if the node is shutting down
487+ return ;
488+ }
489+ if (scheduler .get () == null ) {
490+ scheduler .set (new SchedulerEngine (settings , clock ));
491+ scheduler .get ().register (this );
492+ }
493+ scheduledJob = new SchedulerEngine .Job (TTL_JOB_ID , new TimeValueSchedule (pollInterval ));
494+ scheduler .get ().add (scheduledJob );
495+ }
496+
497+ private void cancelJob () {
498+ if (scheduler .get () != null ) {
499+ scheduler .get ().remove (TTL_JOB_ID );
500+ scheduledJob = null ;
501+ }
502+ }
503+
504+ private boolean isClusterServiceStoppedOrClosed () {
505+ final Lifecycle .State state = clusterService .lifecycleState ();
506+ return state == Lifecycle .State .STOPPED || state == Lifecycle .State .CLOSED ;
507+ }
508+
410509 private boolean evaluateCondition (
411510 Supplier <IngestDocument > ingestDocumentSupplier ,
412511 Script script ,
@@ -457,6 +556,53 @@ private static boolean checkMaxConfigLimitBreached(ProjectId projectId, String i
457556 return false ;
458557 }
459558
559+ @ Override
560+ public void triggered (SchedulerEngine .Event event ) {
561+ logger .debug ("job triggered: {}, {}, {}" , event .jobName (), event .scheduledTime (), event .triggeredTime ());
562+ checkTTLs ();
563+ }
564+
565+ @ Override
566+ protected void doStart () {}
567+
568+ @ Override
569+ protected void doStop () {
570+ clusterService .removeListener (this );
571+ logger .debug ("Sampling service is stopping." );
572+ }
573+
574+ @ Override
575+ protected void doClose () throws IOException {
576+ logger .debug ("Sampling service is closing." );
577+ SchedulerEngine engine = scheduler .get ();
578+ if (engine != null ) {
579+ engine .stop ();
580+ }
581+ }
582+
583+ private void checkTTLs () {
584+ long now = clock .instant ().toEpochMilli ();
585+ for (ProjectMetadata projectMetadata : clusterService .state ().metadata ().projects ().values ()) {
586+ SamplingMetadata samplingMetadata = projectMetadata .custom (SamplingMetadata .TYPE );
587+ if (samplingMetadata != null ) {
588+ for (Map .Entry <String , SamplingConfiguration > entry : samplingMetadata .getIndexToSamplingConfigMap ().entrySet ()) {
589+ SamplingConfiguration samplingConfiguration = entry .getValue ();
590+ if (samplingConfiguration .creationTime () + samplingConfiguration .timeToLive ().millis () < now ) {
591+ String indexName = entry .getKey ();
592+ logger .debug (
593+ "Deleting configuration for {} created at {} because it is older than {} now at {}" ,
594+ indexName ,
595+ ZonedDateTime .ofInstant (Instant .ofEpochMilli (samplingConfiguration .creationTime ()), ZoneOffset .UTC ),
596+ samplingConfiguration .timeToLive (),
597+ ZonedDateTime .ofInstant (Instant .ofEpochMilli (now ), ZoneOffset .UTC )
598+ );
599+ deleteSampleConfiguration (projectMetadata .id (), indexName );
600+ }
601+ }
602+ }
603+ }
604+ }
605+
460606 /*
461607 * This represents a raw document as the user sent it to us in an IndexRequest. It only holds onto the information needed for the
462608 * sampling API, rather than holding all of the fields a user might send in an IndexRequest.
@@ -835,18 +981,14 @@ private static final class SampleInfo {
835981 */
836982 private volatile Tuple <Integer , Long > sizeInBytesAtIndex = Tuple .tuple (-1 , 0L );
837983 private final SampleStats stats ;
838- private final long expiration ;
839- private final TimeValue timeToLive ;
840984 private volatile Script script ;
841985 private volatile IngestConditionalScript .Factory factory ;
842986 private volatile boolean compilationFailed = false ;
843987 private volatile boolean isFull = false ;
844988
845- SampleInfo (int maxSamples , TimeValue timeToLive , long relativeNowMillis ) {
846- this .timeToLive = timeToLive ;
989+ SampleInfo (int maxSamples ) {
847990 this .rawDocuments = new RawDocument [maxSamples ];
848991 this .stats = new SampleStats ();
849- this .expiration = (timeToLive == null ? TimeValue .timeValueDays (5 ).millis () : timeToLive .millis ()) + relativeNowMillis ;
850992 }
851993
852994 /*
0 commit comments