18
18
import org .elasticsearch .cluster .block .ClusterBlockException ;
19
19
import org .elasticsearch .cluster .block .ClusterBlockLevel ;
20
20
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
21
+ import org .elasticsearch .cluster .node .DiscoveryNode ;
21
22
import org .elasticsearch .cluster .routing .GroupShardsIterator ;
22
23
import org .elasticsearch .cluster .routing .ShardIterator ;
23
24
import org .elasticsearch .cluster .routing .ShardRouting ;
24
25
import org .elasticsearch .cluster .service .ClusterService ;
25
26
import org .elasticsearch .common .inject .Inject ;
26
27
import org .elasticsearch .common .io .stream .StreamInput ;
28
+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
27
29
import org .elasticsearch .index .engine .Engine ;
28
30
import org .elasticsearch .index .shard .IndexShard ;
29
31
import org .elasticsearch .index .shard .ShardId ;
36
38
import java .io .IOException ;
37
39
import java .util .ArrayList ;
38
40
import java .util .HashMap ;
41
+ import java .util .Iterator ;
42
+ import java .util .LinkedList ;
39
43
import java .util .List ;
40
44
import java .util .Map ;
45
+ import java .util .Queue ;
46
+ import java .util .concurrent .atomic .AtomicInteger ;
41
47
import java .util .concurrent .atomic .AtomicReferenceArray ;
42
48
43
49
public class TransportAnalyzeIndexDiskUsageAction extends TransportBroadcastAction <
@@ -46,6 +52,7 @@ public class TransportAnalyzeIndexDiskUsageAction extends TransportBroadcastActi
46
52
AnalyzeDiskUsageShardRequest ,
47
53
AnalyzeDiskUsageShardResponse > {
48
54
private final IndicesService indicesService ;
55
+ private final ThreadPool threadPool ;
49
56
50
57
@ Inject
51
58
public TransportAnalyzeIndexDiskUsageAction (
@@ -66,11 +73,96 @@ public TransportAnalyzeIndexDiskUsageAction(
66
73
ThreadPool .Names .ANALYZE
67
74
);
68
75
this .indicesService = indexServices ;
76
+ this .threadPool = transportService .getThreadPool ();
69
77
}
70
78
71
79
@ Override
72
80
protected void doExecute (Task task , AnalyzeIndexDiskUsageRequest request , ActionListener <AnalyzeIndexDiskUsageResponse > listener ) {
73
- super .doExecute (task , request , listener );
81
+ new LimitingRequestPerNodeBroadcastAction (task , request , listener , 5 ).start ();
82
+ }
83
+
84
+ private static class ShardRequest {
85
+ private final DiscoveryNode node ;
86
+ private final AnalyzeDiskUsageShardRequest shardRequest ;
87
+ private final ActionListener <AnalyzeDiskUsageShardResponse > handler ;
88
+
89
+ ShardRequest (DiscoveryNode node , AnalyzeDiskUsageShardRequest shardRequest , ActionListener <AnalyzeDiskUsageShardResponse > handler ) {
90
+ this .node = node ;
91
+ this .shardRequest = shardRequest ;
92
+ this .handler = handler ;
93
+ }
94
+ }
95
+
96
+ final class LimitingRequestPerNodeBroadcastAction extends AsyncBroadcastAction {
97
+ private final Queue <ShardRequest > queue = new LinkedList <>();
98
+ private final Map <DiscoveryNode , AtomicInteger > sendingCounters = ConcurrentCollections .newConcurrentMap ();
99
+ private final int maxConcurrentRequestsPerNode ;
100
+
101
+ LimitingRequestPerNodeBroadcastAction (
102
+ Task task ,
103
+ AnalyzeIndexDiskUsageRequest request ,
104
+ ActionListener <AnalyzeIndexDiskUsageResponse > listener ,
105
+ int maxConcurrentRequestsPerNode
106
+ ) {
107
+ super (task , request , listener );
108
+ this .maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode ;
109
+ }
110
+
111
+ private void trySendRequests () {
112
+ assert Thread .holdsLock (this ) == false ;
113
+ final List <ShardRequest > readyRequests = new ArrayList <>();
114
+ synchronized (this ) {
115
+ final Iterator <ShardRequest > it = queue .iterator ();
116
+ while (it .hasNext ()) {
117
+ final ShardRequest r = it .next ();
118
+ final AtomicInteger sending = sendingCounters .computeIfAbsent (r .node , k -> new AtomicInteger ());
119
+ assert 0 <= sending .get () && sending .get () <= maxConcurrentRequestsPerNode : sending ;
120
+ if (sending .get () < maxConcurrentRequestsPerNode ) {
121
+ sending .incrementAndGet ();
122
+ readyRequests .add (r );
123
+ it .remove ();
124
+ }
125
+ }
126
+ }
127
+ if (readyRequests .isEmpty ()) {
128
+ return ;
129
+ }
130
+ final Thread sendingThread = Thread .currentThread ();
131
+ for (ShardRequest r : readyRequests ) {
132
+ super .sendShardRequest (
133
+ r .node ,
134
+ r .shardRequest ,
135
+ ActionListener .runAfter (r .handler , () -> onRequestResponded (sendingThread , r .node ))
136
+ );
137
+ }
138
+ }
139
+
140
+ private void onRequestResponded (Thread sendingThread , DiscoveryNode node ) {
141
+ final AtomicInteger sending = sendingCounters .get (node );
142
+ assert sending != null && 1 <= sending .get () && sending .get () <= maxConcurrentRequestsPerNode : sending ;
143
+ sending .decrementAndGet ();
144
+ // fork to avoid StackOverflow
145
+ if (sendingThread == Thread .currentThread ()) {
146
+ threadPool .generic ().execute (this ::trySendRequests );
147
+ } else {
148
+ trySendRequests ();
149
+ }
150
+ }
151
+
152
+ @ Override
153
+ protected synchronized void sendShardRequest (
154
+ DiscoveryNode node ,
155
+ AnalyzeDiskUsageShardRequest shardRequest ,
156
+ ActionListener <AnalyzeDiskUsageShardResponse > listener
157
+ ) {
158
+ queue .add (new ShardRequest (node , shardRequest , listener ));
159
+ }
160
+
161
+ @ Override
162
+ public void start () {
163
+ super .start ();
164
+ trySendRequests ();
165
+ }
74
166
}
75
167
76
168
@ Override
0 commit comments