Skip to content

Commit 227c097

Browse files
committed
Actively add pod info, don't run queries.
1 parent ce4d0bf commit 227c097

File tree

8 files changed

+287
-41
lines changed

8 files changed

+287
-41
lines changed

org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/Constants.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.jdrupes.vmoperator.common;
2020

21-
// TODO: Auto-generated Javadoc
2221
/**
2322
* Some constants.
2423
*/

org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmExtraData.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ public String nodeName() {
7575
return nodeName;
7676
}
7777

78+
/**
79+
* Gets the node addresses.
80+
*
81+
* @return the nodeAddresses
82+
*/
83+
public List<String> nodeAddresses() {
84+
return nodeAddresses;
85+
}
86+
7887
/**
7988
* Sets the reset count.
8089
*
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* VM-Operator
3+
* Copyright (C) 2023 Michael N. Lipp
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as
7+
* published by the Free Software Foundation, either version 3 of the
8+
* License, or (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*/
18+
19+
package org.jdrupes.vmoperator.manager.events;
20+
21+
import io.kubernetes.client.openapi.models.V1Pod;
22+
import org.jdrupes.vmoperator.common.K8sObserver;
23+
import org.jgrapes.core.Channel;
24+
import org.jgrapes.core.Components;
25+
import org.jgrapes.core.Event;
26+
27+
/**
28+
* Indicates a change in a pod that runs a VM.
29+
*/
30+
public class PodChanged extends Event<Void> {
31+
32+
private final V1Pod pod;
33+
private final K8sObserver.ResponseType type;
34+
35+
/**
36+
* Instantiates a new VM changed event.
37+
*
38+
* @param pod the pod
39+
* @param type the type
40+
*/
41+
public PodChanged(V1Pod pod, K8sObserver.ResponseType type) {
42+
this.pod = pod;
43+
this.type = type;
44+
}
45+
46+
/**
47+
* Gets the pod.
48+
*
49+
* @return the pod
50+
*/
51+
public V1Pod pod() {
52+
return pod;
53+
}
54+
55+
/**
56+
* Returns the type.
57+
*
58+
* @return the type
59+
*/
60+
public K8sObserver.ResponseType type() {
61+
return type;
62+
}
63+
64+
@Override
65+
public String toString() {
66+
StringBuilder builder = new StringBuilder();
67+
builder.append(Components.objectName(this)).append(" [")
68+
.append(pod.getMetadata().getName()).append(' ').append(type);
69+
if (channels() != null) {
70+
builder.append(", channels=").append(Channel.toString(channels()));
71+
}
72+
builder.append(']');
73+
return builder.toString();
74+
}
75+
}

org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/VmChannel.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.jdrupes.vmoperator.common.K8sClient;
2222
import org.jdrupes.vmoperator.common.VmDefinition;
2323
import org.jgrapes.core.Channel;
24+
import org.jgrapes.core.Event;
2425
import org.jgrapes.core.EventPipeline;
2526
import org.jgrapes.core.Subchannel.DefaultSubchannel;
2627

@@ -104,6 +105,19 @@ public EventPipeline pipeline() {
104105
return pipeline;
105106
}
106107

108+
/**
109+
* Fire the given event on this channel, using the associated
110+
* {@link #pipeline()}.
111+
*
112+
* @param <T> the generic type
113+
* @param event the event
114+
* @return the t
115+
*/
116+
public <T extends Event<?>> T fire(T event) {
117+
pipeline.fire(event, this);
118+
return event;
119+
}
120+
107121
/**
108122
* Returns the API client.
109123
*
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/logging.properties

org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public Controller(Channel componentChannel) {
113113
// attach(new ServiceMonitor(channel()).channelManager(chanMgr));
114114
attach(new Reconciler(channel()));
115115
attach(new PoolMonitor(channel()));
116+
attach(new PodMonitor(channel(), chanMgr));
116117
}
117118

118119
/**
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* VM-Operator
3+
* Copyright (C) 2025 Michael N. Lipp
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as
7+
* published by the Free Software Foundation, either version 3 of the
8+
* License, or (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*/
18+
19+
package org.jdrupes.vmoperator.manager;
20+
21+
import io.kubernetes.client.openapi.ApiException;
22+
import io.kubernetes.client.openapi.models.V1Pod;
23+
import io.kubernetes.client.openapi.models.V1PodList;
24+
import io.kubernetes.client.util.Watch.Response;
25+
import io.kubernetes.client.util.generic.options.ListOptions;
26+
import java.io.IOException;
27+
import java.time.Duration;
28+
import java.time.Instant;
29+
import java.util.Map;
30+
import java.util.Optional;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.logging.Level;
33+
import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
34+
import static org.jdrupes.vmoperator.common.Constants.VM_OP_NAME;
35+
import org.jdrupes.vmoperator.common.K8sClient;
36+
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
37+
import org.jdrupes.vmoperator.common.K8sV1PodStub;
38+
import org.jdrupes.vmoperator.manager.events.ChannelDictionary;
39+
import org.jdrupes.vmoperator.manager.events.PodChanged;
40+
import org.jdrupes.vmoperator.manager.events.VmChannel;
41+
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
42+
import org.jgrapes.core.Channel;
43+
import org.jgrapes.core.annotation.Handler;
44+
45+
/**
46+
* Watches for changes of pods that run VMs.
47+
*/
48+
public class PodMonitor extends AbstractMonitor<V1Pod, V1PodList, VmChannel> {
49+
50+
private final ChannelDictionary<String, VmChannel, ?> channelDictionary;
51+
52+
private final Map<String, PendingChange> pendingChanges
53+
= new ConcurrentHashMap<>();
54+
55+
/**
56+
* Instantiates a new pod monitor.
57+
*
58+
* @param componentChannel the component channel
59+
* @param channelDictionary the channel dictionary
60+
*/
61+
public PodMonitor(Channel componentChannel,
62+
ChannelDictionary<String, VmChannel, ?> channelDictionary) {
63+
super(componentChannel, V1Pod.class, V1PodList.class);
64+
this.channelDictionary = channelDictionary;
65+
context(K8sV1PodStub.CONTEXT);
66+
ListOptions options = new ListOptions();
67+
options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + ","
68+
+ "app.kubernetes.io/component=" + APP_NAME + ","
69+
+ "app.kubernetes.io/managed-by=" + VM_OP_NAME);
70+
options(options);
71+
}
72+
73+
@Override
74+
protected void prepareMonitoring() throws IOException, ApiException {
75+
client(new K8sClient());
76+
}
77+
78+
@Override
79+
protected void handleChange(K8sClient client, Response<V1Pod> change) {
80+
String vmName = change.object.getMetadata().getLabels()
81+
.get("app.kubernetes.io/instance");
82+
if (vmName == null) {
83+
return;
84+
}
85+
var channel = channelDictionary.channel(vmName).orElse(null);
86+
var responseType = ResponseType.valueOf(change.type);
87+
if (channel != null && channel.vmDefinition() != null) {
88+
pendingChanges.remove(vmName);
89+
channel.fire(new PodChanged(change.object, responseType));
90+
return;
91+
}
92+
93+
// VM definition not available yet, may happen during startup
94+
if (responseType == ResponseType.DELETED) {
95+
return;
96+
}
97+
purgePendingChanges();
98+
logger.finer(() -> "Add pending pod change for " + vmName);
99+
pendingChanges.put(vmName, new PendingChange(Instant.now(), change));
100+
}
101+
102+
private void purgePendingChanges() {
103+
Instant tooOld = Instant.now().minus(Duration.ofMinutes(15));
104+
for (var itr = pendingChanges.entrySet().iterator(); itr.hasNext();) {
105+
var change = itr.next();
106+
if (change.getValue().from().isBefore(tooOld)) {
107+
itr.remove();
108+
logger.finer(
109+
() -> "Cleaned pending pod change for " + change.getKey());
110+
}
111+
}
112+
}
113+
114+
/**
115+
* Check for pending changes.
116+
*
117+
* @param event the event
118+
* @param channel the channel
119+
*/
120+
@Handler
121+
public void onVmDefChanged(VmDefChanged event, VmChannel channel) {
122+
Optional.ofNullable(pendingChanges.remove(event.vmDefinition().name()))
123+
.map(PendingChange::change).ifPresent(change -> {
124+
logger.finer(() -> "Firing pending pod change for "
125+
+ event.vmDefinition().name());
126+
channel.fire(new PodChanged(change.object,
127+
ResponseType.valueOf(change.type)));
128+
if (logger.isLoggable(Level.FINER)
129+
&& pendingChanges.isEmpty()) {
130+
logger.finer("No pending pod changes left.");
131+
}
132+
});
133+
}
134+
135+
private record PendingChange(Instant from, Response<V1Pod> change) {
136+
}
137+
138+
}

0 commit comments

Comments
 (0)