2222import org .elasticsearch .xpack .core .transform .transforms .TransformTaskState ;
2323import org .elasticsearch .xpack .transform .notifications .MockTransformAuditor ;
2424
25+ import java .util .List ;
2526import java .util .Map ;
2627import java .util .Set ;
2728
@@ -63,9 +64,121 @@ public int getFailureCountChangedCounter() {
6364 }
6465 }
6566
66- public void testUnattended () {
67+ public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeLessThanMinimumPageSize () {
68+ var e = new CircuitBreakingException (randomAlphaOfLength (10 ), 1 , 0 , randomFrom (CircuitBreaker .Durability .values ()));
69+ assertRetryIfUnattendedOtherwiseFail (e );
70+ }
71+
72+ public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeNotLessThanMinimumPageSize () {
73+ var e = new CircuitBreakingException (randomAlphaOfLength (10 ), 1 , 1 , randomFrom (CircuitBreaker .Durability .values ()));
74+
75+ List .of (true , false ).forEach ((unattended ) -> { assertNoFailureAndContextPageSizeSet (e , unattended , 365 ); });
76+ }
77+
78+ public void testHandleIndexerFailure_ScriptException () {
79+ var e = new ScriptException (
80+ randomAlphaOfLength (10 ),
81+ new ArithmeticException (randomAlphaOfLength (10 )),
82+ singletonList (randomAlphaOfLength (10 )),
83+ randomAlphaOfLength (10 ),
84+ randomAlphaOfLength (10 )
85+ );
86+ assertRetryIfUnattendedOtherwiseFail (e );
87+ }
88+
89+ public void testHandleIndexerFailure_BulkIndexExceptionWrappingClusterBlockException () {
90+ final BulkIndexingException bulkIndexingException = new BulkIndexingException (
91+ randomAlphaOfLength (10 ),
92+ new ClusterBlockException (Map .of ("test-index" , Set .of (MetadataIndexStateService .INDEX_CLOSED_BLOCK ))),
93+ randomBoolean ()
94+ );
95+
96+ List .of (true , false ).forEach ((unattended ) -> { assertRetryFailureCountNotIncremented (bulkIndexingException , unattended ); });
97+ }
98+
99+ public void testHandleIndexerFailure_IrrecoverableBulkIndexException () {
100+ final BulkIndexingException e = new BulkIndexingException (
101+ randomAlphaOfLength (10 ),
102+ new ElasticsearchStatusException (randomAlphaOfLength (10 ), RestStatus .INTERNAL_SERVER_ERROR ),
103+ true
104+ );
105+ assertRetryIfUnattendedOtherwiseFail (e );
106+ }
107+
108+ public void testHandleIndexerFailure_RecoverableBulkIndexException () {
109+ final BulkIndexingException bulkIndexingException = new BulkIndexingException (
110+ randomAlphaOfLength (10 ),
111+ new ElasticsearchStatusException (randomAlphaOfLength (10 ), RestStatus .INTERNAL_SERVER_ERROR ),
112+ false
113+ );
114+
115+ List .of (true , false ).forEach ((unattended ) -> { assertRetry (bulkIndexingException , unattended ); });
116+ }
117+
118+ public void testHandleIndexerFailure_ClusterBlockException () {
119+ List .of (true , false ).forEach ((unattended ) -> {
120+ assertRetry (
121+ new ClusterBlockException (Map .of (randomAlphaOfLength (10 ), Set .of (MetadataIndexStateService .INDEX_CLOSED_BLOCK ))),
122+ unattended
123+ );
124+ });
125+ }
126+
127+ public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithNoShardSearchFailures () {
128+ List .of (true , false ).forEach ((unattended ) -> {
129+ assertRetry (
130+ new SearchPhaseExecutionException (randomAlphaOfLength (10 ), randomAlphaOfLength (10 ), ShardSearchFailure .EMPTY_ARRAY ),
131+ unattended
132+ );
133+ });
134+ }
135+
136+ public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithShardSearchFailures () {
137+ List .of (true , false ).forEach ((unattended ) -> {
138+ assertRetry (
139+ new SearchPhaseExecutionException (
140+ randomAlphaOfLength (10 ),
141+ randomAlphaOfLength (10 ),
142+ new ShardSearchFailure [] { new ShardSearchFailure (new Exception ()) }
143+ ),
144+ unattended
145+ );
146+ });
147+ }
148+
149+ public void testHandleIndexerFailure_RecoverableElasticsearchException () {
150+ List .of (true , false ).forEach ((unattended ) -> {
151+ assertRetry (new ElasticsearchStatusException (randomAlphaOfLength (10 ), RestStatus .INTERNAL_SERVER_ERROR ), unattended );
152+ });
153+ }
154+
155+ public void testHandleIndexerFailure_IrrecoverableElasticsearchException () {
156+ var e = new ElasticsearchStatusException (randomAlphaOfLength (10 ), RestStatus .NOT_FOUND );
157+ assertRetryIfUnattendedOtherwiseFail (e );
158+ }
159+
160+ public void testHandleIndexerFailure_IllegalArgumentException () {
161+ var e = new IllegalArgumentException (randomAlphaOfLength (10 ));
162+ assertRetryIfUnattendedOtherwiseFail (e );
163+ }
164+
165+ public void testHandleIndexerFailure_UnexpectedException () {
166+ List .of (true , false ).forEach ((unattended ) -> { assertRetry (new Exception (), unattended ); });
167+ }
168+
169+ private void assertRetryIfUnattendedOtherwiseFail (Exception e ) {
170+ List .of (true , false ).forEach ((unattended ) -> {
171+ if (unattended ) {
172+ assertRetry (e , unattended );
173+ } else {
174+ assertFailure (e );
175+ }
176+ });
177+ }
178+
179+ private void assertRetry (Exception e , boolean unattended ) {
67180 String transformId = randomAlphaOfLength (10 );
68- SettingsConfig settings = new SettingsConfig .Builder ().setUnattended (true ).build ();
181+ SettingsConfig settings = new SettingsConfig .Builder ().setNumFailureRetries ( 2 ). setUnattended (unattended ).build ();
69182
70183 MockTransformAuditor auditor = MockTransformAuditor .createMockAuditor ();
71184 MockTransformContextListener contextListener = new MockTransformContextListener ();
@@ -74,51 +187,33 @@ public void testUnattended() {
74187
75188 TransformFailureHandler handler = new TransformFailureHandler (auditor , context , transformId );
76189
77- handler . handleIndexerFailure (
78- new SearchPhaseExecutionException (
79- "query" ,
80- "Partial shards failure" ,
81- new ShardSearchFailure [] {
82- new ShardSearchFailure ( new CircuitBreakingException ( "to much memory" , 110 , 100 , CircuitBreaker . Durability . TRANSIENT )) }
83- ),
84- settings
85- );
190+ assertNoFailure ( handler , e , contextListener , settings , true );
191+ assertNoFailure ( handler , e , contextListener , settings , true );
192+ if ( unattended ) {
193+ assertNoFailure ( handler , e , contextListener , settings , true );
194+ } else {
195+ // fail after max retry attempts reached
196+ assertFailure ( handler , e , contextListener , settings , true );
197+ }
198+ }
86199
87- // CBE isn't a failure, but it only affects page size(which we don't test here)
88- assertFalse ( contextListener . getFailed () );
89- assertEquals ( 0 , contextListener . getFailureCountChangedCounter () );
200+ private void assertRetryFailureCountNotIncremented ( Exception e , boolean unattended ) {
201+ String transformId = randomAlphaOfLength ( 10 );
202+ SettingsConfig settings = new SettingsConfig . Builder (). setNumFailureRetries ( 2 ). setUnattended ( unattended ). build ( );
90203
91- assertNoFailure (
92- handler ,
93- new SearchPhaseExecutionException (
94- "query" ,
95- "Partial shards failure" ,
96- new ShardSearchFailure [] {
97- new ShardSearchFailure (
98- new ScriptException (
99- "runtime error" ,
100- new ArithmeticException ("/ by zero" ),
101- singletonList ("stack" ),
102- "test" ,
103- "painless"
104- )
105- ) }
106- ),
107- contextListener ,
108- settings
109- );
110- assertNoFailure (
111- handler ,
112- new ElasticsearchStatusException ("something really bad happened" , RestStatus .INTERNAL_SERVER_ERROR ),
113- contextListener ,
114- settings
115- );
116- assertNoFailure (handler , new IllegalArgumentException ("expected apples not oranges" ), contextListener , settings );
117- assertNoFailure (handler , new RuntimeException ("the s*** hit the fan" ), contextListener , settings );
118- assertNoFailure (handler , new NullPointerException ("NPE" ), contextListener , settings );
204+ MockTransformAuditor auditor = MockTransformAuditor .createMockAuditor ();
205+ MockTransformContextListener contextListener = new MockTransformContextListener ();
206+ TransformContext context = new TransformContext (TransformTaskState .STARTED , "" , 0 , contextListener );
207+ context .setPageSize (500 );
208+
209+ TransformFailureHandler handler = new TransformFailureHandler (auditor , context , transformId );
210+
211+ assertNoFailure (handler , e , contextListener , settings , false );
212+ assertNoFailure (handler , e , contextListener , settings , false );
213+ assertNoFailure (handler , e , contextListener , settings , false );
119214 }
120215
121- public void testClusterBlock ( ) {
216+ private void assertFailure ( Exception e ) {
122217 String transformId = randomAlphaOfLength (10 );
123218 SettingsConfig settings = new SettingsConfig .Builder ().setNumFailureRetries (2 ).build ();
124219
@@ -129,32 +224,50 @@ public void testClusterBlock() {
129224
130225 TransformFailureHandler handler = new TransformFailureHandler (auditor , context , transformId );
131226
132- final ClusterBlockException clusterBlock = new ClusterBlockException (
133- Map .of ("test-index" , Set .of (MetadataIndexStateService .INDEX_CLOSED_BLOCK ))
134- );
227+ assertFailure (handler , e , contextListener , settings , false );
228+ }
135229
136- handler .handleIndexerFailure (clusterBlock , settings );
137- assertFalse (contextListener .getFailed ());
138- assertEquals (1 , contextListener .getFailureCountChangedCounter ());
230+ private void assertNoFailure (
231+ TransformFailureHandler handler ,
232+ Exception e ,
233+ MockTransformContextListener mockTransformContextListener ,
234+ SettingsConfig settings ,
235+ boolean failureCountIncremented
236+ ) {
237+ handler .handleIndexerFailure (e , settings );
238+ assertFalse (mockTransformContextListener .getFailed ());
239+ assertEquals (failureCountIncremented ? 1 : 0 , mockTransformContextListener .getFailureCountChangedCounter ());
240+ mockTransformContextListener .reset ();
241+ }
139242
140- handler . handleIndexerFailure ( clusterBlock , settings );
141- assertFalse ( contextListener . getFailed () );
142- assertEquals ( 2 , contextListener . getFailureCountChangedCounter () );
243+ private void assertNoFailureAndContextPageSizeSet ( Exception e , boolean unattended , int newPageSize ) {
244+ String transformId = randomAlphaOfLength ( 10 );
245+ SettingsConfig settings = new SettingsConfig . Builder (). setNumFailureRetries ( 2 ). setUnattended ( unattended ). build ( );
143246
144- handler .handleIndexerFailure (clusterBlock , settings );
145- assertTrue (contextListener .getFailed ());
146- assertEquals (3 , contextListener .getFailureCountChangedCounter ());
247+ MockTransformAuditor auditor = MockTransformAuditor .createMockAuditor ();
248+ MockTransformContextListener contextListener = new MockTransformContextListener ();
249+ TransformContext context = new TransformContext (TransformTaskState .STARTED , "" , 0 , contextListener );
250+ context .setPageSize (500 );
251+
252+ TransformFailureHandler handler = new TransformFailureHandler (auditor , context , transformId );
253+
254+ handler .handleIndexerFailure (e , settings );
255+ assertFalse (contextListener .getFailed ());
256+ assertEquals (0 , contextListener .getFailureCountChangedCounter ());
257+ assertEquals (newPageSize , context .getPageSize ());
258+ contextListener .reset ();
147259 }
148260
149- private void assertNoFailure (
261+ private void assertFailure (
150262 TransformFailureHandler handler ,
151263 Exception e ,
152264 MockTransformContextListener mockTransformContextListener ,
153- SettingsConfig settings
265+ SettingsConfig settings ,
266+ boolean failureCountChanged
154267 ) {
155268 handler .handleIndexerFailure (e , settings );
156- assertFalse (mockTransformContextListener .getFailed ());
157- assertEquals (1 , mockTransformContextListener .getFailureCountChangedCounter ());
269+ assertTrue (mockTransformContextListener .getFailed ());
270+ assertEquals (failureCountChanged ? 1 : 0 , mockTransformContextListener .getFailureCountChangedCounter ());
158271 mockTransformContextListener .reset ();
159272 }
160273
0 commit comments