66
66
67
67
/**
68
68
* This service will implement the needed actions (e.g. rollover, retention) to manage the data streams with a DLM lifecycle configured.
69
- * It runs on the master node and it schedules a job according to the configured {@link DataLifecycleService#DLM_POLL_INTERVAL_SETTING}.
69
+ * It runs on the master node and it schedules a job according to the configured
70
+ * {@link DataLifecycleService#DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING}.
70
71
*/
71
72
public class DataLifecycleService implements ClusterStateListener , Closeable , SchedulerEngine .Listener {
72
73
73
- public static final String DLM_POLL_INTERVAL = "indices.dlm .poll_interval" ;
74
- public static final Setting <TimeValue > DLM_POLL_INTERVAL_SETTING = Setting .timeSetting (
75
- DLM_POLL_INTERVAL ,
74
+ public static final String DATA_STREAM_LIFECYCLE_POLL_INTERVAL = "data_streams.lifecycle .poll_interval" ;
75
+ public static final Setting <TimeValue > DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING = Setting .timeSetting (
76
+ DATA_STREAM_LIFECYCLE_POLL_INTERVAL ,
76
77
TimeValue .timeValueMinutes (10 ),
77
78
TimeValue .timeValueSeconds (1 ),
78
79
Setting .Property .Dynamic ,
@@ -83,11 +84,11 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
83
84
/**
84
85
* Name constant for the job DLM schedules
85
86
*/
86
- private static final String DATA_LIFECYCLE_JOB_NAME = "dlm " ;
87
+ private static final String LIFECYCLE_JOB_NAME = "data_stream_lifecycle " ;
87
88
/*
88
89
* This is the key for DLM-related custom index metadata.
89
90
*/
90
- static final String DLM_CUSTOM_INDEX_METADATA_KEY = "dlm " ;
91
+ static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle " ;
91
92
static final String FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY = "force_merge_completed_timestamp" ;
92
93
93
94
private final Settings settings ;
@@ -137,8 +138,8 @@ public DataLifecycleService(
137
138
this .nowSupplier = nowSupplier ;
138
139
this .errorStore = errorStore ;
139
140
this .scheduledJob = null ;
140
- this .pollInterval = DLM_POLL_INTERVAL_SETTING .get (settings );
141
- this .rolloverConfiguration = clusterService .getClusterSettings ().get (DataLifecycle .CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING );
141
+ this .pollInterval = DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING .get (settings );
142
+ this .rolloverConfiguration = clusterService .getClusterSettings ().get (DataLifecycle .CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING );
142
143
this .forceMergeClusterStateUpdateTaskQueue = clusterService .createTaskQueue (
143
144
"dlm-forcemerge-state-update" ,
144
145
Priority .LOW ,
@@ -151,9 +152,10 @@ public DataLifecycleService(
151
152
*/
152
153
public void init () {
153
154
clusterService .addListener (this );
154
- clusterService .getClusterSettings ().addSettingsUpdateConsumer (DLM_POLL_INTERVAL_SETTING , this ::updatePollInterval );
155
155
clusterService .getClusterSettings ()
156
- .addSettingsUpdateConsumer (DataLifecycle .CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING , this ::updateRolloverConfiguration );
156
+ .addSettingsUpdateConsumer (DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING , this ::updatePollInterval );
157
+ clusterService .getClusterSettings ()
158
+ .addSettingsUpdateConsumer (DataLifecycle .CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING , this ::updateRolloverConfiguration );
157
159
}
158
160
159
161
@ Override
@@ -190,7 +192,7 @@ public void close() {
190
192
191
193
@ Override
192
194
public void triggered (SchedulerEngine .Event event ) {
193
- if (event .getJobName ().equals (DATA_LIFECYCLE_JOB_NAME )) {
195
+ if (event .getJobName ().equals (LIFECYCLE_JOB_NAME )) {
194
196
if (this .isMaster ) {
195
197
logger .trace ("DLM job triggered: {}, {}, {}" , event .getJobName (), event .getScheduledTime (), event .getTriggeredTime ());
196
198
run (clusterService .state ());
@@ -501,8 +503,9 @@ public void onFailure(Exception e) {
501
503
}
502
504
503
505
/*
504
- * This method sets the value of the custom index metadata field "force_merge_completed_timestamp" within the field "dlm" to value. The
505
- * method returns immediately, but the update happens asynchronously and listener is notified on success or failure.
506
+ * This method sets the value of the custom index metadata field "force_merge_completed_timestamp" within the field
507
+ * "data_stream_lifecycle" to value. The method returns immediately, but the update happens asynchronously and listener is notified on
508
+ * success or failure.
506
509
*/
507
510
private void setForceMergeCompletedTimestamp (String targetIndex , ActionListener <Void > listener ) {
508
511
forceMergeClusterStateUpdateTaskQueue .submitTask (
@@ -513,10 +516,11 @@ private void setForceMergeCompletedTimestamp(String targetIndex, ActionListener<
513
516
}
514
517
515
518
/*
516
- * Returns true if a value has been set for the custom index metadata field "force_merge_completed_timestamp" within the field "dlm".
519
+ * Returns true if a value has been set for the custom index metadata field "force_merge_completed_timestamp" within the field
520
+ * "data_stream_lifecycle".
517
521
*/
518
522
private boolean isForceMergeComplete (IndexMetadata backingIndex ) {
519
- Map <String , String > customMetadata = backingIndex .getCustomData (DLM_CUSTOM_INDEX_METADATA_KEY );
523
+ Map <String , String > customMetadata = backingIndex .getCustomData (LIFECYCLE_CUSTOM_INDEX_METADATA_KEY );
520
524
return customMetadata != null && customMetadata .containsKey (FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY );
521
525
}
522
526
@@ -574,7 +578,7 @@ private void updateRolloverConfiguration(RolloverConfiguration newRolloverConfig
574
578
575
579
private void cancelJob () {
576
580
if (scheduler .get () != null ) {
577
- scheduler .get ().remove (DATA_LIFECYCLE_JOB_NAME );
581
+ scheduler .get ().remove (LIFECYCLE_JOB_NAME );
578
582
scheduledJob = null ;
579
583
}
580
584
}
@@ -601,7 +605,7 @@ private void maybeScheduleJob() {
601
605
}
602
606
603
607
assert scheduler .get () != null : "scheduler should be available" ;
604
- scheduledJob = new SchedulerEngine .Job (DATA_LIFECYCLE_JOB_NAME , new TimeValueSchedule (pollInterval ));
608
+ scheduledJob = new SchedulerEngine .Job (LIFECYCLE_JOB_NAME , new TimeValueSchedule (pollInterval ));
605
609
scheduler .get ().add (scheduledJob );
606
610
}
607
611
@@ -628,14 +632,14 @@ static class UpdateForceMergeCompleteTask implements ClusterStateTaskListener {
628
632
ClusterState execute (ClusterState currentState ) throws Exception {
629
633
logger .debug ("Updating cluster state with force merge complete marker for {}" , targetIndex );
630
634
IndexMetadata indexMetadata = currentState .metadata ().index (targetIndex );
631
- Map <String , String > customMetadata = indexMetadata .getCustomData (DLM_CUSTOM_INDEX_METADATA_KEY );
635
+ Map <String , String > customMetadata = indexMetadata .getCustomData (LIFECYCLE_CUSTOM_INDEX_METADATA_KEY );
632
636
Map <String , String > newCustomMetadata = new HashMap <>();
633
637
if (customMetadata != null ) {
634
638
newCustomMetadata .putAll (customMetadata );
635
639
}
636
640
newCustomMetadata .put (FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY , Long .toString (threadPool .absoluteTimeInMillis ()));
637
641
IndexMetadata updatededIndexMetadata = new IndexMetadata .Builder (indexMetadata ).putCustom (
638
- DLM_CUSTOM_INDEX_METADATA_KEY ,
642
+ LIFECYCLE_CUSTOM_INDEX_METADATA_KEY ,
639
643
newCustomMetadata
640
644
).build ();
641
645
Metadata metadata = Metadata .builder (currentState .metadata ()).put (updatededIndexMetadata , true ).build ();
0 commit comments