Skip to content

Commit 1170bf8

Browse files
committed
Rolling concurrency
1 parent d3d7193 commit 1170bf8

File tree

2 files changed

+97
-75
lines changed

2 files changed

+97
-75
lines changed

operator/src/main/java/oracle/kubernetes/operator/PodWatcher.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
import io.kubernetes.client.models.V1ObjectMeta;
99
import io.kubernetes.client.models.V1Pod;
1010
import io.kubernetes.client.util.Watch;
11+
import java.util.ArrayList;
12+
import java.util.Collection;
13+
import java.util.HashMap;
1114
import java.util.List;
1215
import java.util.Map;
13-
import java.util.concurrent.ConcurrentHashMap;
14-
import java.util.concurrent.ConcurrentMap;
1516
import java.util.concurrent.ThreadFactory;
1617
import java.util.concurrent.atomic.AtomicBoolean;
1718
import oracle.kubernetes.operator.TuningParameters.WatchTuning;
@@ -39,8 +40,24 @@ public class PodWatcher extends Watcher<V1Pod>
3940
private final WatchListener<V1Pod> listener;
4041

4142
// Map of Pod name to OnReady
42-
private final ConcurrentMap<String, OnReady> readyCallbackRegistrations =
43-
new ConcurrentHashMap<>();
43+
private final Map<String, Collection<OnReady>> readyCallbackRegistrations = new HashMap<>();
44+
45+
private void registerOnReady(String podName, OnReady onReady) {
46+
synchronized (readyCallbackRegistrations) {
47+
Collection<OnReady> col = readyCallbackRegistrations.get(podName);
48+
if (col == null) {
49+
col = new ArrayList<>();
50+
readyCallbackRegistrations.put(podName, col);
51+
}
52+
col.add(onReady);
53+
}
54+
}
55+
56+
private Collection<OnReady> retrieveOnReady(String podName) {
57+
synchronized (readyCallbackRegistrations) {
58+
return readyCallbackRegistrations.remove(podName);
59+
}
60+
}
4461

4562
/**
4663
* Factory for PodWatcher.
@@ -96,9 +113,11 @@ public void receivedResponse(Watch.Response<V1Pod> item) {
96113
Boolean isReady = !PodHelper.isDeleting(pod) && PodHelper.isReady(pod);
97114
String podName = pod.getMetadata().getName();
98115
if (isReady) {
99-
OnReady ready = readyCallbackRegistrations.remove(podName);
100-
if (ready != null) {
101-
ready.onReady();
116+
Collection<OnReady> col = retrieveOnReady(podName);
117+
if (col != null) {
118+
for (OnReady ready : col) {
119+
ready.onReady();
120+
}
102121
}
103122
}
104123
break;
@@ -148,7 +167,7 @@ public NextAction apply(Packet packet) {
148167
fiber.resume(packet);
149168
}
150169
};
151-
readyCallbackRegistrations.put(metadata.getName(), ready);
170+
registerOnReady(metadata.getName(), ready);
152171

153172
// Timing window -- pod may have come ready before registration for callback
154173
CallBuilderFactory factory =

operator/src/main/java/oracle/kubernetes/operator/helpers/RollingHelper.java

Lines changed: 70 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -176,84 +176,87 @@ public String getDetail() {
176176

177177
@Override
178178
public NextAction apply(Packet packet) {
179-
if (it.hasNext()) {
180-
DomainPresenceInfo info = packet.getSPI(DomainPresenceInfo.class);
181-
WlsDomainConfig config = (WlsDomainConfig) packet.get(ProcessingConstants.DOMAIN_TOPOLOGY);
182-
183-
// Refresh as this is constantly changing
184-
Domain dom = info.getDomain();
185-
// These are presently Ready servers
186-
List<String> availableServers = getReadyServers(info);
187-
188-
List<String> servers = new ArrayList<>();
189-
List<String> readyServers = new ArrayList<>();
190-
List<V1Pod> notReadyServers = new ArrayList<>();
191-
192-
Collection<StepAndPacket> serversThatCanRestartNow = new ArrayList<>();
193-
194-
int countReady = 0;
195-
WlsClusterConfig cluster = config != null ? config.getClusterConfig(clusterName) : null;
196-
if (cluster != null) {
197-
List<WlsServerConfig> serversConfigs = cluster.getServerConfigs();
198-
if (serversConfigs != null) {
199-
for (WlsServerConfig s : serversConfigs) {
200-
// figure out how many servers are currently ready
201-
String name = s.getName();
202-
if (availableServers.contains(name)) {
203-
readyServers.add(s.getName());
204-
countReady++;
205-
} else {
206-
V1Pod pod = info.getServerPod(name);
207-
if (pod != null) {
208-
notReadyServers.add(pod);
179+
synchronized (it) {
180+
if (it.hasNext()) {
181+
DomainPresenceInfo info = packet.getSPI(DomainPresenceInfo.class);
182+
WlsDomainConfig config =
183+
(WlsDomainConfig) packet.get(ProcessingConstants.DOMAIN_TOPOLOGY);
184+
185+
// Refresh as this is constantly changing
186+
Domain dom = info.getDomain();
187+
// These are presently Ready servers
188+
List<String> availableServers = getReadyServers(info);
189+
190+
List<String> servers = new ArrayList<>();
191+
List<String> readyServers = new ArrayList<>();
192+
List<V1Pod> notReadyServers = new ArrayList<>();
193+
194+
Collection<StepAndPacket> serversThatCanRestartNow = new ArrayList<>();
195+
196+
int countReady = 0;
197+
WlsClusterConfig cluster = config != null ? config.getClusterConfig(clusterName) : null;
198+
if (cluster != null) {
199+
List<WlsServerConfig> serversConfigs = cluster.getServerConfigs();
200+
if (serversConfigs != null) {
201+
for (WlsServerConfig s : serversConfigs) {
202+
// figure out how many servers are currently ready
203+
String name = s.getName();
204+
if (availableServers.contains(name)) {
205+
readyServers.add(s.getName());
206+
countReady++;
207+
} else {
208+
V1Pod pod = info.getServerPod(name);
209+
if (pod != null) {
210+
notReadyServers.add(pod);
211+
}
209212
}
210213
}
211214
}
212215
}
213-
}
214216

215-
// then add as many as possible next() entries leaving at least minimum cluster
216-
// availability
217-
while (countReady-- > dom.getMinAvailable(clusterName)) {
218-
StepAndPacket current = it.next();
219-
WlsServerConfig serverConfig =
220-
(WlsServerConfig) current.packet.get(ProcessingConstants.SERVER_SCAN);
221-
String serverName = null;
222-
if (serverConfig != null) {
223-
serverName = serverConfig.getName();
224-
} else if (config != null) {
225-
serverName = config.getAdminServerName();
226-
}
227-
if (serverName != null) {
228-
servers.add(serverName);
229-
}
230-
serversThatCanRestartNow.add(current);
231-
if (!it.hasNext()) {
232-
break;
217+
// then add as many as possible next() entries leaving at least minimum cluster
218+
// availability
219+
while (countReady-- > dom.getMinAvailable(clusterName)) {
220+
StepAndPacket current = it.next();
221+
WlsServerConfig serverConfig =
222+
(WlsServerConfig) current.packet.get(ProcessingConstants.SERVER_SCAN);
223+
String serverName = null;
224+
if (serverConfig != null) {
225+
serverName = serverConfig.getName();
226+
} else if (config != null) {
227+
serverName = config.getAdminServerName();
228+
}
229+
if (serverName != null) {
230+
servers.add(serverName);
231+
}
232+
serversThatCanRestartNow.add(current);
233+
if (!it.hasNext()) {
234+
break;
235+
}
233236
}
234-
}
235237

236-
if (serversThatCanRestartNow.isEmpty()) {
237-
// Not enough servers are ready to let us restart a server now
238-
if (!notReadyServers.isEmpty()) {
239-
PodAwaiterStepFactory pw = PodHelper.getPodAwaiterStepFactory(packet);
240-
Collection<StepAndPacket> waitForUnreadyServers = new ArrayList<>();
241-
for (V1Pod pod : notReadyServers) {
242-
waitForUnreadyServers.add(
243-
new StepAndPacket(pw.waitForReady(pod, null), packet.clone()));
244-
}
238+
if (serversThatCanRestartNow.isEmpty()) {
239+
// Not enough servers are ready to let us restart a server now
240+
if (!notReadyServers.isEmpty()) {
241+
PodAwaiterStepFactory pw = PodHelper.getPodAwaiterStepFactory(packet);
242+
Collection<StepAndPacket> waitForUnreadyServers = new ArrayList<>();
243+
for (V1Pod pod : notReadyServers) {
244+
waitForUnreadyServers.add(
245+
new StepAndPacket(pw.waitForReady(pod, null), packet.clone()));
246+
}
245247

246-
// Wait for at least one of the not-yet-ready servers to become ready
247-
return doForkAtLeastOne(this, packet, waitForUnreadyServers);
248-
} else {
249-
throw new IllegalStateException();
248+
// Wait for at least one of the not-yet-ready servers to become ready
249+
return doForkAtLeastOne(this, packet, waitForUnreadyServers);
250+
} else {
251+
throw new IllegalStateException();
252+
}
250253
}
251-
}
252254

253-
readyServers.removeAll(servers);
254-
LOGGER.info(MessageKeys.ROLLING_SERVERS, dom.getDomainUID(), servers, readyServers);
255+
readyServers.removeAll(servers);
256+
LOGGER.info(MessageKeys.ROLLING_SERVERS, dom.getDomainUID(), servers, readyServers);
255257

256-
return doNext(new ServersThatCanRestartNowStep(serversThatCanRestartNow, this), packet);
258+
return doNext(new ServersThatCanRestartNowStep(serversThatCanRestartNow, this), packet);
259+
}
257260
}
258261

259262
return doNext(packet);

0 commit comments

Comments
 (0)