99import org .apache .logging .log4j .LogManager ;
1010import org .apache .logging .log4j .Logger ;
1111import org .elasticsearch .action .ActionListener ;
12- import org .elasticsearch .action .admin .indices .delete .DeleteIndexRequest ;
1312import org .elasticsearch .client .internal .Client ;
1413import org .elasticsearch .cluster .ClusterState ;
1514import org .elasticsearch .cluster .ClusterStateObserver ;
@@ -35,9 +34,20 @@ public class DownsampleStep extends AsyncActionStep {
3534 private static final Logger logger = LogManager .getLogger (DownsampleStep .class );
3635
3736 private final DateHistogramInterval fixedInterval ;
37+ private final StepKey nextStepOnSuccess ;
38+ private final StepKey nextStepOnFailure ;
39+ private volatile boolean downsampleFailed ;
3840
39- public DownsampleStep (StepKey key , StepKey nextStepKey , Client client , DateHistogramInterval fixedInterval ) {
40- super (key , nextStepKey , client );
41+ public DownsampleStep (
42+ StepKey key ,
43+ StepKey nextStepOnSuccess ,
44+ StepKey nextStepOnFailure ,
45+ Client client ,
46+ DateHistogramInterval fixedInterval
47+ ) {
48+ super (key , null , client );
49+ this .nextStepOnSuccess = nextStepOnSuccess ;
50+ this .nextStepOnFailure = nextStepOnFailure ;
4151 this .fixedInterval = fixedInterval ;
4252 }
4353
@@ -62,72 +72,63 @@ public void performAction(
6272 final String indexName = indexMetadata .getIndex ().getName ();
6373 final String downsampleIndexName = lifecycleState .downsampleIndexName ();
6474 if (Strings .hasText (downsampleIndexName ) == false ) {
75+ downsampleFailed = true ;
6576 listener .onFailure (
6677 new IllegalStateException (
67- "rollup index name was not generated for policy [" + policyName + "] and index [" + indexName + "]"
78+ "downsample index name was not generated for policy [" + policyName + "] and index [" + indexName + "]"
6879 )
6980 );
7081 return ;
7182 }
7283
73- IndexMetadata rollupIndexMetadata = currentState .metadata ().index (downsampleIndexName );
74- if (rollupIndexMetadata != null ) {
75- IndexMetadata .DownsampleTaskStatus rollupIndexStatus = IndexMetadata .INDEX_DOWNSAMPLE_STATUS .get (
76- rollupIndexMetadata .getSettings ()
84+ IndexMetadata downsampleIndexMetadata = currentState .metadata ().index (downsampleIndexName );
85+ if (downsampleIndexMetadata != null ) {
86+ IndexMetadata .DownsampleTaskStatus downsampleIndexStatus = IndexMetadata .INDEX_DOWNSAMPLE_STATUS .get (
87+ downsampleIndexMetadata .getSettings ()
7788 );
78- // Rollup index has already been created with the generated name and its status is "success".
79- // So we skip index rollup creation .
80- if ( IndexMetadata . DownsampleTaskStatus . SUCCESS . equals ( rollupIndexStatus )) {
89+ if ( IndexMetadata . DownsampleTaskStatus . SUCCESS . equals ( downsampleIndexStatus )) {
90+ // Downsample index has already been created with the generated name and its status is "success" .
91+ // So we skip index downsample creation.
8192 logger .warn (
82- "skipping [{}] step for index [{}] as part of policy [{}] as the rollup index [{}] already exists" ,
93+ "skipping [{}] step for index [{}] as part of policy [{}] as the downsample index [{}] already exists" ,
8394 DownsampleStep .NAME ,
8495 indexName ,
8596 policyName ,
8697 downsampleIndexName
8798 );
8899 listener .onResponse (null );
89100 } else {
90- logger .warn (
91- "[{}] step for index [{}] as part of policy [{}] found the rollup index [{}] already exists. Deleting it." ,
92- DownsampleStep .NAME ,
93- indexName ,
94- policyName ,
95- downsampleIndexName
101+ // Downsample index has already been created with the generated name but its status is not "success".
102+ // So we fail this step so that we go back to cleaning up the index and try again with a new downsample
103+ // index name.
104+ downsampleFailed = true ;
105+ listener .onFailure (
106+ new IllegalStateException (
107+ "failing ["
108+ + DownsampleStep .NAME
109+ + "] step for index ["
110+ + indexName
111+ + "] as part of policy ["
112+ + policyName
113+ + "] because the downsample index ["
114+ + downsampleIndexName
115+ + "] already exists with downsample status ["
116+ + downsampleIndexStatus
117+ + "]"
118+ )
96119 );
97- // Rollup index has already been created with the generated name but its status is not "success".
98- // So we delete the index and proceed with executing the rollup step.
99- DeleteIndexRequest deleteRequest = new DeleteIndexRequest (downsampleIndexName );
100- getClient ().admin ().indices ().delete (deleteRequest , ActionListener .wrap (response -> {
101- if (response .isAcknowledged ()) {
102- performDownsampleIndex (indexName , downsampleIndexName , listener );
103- } else {
104- listener .onFailure (
105- new IllegalStateException (
106- "failing ["
107- + DownsampleStep .NAME
108- + "] step for index ["
109- + indexName
110- + "] as part of policy ["
111- + policyName
112- + "] because the rollup index ["
113- + downsampleIndexName
114- + "] already exists with rollup status ["
115- + rollupIndexStatus
116- + "]"
117- )
118- );
119- }
120- }, listener ::onFailure ));
121120 }
122- return ;
121+ } else {
122+ performDownsampleIndex (indexName , downsampleIndexName , ActionListener .wrap (listener ::onResponse , e -> {
123+ downsampleFailed = true ;
124+ listener .onFailure (e );
125+ }));
123126 }
124-
125- performDownsampleIndex (indexName , downsampleIndexName , listener );
126127 }
127128
128- private void performDownsampleIndex (String indexName , String rollupIndexName , ActionListener <Void > listener ) {
129+ void performDownsampleIndex (String indexName , String downsampleIndexName , ActionListener <Void > listener ) {
129130 DownsampleConfig config = new DownsampleConfig (fixedInterval );
130- DownsampleAction .Request request = new DownsampleAction .Request (indexName , rollupIndexName , config ).masterNodeTimeout (
131+ DownsampleAction .Request request = new DownsampleAction .Request (indexName , downsampleIndexName , config ).masterNodeTimeout (
131132 TimeValue .MAX_VALUE
132133 );
133134 // Currently, DownsampleAction always acknowledges action was complete when no exceptions are thrown.
@@ -138,24 +139,35 @@ private void performDownsampleIndex(String indexName, String rollupIndexName, Ac
138139 );
139140 }
140141
142+ @ Override
143+ public final StepKey getNextStepKey () {
144+ return downsampleFailed ? nextStepOnFailure : nextStepOnSuccess ;
145+ }
146+
141147 public DateHistogramInterval getFixedInterval () {
142148 return fixedInterval ;
143149 }
144150
145151 @ Override
146152 public int hashCode () {
147- return Objects .hash (super .hashCode (), fixedInterval );
153+ return Objects .hash (super .hashCode (), fixedInterval , nextStepOnSuccess , nextStepOnFailure );
148154 }
149155
150156 @ Override
151157 public boolean equals (Object obj ) {
158+ if (this == obj ) {
159+ return true ;
160+ }
152161 if (obj == null ) {
153162 return false ;
154163 }
155164 if (getClass () != obj .getClass ()) {
156165 return false ;
157166 }
158167 DownsampleStep other = (DownsampleStep ) obj ;
159- return super .equals (obj ) && Objects .equals (fixedInterval , other .fixedInterval );
168+ return super .equals (obj )
169+ && Objects .equals (fixedInterval , other .fixedInterval )
170+ && Objects .equals (nextStepOnSuccess , other .nextStepOnSuccess )
171+ && Objects .equals (nextStepOnFailure , other .nextStepOnFailure );
160172 }
161173}
0 commit comments