25
25
import org .elasticsearch .cluster .node .DiscoveryNode ;
26
26
import org .elasticsearch .cluster .routing .IndexRoutingTable ;
27
27
import org .elasticsearch .cluster .service .ClusterService ;
28
+ import org .elasticsearch .common .Randomness ;
28
29
import org .elasticsearch .common .ValidationException ;
29
30
import org .elasticsearch .common .settings .Settings ;
30
31
import org .elasticsearch .core .Nullable ;
32
+ import org .elasticsearch .core .Strings ;
33
+ import org .elasticsearch .core .TimeValue ;
31
34
import org .elasticsearch .core .Tuple ;
32
35
import org .elasticsearch .persistent .AllocatedPersistentTask ;
33
36
import org .elasticsearch .persistent .PersistentTaskState ;
@@ -225,22 +228,22 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
225
228
final SetOnce <TransformState > stateHolder = new SetOnce <>();
226
229
227
230
// <8> log the start result
228
- ActionListener <StartTransformAction .Response > startTaskListener = ActionListener .wrap (
229
- response -> logger .info ("[{}] successfully completed and scheduled task in node operation" , transformId ),
230
- failure -> {
231
- // If the transform is failed then there is no need to log an error on every node restart as the error had already been
232
- // logged when the transform first failed.
233
- boolean logErrorAsInfo = failure instanceof CannotStartFailedTransformException ;
234
- auditor . audit (
235
- logErrorAsInfo ? INFO : ERROR ,
236
- transformId ,
237
- "Failed to start transform. Please stop and attempt to start again. Failure: " + failure . getMessage ()
238
- );
239
- logger . atLevel ( logErrorAsInfo ? Level . INFO : Level . ERROR )
240
- . withThrowable ( failure )
241
- . log ( "[{}] Failed to start task in node operation" , transformId );
242
- }
243
- );
231
+ ActionListener <StartTransformAction .Response > startTaskListener = ActionListener .wrap (response -> {
232
+ logger .info ("[{}] successfully completed and scheduled task in node operation" , transformId );
233
+ transformServices . scheduler (). registerTransform ( params , buildTask );
234
+ }, failure -> {
235
+ // If the transform is failed then there is no need to log an error on every node restart as the error had already been
236
+ // logged when the transform first failed.
237
+ boolean logErrorAsInfo = failure instanceof CannotStartFailedTransformException ;
238
+ auditor . audit (
239
+ logErrorAsInfo ? INFO : ERROR ,
240
+ transformId ,
241
+ "Failed to start transform. Please stop and attempt to start again. Failure: " + failure . getMessage ()
242
+ );
243
+ logger . atLevel ( logErrorAsInfo ? Level . INFO : Level . ERROR )
244
+ . withThrowable ( failure )
245
+ . log ( "[{}] Failed to start task in node operation" , transformId );
246
+ } );
244
247
245
248
// <7> load next checkpoint
246
249
ActionListener <TransformCheckpoint > getTransformNextCheckpointListener = ActionListener .wrap (nextCheckpoint -> {
@@ -259,7 +262,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
259
262
final long lastCheckpoint = stateHolder .get ().getCheckpoint ();
260
263
final AuthorizationState authState = stateHolder .get ().getAuthState ();
261
264
262
- startTask (buildTask , indexerBuilder , authState , lastCheckpoint , startTaskListener );
265
+ startTask (buildTask , params , indexerBuilder , authState , lastCheckpoint , startTaskListener );
263
266
}, error -> {
264
267
// TODO: do not use the same error message as for loading the last checkpoint
265
268
String msg = TransformMessages .getMessage (TransformMessages .FAILED_TO_LOAD_TRANSFORM_CHECKPOINT , transformId );
@@ -326,7 +329,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
326
329
markAsFailed (buildTask , error , msg );
327
330
} else {
328
331
logger .trace ("[{}] No stats found (new transform), starting the task" , transformId );
329
- startTask (buildTask , indexerBuilder , null , null , startTaskListener );
332
+ startTask (buildTask , params , indexerBuilder , null , null , startTaskListener );
330
333
}
331
334
}
332
335
);
@@ -485,17 +488,58 @@ private ActionListener<Boolean> retryListener(TransformTask task) {
485
488
486
489
private void startTask (
487
490
TransformTask buildTask ,
491
+ TransformTaskParams params ,
488
492
ClientTransformIndexerBuilder indexerBuilder ,
489
493
AuthorizationState authState ,
490
494
Long previousCheckpoint ,
491
495
ActionListener <StartTransformAction .Response > listener
492
496
) {
497
+ // if we fail the first request, we are going to start retrying until we succeed. when start fails, it is because the cluster state
498
+ // is not handling updates yet, but the cluster will eventually recover on its own.
499
+ var startRetriesOnFirstFailureListener = listener .delegateResponse ((l , e ) -> {
500
+ // copy the params but replace the frequency, this is to prevent every transform from starting and retrying every second,
501
+ // potentially sending many cluster state updates at once. instead, add randomness to spread out the retry requests after the
502
+ // first retry
503
+ var retryTimer = TimeValue .timeValueSeconds (45 + Randomness .get ().nextInt (15 , 45 ));
504
+ var paramsWithExtendedTimer = new TransformTaskParams (
505
+ params .getId (),
506
+ params .getVersion (),
507
+ params .from (),
508
+ retryTimer ,
509
+ params .requiresRemote ()
510
+ );
511
+ logger .debug ("Failed to start Transform, retrying in [{}] seconds." , retryTimer .seconds ());
512
+ // tell the user when and why the retries are happening and how to stop them
513
+ // force stopping will eventually deregister this retry task from the scheduler
514
+ auditor .warning (
515
+ params .getId (),
516
+ Strings .format (
517
+ "Failed while starting Transform. Automatically retrying every [%s] seconds. "
518
+ + "To cancel retries, use [_transform/%s/_stop?force] to force stop this transform. Failure: [%s]" ,
519
+ retryTimer .seconds (),
520
+ params .getId (),
521
+ e .getMessage ()
522
+ )
523
+ );
524
+ var scheduler = transformServices .scheduler ();
525
+ scheduler .registerTransform (
526
+ paramsWithExtendedTimer ,
527
+ new TransformRetryableStartUpListener <>(
528
+ paramsWithExtendedTimer .getId (),
529
+ ll -> buildTask .start (previousCheckpoint , ll ),
530
+ ActionListener .runBefore (l , () -> scheduler .deregisterTransform (paramsWithExtendedTimer .getId ())),
531
+ ActionListener .noop (),
532
+ () -> true ,
533
+ buildTask .getContext ()
534
+ )
535
+ );
536
+ });
493
537
// switch the threadpool to generic, because the caller is on the system_read threadpool
494
538
threadPool .generic ().execute (() -> {
495
539
buildTask .initializeIndexer (indexerBuilder );
496
540
buildTask .setAuthState (authState );
497
541
// TransformTask#start will fail if the task state is FAILED
498
- buildTask .setNumFailureRetries (numFailureRetries ).start (previousCheckpoint , listener );
542
+ buildTask .setNumFailureRetries (numFailureRetries ).start (previousCheckpoint , startRetriesOnFirstFailureListener );
499
543
});
500
544
}
501
545
0 commit comments