4242import  org .elasticsearch .common .util .concurrent .AbstractRunnable ;
4343import  org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
4444import  org .elasticsearch .common .util .concurrent .EsExecutors ;
45+ import  org .elasticsearch .common .util .concurrent .RunOnce ;
4546import  org .elasticsearch .common .util .concurrent .ThreadContext ;
4647import  org .elasticsearch .common .util .concurrent .ThrottledTaskRunner ;
4748import  org .elasticsearch .core .Nullable ;
7475import  org .elasticsearch .indices .recovery .RecoveryFailedException ;
7576import  org .elasticsearch .indices .recovery .RecoveryState ;
7677import  org .elasticsearch .injection .guice .Inject ;
78+ import  org .elasticsearch .monitor .jvm .HotThreads ;
7779import  org .elasticsearch .repositories .RepositoriesService ;
7880import  org .elasticsearch .search .SearchService ;
7981import  org .elasticsearch .snapshots .SnapshotShardsService ;
@@ -688,6 +690,7 @@ private void createShard(ShardRouting shardRouting, ClusterState state) {
688690                primaryTerm ,
689691                0 ,
690692                0L ,
693+                 new  RunOnce (() -> HotThreads .logLocalCurrentThreads (logger , Level .WARN , shardId  + ": acquire shard lock for create" )),
691694                ActionListener .runBefore (new  ActionListener <>() {
692695                    @ Override 
693696                    public  void  onResponse (Boolean  success ) {
@@ -740,6 +743,7 @@ private void createShardWhenLockAvailable(
740743        long  primaryTerm ,
741744        int  iteration ,
742745        long  delayMillis ,
746+         RunOnce  dumpHotThreads ,
743747        ActionListener <Boolean > listener 
744748    ) {
745749        try  {
@@ -763,8 +767,9 @@ private void createShardWhenLockAvailable(
763767                listener .onFailure (e );
764768                return ;
765769            }
770+             final  Level  level  = (iteration  + 25 ) % 30  == 0  ? Level .WARN  : Level .DEBUG ;
766771            logger .log (
767-                 ( iteration  +  25 ) %  30  ==  0  ?  Level . WARN  :  Level . DEBUG ,
772+                 level ,
768773                """ 
769774                    shard lock for [{}] has been unavailable for at least [{}/{}ms], \ 
770775                    attempting to create shard while applying cluster state [version={},uuid={}], will retry in [{}]: [{}]""" ,
@@ -776,6 +781,9 @@ private void createShardWhenLockAvailable(
776781                shardLockRetryInterval ,
777782                e .getMessage ()
778783            );
784+             if  (level  == Level .WARN ) {
785+                 dumpHotThreads .run ();
786+             }
779787            // TODO could we instead subscribe to the shard lock and trigger the retry exactly when it is released rather than polling? 
780788            threadPool .scheduleUnlessShuttingDown (
781789                shardLockRetryInterval ,
@@ -813,6 +821,7 @@ private void createShardWhenLockAvailable(
813821                                shardLockRetryTimeout .millis (),
814822                                shardRouting 
815823                            );
824+                             dumpHotThreads .run ();
816825                            listener .onFailure (
817826                                new  ElasticsearchTimeoutException ("timed out while waiting to acquire shard lock for "  + shardRouting )
818827                            );
@@ -841,6 +850,7 @@ private void createShardWhenLockAvailable(
841850                            primaryTerm ,
842851                            iteration  + 1 ,
843852                            newDelayMillis ,
853+                             dumpHotThreads ,
844854                            listener 
845855                        );
846856
0 commit comments