54
54
import java .util .Objects ;
55
55
import java .util .Set ;
56
56
import java .util .TreeMap ;
57
+ import java .util .concurrent .Executor ;
57
58
import java .util .function .BiConsumer ;
58
59
import java .util .function .Consumer ;
59
60
import java .util .function .Function ;
@@ -77,6 +78,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
77
78
private final CcrLicenseChecker ccrLicenseChecker ;
78
79
private final LongSupplier relativeMillisTimeProvider ;
79
80
private final LongSupplier absoluteMillisTimeProvider ;
81
+ private final Executor executor ;
80
82
81
83
private volatile TimeValue waitForMetadataTimeOut ;
82
84
private volatile Map <String , AutoFollower > autoFollowers = Collections .emptyMap ();
@@ -88,18 +90,20 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
88
90
private final LinkedHashMap <String , Tuple <Long , ElasticsearchException >> recentAutoFollowErrors ;
89
91
90
92
public AutoFollowCoordinator (
91
- Settings settings ,
92
- Client client ,
93
- ClusterService clusterService ,
94
- CcrLicenseChecker ccrLicenseChecker ,
95
- LongSupplier relativeMillisTimeProvider ,
96
- LongSupplier absoluteMillisTimeProvider ) {
93
+ final Settings settings ,
94
+ final Client client ,
95
+ final ClusterService clusterService ,
96
+ final CcrLicenseChecker ccrLicenseChecker ,
97
+ final LongSupplier relativeMillisTimeProvider ,
98
+ final LongSupplier absoluteMillisTimeProvider ,
99
+ final Executor executor ) {
97
100
98
101
this .client = client ;
99
102
this .clusterService = clusterService ;
100
103
this .ccrLicenseChecker = Objects .requireNonNull (ccrLicenseChecker , "ccrLicenseChecker" );
101
104
this .relativeMillisTimeProvider = relativeMillisTimeProvider ;
102
105
this .absoluteMillisTimeProvider = absoluteMillisTimeProvider ;
106
+ this .executor = Objects .requireNonNull (executor );
103
107
this .recentAutoFollowErrors = new LinkedHashMap <String , Tuple <Long , ElasticsearchException >>() {
104
108
@ Override
105
109
protected boolean removeEldestEntry (final Map .Entry <String , Tuple <Long , ElasticsearchException >> eldest ) {
@@ -209,7 +213,7 @@ void updateAutoFollowers(ClusterState followerClusterState) {
209
213
Map <String , AutoFollower > newAutoFollowers = new HashMap <>(newRemoteClusters .size ());
210
214
for (String remoteCluster : newRemoteClusters ) {
211
215
AutoFollower autoFollower =
212
- new AutoFollower (remoteCluster , this ::updateStats , clusterService ::state , relativeMillisTimeProvider ) {
216
+ new AutoFollower (remoteCluster , this ::updateStats , clusterService ::state , relativeMillisTimeProvider , executor ) {
213
217
214
218
@ Override
215
219
void getRemoteClusterState (final String remoteCluster ,
@@ -331,6 +335,7 @@ abstract static class AutoFollower {
331
335
private final Consumer <List <AutoFollowResult >> statsUpdater ;
332
336
private final Supplier <ClusterState > followerClusterStateSupplier ;
333
337
private final LongSupplier relativeTimeProvider ;
338
+ private final Executor executor ;
334
339
335
340
private volatile long lastAutoFollowTimeInMillis = -1 ;
336
341
private volatile long metadataVersion = 0 ;
@@ -343,11 +348,13 @@ abstract static class AutoFollower {
343
348
AutoFollower (final String remoteCluster ,
344
349
final Consumer <List <AutoFollowResult >> statsUpdater ,
345
350
final Supplier <ClusterState > followerClusterStateSupplier ,
346
- LongSupplier relativeTimeProvider ) {
351
+ final LongSupplier relativeTimeProvider ,
352
+ final Executor executor ) {
347
353
this .remoteCluster = remoteCluster ;
348
354
this .statsUpdater = statsUpdater ;
349
355
this .followerClusterStateSupplier = followerClusterStateSupplier ;
350
356
this .relativeTimeProvider = relativeTimeProvider ;
357
+ this .executor = Objects .requireNonNull (executor );
351
358
}
352
359
353
360
void start () {
@@ -386,6 +393,7 @@ void start() {
386
393
this .autoFollowPatternsCountDown = new CountDown (patterns .size ());
387
394
this .autoFollowResults = new AtomicArray <>(patterns .size ());
388
395
396
+ final Thread thread = Thread .currentThread ();
389
397
getRemoteClusterState (remoteCluster , metadataVersion + 1 , (remoteClusterStateResponse , remoteError ) -> {
390
398
// Also check removed flag here, as it may take a while for this remote cluster state api call to return:
391
399
if (removed ) {
@@ -402,7 +410,7 @@ void start() {
402
410
}
403
411
ClusterState remoteClusterState = remoteClusterStateResponse .getState ();
404
412
metadataVersion = remoteClusterState .metaData ().version ();
405
- autoFollowIndices (autoFollowMetadata , clusterState , remoteClusterState , patterns );
413
+ autoFollowIndices (autoFollowMetadata , clusterState , remoteClusterState , patterns , thread );
406
414
} else {
407
415
assert remoteError != null ;
408
416
if (remoteError instanceof NoSuchRemoteClusterException ) {
@@ -413,7 +421,7 @@ void start() {
413
421
414
422
for (int i = 0 ; i < patterns .size (); i ++) {
415
423
String autoFollowPatternName = patterns .get (i );
416
- finalise (i , new AutoFollowResult (autoFollowPatternName , remoteError ));
424
+ finalise (i , new AutoFollowResult (autoFollowPatternName , remoteError ), thread );
417
425
}
418
426
}
419
427
});
@@ -427,7 +435,8 @@ void stop() {
427
435
private void autoFollowIndices (final AutoFollowMetadata autoFollowMetadata ,
428
436
final ClusterState clusterState ,
429
437
final ClusterState remoteClusterState ,
430
- final List <String > patterns ) {
438
+ final List <String > patterns ,
439
+ final Thread thread ) {
431
440
int i = 0 ;
432
441
for (String autoFollowPatternName : patterns ) {
433
442
final int slot = i ;
@@ -438,7 +447,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
438
447
final List <Index > leaderIndicesToFollow =
439
448
getLeaderIndicesToFollow (autoFollowPattern , remoteClusterState , followedIndices );
440
449
if (leaderIndicesToFollow .isEmpty ()) {
441
- finalise (slot , new AutoFollowResult (autoFollowPatternName ));
450
+ finalise (slot , new AutoFollowResult (autoFollowPatternName ), thread );
442
451
} else {
443
452
List <Tuple <String , AutoFollowPattern >> patternsForTheSameRemoteCluster = autoFollowMetadata .getPatterns ()
444
453
.entrySet ().stream ()
@@ -447,7 +456,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
447
456
.map (item -> new Tuple <>(item .getKey (), item .getValue ()))
448
457
.collect (Collectors .toList ());
449
458
450
- Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
459
+ Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result , thread );
451
460
checkAutoFollowPattern (autoFollowPatternName , remoteCluster , autoFollowPattern , leaderIndicesToFollow , headers ,
452
461
patternsForTheSameRemoteCluster , remoteClusterState .metaData (), clusterState .metaData (), resultHandler );
453
462
}
@@ -575,11 +584,23 @@ private void followLeaderIndex(String autoFollowPattenName,
575
584
createAndFollow (headers , request , successHandler , onResult );
576
585
}
577
586
578
- private void finalise (int slot , AutoFollowResult result ) {
587
+ private void finalise (int slot , AutoFollowResult result , final Thread thread ) {
579
588
assert autoFollowResults .get (slot ) == null ;
580
589
autoFollowResults .set (slot , result );
581
590
if (autoFollowPatternsCountDown .countDown ()) {
582
591
statsUpdater .accept (autoFollowResults .asList ());
592
+ /*
593
+ * In the face of a failure, we could be called back on the same thread. That is, it could be that we
594
+ * never fired off the asynchronous remote cluster state call, instead failing beforehand. In this case,
595
+ * we will recurse on the same thread. If there are repeated failures, we could blow the stack and
596
+ * overflow. A real-world scenario in which this can occur is if the local connect queue is full. To
597
+ * avoid this, if we are called back on the same thread, then we truncate the stack by forking to
598
+ * another thread.
599
+ */
600
+ if (thread == Thread .currentThread ()) {
601
+ executor .execute (this ::start );
602
+ return ;
603
+ }
583
604
start ();
584
605
}
585
606
}
0 commit comments