5
5
package oracle .kubernetes .operator .helpers ;
6
6
7
7
import io .kubernetes .client .models .V1Pod ;
8
- import java .util .ArrayList ;
9
- import java .util .Collection ;
10
- import java .util .HashMap ;
11
- import java .util .Iterator ;
12
- import java .util .List ;
13
- import java .util .Map ;
14
- import oracle .kubernetes .operator .PodAwaiterStepFactory ;
8
+ import java .util .*;
9
+ import java .util .concurrent .ConcurrentLinkedQueue ;
15
10
import oracle .kubernetes .operator .ProcessingConstants ;
16
11
import oracle .kubernetes .operator .logging .LoggingFacade ;
17
12
import oracle .kubernetes .operator .logging .LoggingFactory ;
@@ -74,7 +69,7 @@ public NextAction apply(Packet packet) {
74
69
List <String > availableServers = getReadyServers (info );
75
70
76
71
Collection <StepAndPacket > serversThatCanRestartNow = new ArrayList <>();
77
- Map <String , Collection <StepAndPacket >> clusteredRestarts = new HashMap <>();
72
+ Map <String , Queue <StepAndPacket >> clusteredRestarts = new HashMap <>();
78
73
79
74
List <String > servers = new ArrayList <>();
80
75
for (Map .Entry <String , StepAndPacket > entry : rolling .entrySet ()) {
@@ -96,9 +91,9 @@ public NextAction apply(Packet packet) {
96
91
}
97
92
98
93
// clustered server
99
- Collection <StepAndPacket > cr = clusteredRestarts .get (clusterName );
94
+ Queue <StepAndPacket > cr = clusteredRestarts .get (clusterName );
100
95
if (cr == null ) {
101
- cr = new ArrayList <>();
96
+ cr = new ConcurrentLinkedQueue <>();
102
97
clusteredRestarts .put (clusterName , cr );
103
98
}
104
99
cr .add (entry .getValue ());
@@ -116,7 +111,7 @@ public NextAction apply(Packet packet) {
116
111
}
117
112
118
113
if (!clusteredRestarts .isEmpty ()) {
119
- for (Map .Entry <String , Collection <StepAndPacket >> entry : clusteredRestarts .entrySet ()) {
114
+ for (Map .Entry <String , Queue <StepAndPacket >> entry : clusteredRestarts .entrySet ()) {
120
115
work .add (
121
116
new StepAndPacket (
122
117
new RollSpecificClusterStep (entry .getKey (), entry .getValue (), null ), packet ));
@@ -160,13 +155,13 @@ private static List<String> getReadyServers(DomainPresenceInfo info) {
160
155
161
156
private static class RollSpecificClusterStep extends Step {
162
157
private final String clusterName ;
163
- private final Iterator <StepAndPacket > it ;
158
+ private final Queue <StepAndPacket > servers ;
164
159
165
160
public RollSpecificClusterStep (
166
- String clusterName , Collection <StepAndPacket > clusteredServerRestarts , Step next ) {
161
+ String clusterName , Queue <StepAndPacket > clusteredServerRestarts , Step next ) {
167
162
super (next );
168
163
this .clusterName = clusterName ;
169
- it = clusteredServerRestarts . iterator () ;
164
+ servers = clusteredServerRestarts ;
170
165
}
171
166
172
167
@ Override
@@ -176,89 +171,58 @@ public String getDetail() {
176
171
177
172
@ Override
178
173
public NextAction apply (Packet packet ) {
179
- synchronized (it ) {
180
- if (it .hasNext ()) {
181
- DomainPresenceInfo info = packet .getSPI (DomainPresenceInfo .class );
182
- WlsDomainConfig config =
183
- (WlsDomainConfig ) packet .get (ProcessingConstants .DOMAIN_TOPOLOGY );
184
-
185
- // Refresh as this is constantly changing
186
- Domain dom = info .getDomain ();
187
- // These are presently Ready servers
188
- List <String > availableServers = getReadyServers (info );
189
-
190
- List <String > servers = new ArrayList <>();
191
- List <String > readyServers = new ArrayList <>();
192
- List <V1Pod > notReadyServers = new ArrayList <>();
193
-
194
- Collection <StepAndPacket > serversThatCanRestartNow = new ArrayList <>();
195
-
196
- int countReady = 0 ;
197
- WlsClusterConfig cluster = config != null ? config .getClusterConfig (clusterName ) : null ;
198
- if (cluster != null ) {
199
- List <WlsServerConfig > serversConfigs = cluster .getServerConfigs ();
200
- if (serversConfigs != null ) {
201
- for (WlsServerConfig s : serversConfigs ) {
202
- // figure out how many servers are currently ready
203
- String name = s .getName ();
204
- if (availableServers .contains (name )) {
205
- readyServers .add (s .getName ());
206
- countReady ++;
207
- } else {
208
- V1Pod pod = info .getServerPod (name );
209
- if (pod != null ) {
210
- notReadyServers .add (pod );
211
- }
212
- }
213
- }
214
- }
215
- }
174
+ DomainPresenceInfo info = packet .getSPI (DomainPresenceInfo .class );
175
+ WlsDomainConfig config = (WlsDomainConfig ) packet .get (ProcessingConstants .DOMAIN_TOPOLOGY );
216
176
217
- // then add as many as possible next() entries leaving at least minimum cluster
218
- // availability
219
- while (countReady -- > dom .getMinAvailable (clusterName )) {
220
- StepAndPacket current = it .next ();
221
- WlsServerConfig serverConfig =
222
- (WlsServerConfig ) current .packet .get (ProcessingConstants .SERVER_SCAN );
223
- String serverName = null ;
224
- if (serverConfig != null ) {
225
- serverName = serverConfig .getName ();
226
- } else if (config != null ) {
227
- serverName = config .getAdminServerName ();
228
- }
229
- if (serverName != null ) {
230
- servers .add (serverName );
231
- }
232
- serversThatCanRestartNow .add (current );
233
- if (!it .hasNext ()) {
234
- break ;
235
- }
236
- }
177
+ // Refresh as this is constantly changing
178
+ Domain dom = info .getDomain ();
179
+ // These are presently Ready servers
180
+ List <String > availableServers = getReadyServers (info );
237
181
238
- if (serversThatCanRestartNow .isEmpty ()) {
239
- // Not enough servers are ready to let us restart a server now
240
- if (!notReadyServers .isEmpty ()) {
241
- PodAwaiterStepFactory pw = PodHelper .getPodAwaiterStepFactory (packet );
242
- Collection <StepAndPacket > waitForUnreadyServers = new ArrayList <>();
243
- for (V1Pod pod : notReadyServers ) {
244
- waitForUnreadyServers .add (
245
- new StepAndPacket (pw .waitForReady (pod , null ), packet .clone ()));
246
- }
247
-
248
- // Wait for at least one of the not-yet-ready servers to become ready
249
- return doForkAtLeastOne (this , packet , waitForUnreadyServers );
250
- } else {
251
- throw new IllegalStateException ();
182
+ List <String > readyServers = new ArrayList <>();
183
+
184
+ int countReady = 0 ;
185
+ WlsClusterConfig cluster = config != null ? config .getClusterConfig (clusterName ) : null ;
186
+ if (cluster != null ) {
187
+ List <WlsServerConfig > serversConfigs = cluster .getServerConfigs ();
188
+ if (serversConfigs != null ) {
189
+ for (WlsServerConfig s : serversConfigs ) {
190
+ // figure out how many servers are currently ready
191
+ String name = s .getName ();
192
+ if (availableServers .contains (name )) {
193
+ readyServers .add (s .getName ());
194
+ countReady ++;
252
195
}
253
196
}
197
+ }
198
+ }
254
199
255
- readyServers .removeAll (servers );
256
- LOGGER .info (MessageKeys .ROLLING_SERVERS , dom .getDomainUID (), servers , readyServers );
200
+ LOGGER .info (MessageKeys .ROLLING_SERVERS , dom .getDomainUID (), servers , readyServers );
257
201
258
- return doNext (new ServersThatCanRestartNowStep (serversThatCanRestartNow , this ), packet );
259
- }
202
+ int countToRestartNow = Math .max (1 , countReady - dom .getMinAvailable (clusterName ));
203
+ Collection <StepAndPacket > restarts = new ArrayList <>();
204
+ for (int i = 0 ; i < countToRestartNow ; i ++) {
205
+ restarts .add (new StepAndPacket (new RestartOneClusteredServerStep (servers , null ), packet ));
260
206
}
207
+ return doForkJoin (getNext (), packet , restarts );
208
+ }
209
+ }
210
+
211
+ private static class RestartOneClusteredServerStep extends Step {
212
+ private final Queue <StepAndPacket > servers ;
261
213
214
+ public RestartOneClusteredServerStep (Queue <StepAndPacket > servers , Step next ) {
215
+ super (next );
216
+ this .servers = servers ;
217
+ }
218
+
219
+ @ Override
220
+ public NextAction apply (Packet packet ) {
221
+ StepAndPacket serverToRestart = servers .poll ();
222
+ if (serverToRestart != null ) {
223
+ Collection <StepAndPacket > col = Collections .singleton (serverToRestart );
224
+ return doForkJoin (this , packet , col );
225
+ }
262
226
return doNext (packet );
263
227
}
264
228
}
0 commit comments