18
18
import org .elasticsearch .cluster .block .ClusterBlockException ;
19
19
import org .elasticsearch .cluster .block .ClusterBlockLevel ;
20
20
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
21
+ import org .elasticsearch .cluster .node .DiscoveryNode ;
21
22
import org .elasticsearch .cluster .routing .GroupShardsIterator ;
22
23
import org .elasticsearch .cluster .routing .ShardIterator ;
23
24
import org .elasticsearch .cluster .routing .ShardRouting ;
24
25
import org .elasticsearch .cluster .service .ClusterService ;
25
26
import org .elasticsearch .common .inject .Inject ;
26
27
import org .elasticsearch .common .io .stream .StreamInput ;
28
+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
27
29
import org .elasticsearch .index .engine .Engine ;
28
30
import org .elasticsearch .index .shard .IndexShard ;
29
31
import org .elasticsearch .index .shard .ShardId ;
36
38
import java .io .IOException ;
37
39
import java .util .ArrayList ;
38
40
import java .util .HashMap ;
41
+ import java .util .Iterator ;
42
+ import java .util .LinkedList ;
39
43
import java .util .List ;
40
44
import java .util .Map ;
45
+ import java .util .Queue ;
46
+ import java .util .concurrent .atomic .AtomicInteger ;
41
47
import java .util .concurrent .atomic .AtomicReferenceArray ;
42
48
43
49
public class TransportAnalyzeIndexDiskUsageAction extends TransportBroadcastAction <
@@ -46,6 +52,7 @@ public class TransportAnalyzeIndexDiskUsageAction extends TransportBroadcastActi
46
52
AnalyzeDiskUsageShardRequest ,
47
53
AnalyzeDiskUsageShardResponse > {
48
54
private final IndicesService indicesService ;
55
+ private final ThreadPool threadPool ;
49
56
50
57
@ Inject
51
58
public TransportAnalyzeIndexDiskUsageAction (
@@ -66,11 +73,92 @@ public TransportAnalyzeIndexDiskUsageAction(
66
73
ThreadPool .Names .ANALYZE
67
74
);
68
75
this .indicesService = indexServices ;
76
+ this .threadPool = transportService .getThreadPool ();
69
77
}
70
78
71
79
@ Override
72
80
protected void doExecute (Task task , AnalyzeIndexDiskUsageRequest request , ActionListener <AnalyzeIndexDiskUsageResponse > listener ) {
73
- super .doExecute (task , request , listener );
81
+ new LimitingRequestPerNodeBroadcastAction (task , request , listener , 5 ).start ();
82
+ }
83
+
84
+ private record ShardRequest (
85
+ DiscoveryNode node ,
86
+ AnalyzeDiskUsageShardRequest shardRequest ,
87
+ ActionListener <AnalyzeDiskUsageShardResponse > handler
88
+ ) {
89
+
90
+ }
91
+
92
+ final class LimitingRequestPerNodeBroadcastAction extends AsyncBroadcastAction {
93
+ private final Queue <ShardRequest > queue = new LinkedList <>();
94
+ private final Map <DiscoveryNode , AtomicInteger > sendingCounters = ConcurrentCollections .newConcurrentMap ();
95
+ private final int maxConcurrentRequestsPerNode ;
96
+
97
+ LimitingRequestPerNodeBroadcastAction (
98
+ Task task ,
99
+ AnalyzeIndexDiskUsageRequest request ,
100
+ ActionListener <AnalyzeIndexDiskUsageResponse > listener ,
101
+ int maxConcurrentRequestsPerNode
102
+ ) {
103
+ super (task , request , listener );
104
+ this .maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode ;
105
+ }
106
+
107
+ private void trySendRequests () {
108
+ assert Thread .holdsLock (this ) == false ;
109
+ final List <ShardRequest > readyRequests = new ArrayList <>();
110
+ synchronized (this ) {
111
+ final Iterator <ShardRequest > it = queue .iterator ();
112
+ while (it .hasNext ()) {
113
+ final ShardRequest r = it .next ();
114
+ final AtomicInteger sending = sendingCounters .computeIfAbsent (r .node , k -> new AtomicInteger ());
115
+ assert 0 <= sending .get () && sending .get () <= maxConcurrentRequestsPerNode : sending ;
116
+ if (sending .get () < maxConcurrentRequestsPerNode ) {
117
+ sending .incrementAndGet ();
118
+ readyRequests .add (r );
119
+ it .remove ();
120
+ }
121
+ }
122
+ }
123
+ if (readyRequests .isEmpty ()) {
124
+ return ;
125
+ }
126
+ final Thread sendingThread = Thread .currentThread ();
127
+ for (ShardRequest r : readyRequests ) {
128
+ super .sendShardRequest (
129
+ r .node ,
130
+ r .shardRequest ,
131
+ ActionListener .runAfter (r .handler , () -> onRequestResponded (sendingThread , r .node ))
132
+ );
133
+ }
134
+ }
135
+
136
+ private void onRequestResponded (Thread sendingThread , DiscoveryNode node ) {
137
+ final AtomicInteger sending = sendingCounters .get (node );
138
+ assert sending != null && 1 <= sending .get () && sending .get () <= maxConcurrentRequestsPerNode : sending ;
139
+ sending .decrementAndGet ();
140
+ // fork to avoid StackOverflow
141
+ if (sendingThread == Thread .currentThread ()) {
142
+ threadPool .generic ().execute (this ::trySendRequests );
143
+ } else {
144
+ trySendRequests ();
145
+ }
146
+ }
147
+
148
+ @ Override
149
+ protected synchronized void sendShardRequest (
150
+ DiscoveryNode node ,
151
+ AnalyzeDiskUsageShardRequest shardRequest ,
152
+ ActionListener <AnalyzeDiskUsageShardResponse > listener
153
+ ) {
154
+ queue .add (new ShardRequest (node , shardRequest , listener ));
155
+ }
156
+
157
+ @ Override
158
+ public void start () {
159
+ super .start ();
160
+ trySendRequests ();
161
+ }
74
162
}
75
163
76
164
@ Override
0 commit comments