Skip to content

Commit c5fa467

Browse files
committed
NMS-19366: Add kafka-producer-topics command to show topics status
This command will show kafka connectivity and topics status
1 parent 1b88b0a commit c5fa467

File tree

2 files changed

+251
-0
lines changed

2 files changed

+251
-0
lines changed

docs/modules/reference/pages/karaf-shell/karaf-shell.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,9 @@ This list is not exhaustive and only covers the most commonly used commands.
379379
|opennms:kafka-push-topology-edges
380380
|Pushes all of the related topology edges to the configured topic
381381
|opennms-kafka-producer
382+
|opennms:kafka-producer-topics
383+
|Show status of all Kafka producer topics with connectivity check
384+
|opennms-kafka-producer
382385
|opennms:kafka-ipc-topics
383386
|Show status of all Kafka IPC topics (RPC, Sink, Twin)
384387
|opennms-core-ipc-kafka
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* Licensed to The OpenNMS Group, Inc (TOG) under one or more
3+
* contributor license agreements. See the LICENSE.md file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership.
6+
*
7+
* TOG licenses this file to You under the GNU Affero General
8+
* Public License Version 3 (the "License") or (at your option)
9+
* any later version. You may not use this file except in
10+
* compliance with the License. You may obtain a copy of the
11+
* License at:
12+
*
13+
* https://www.gnu.org/licenses/agpl-3.0.txt
14+
*
15+
* Unless required by applicable law or agreed to in writing,
16+
* software distributed under the License is distributed on an
17+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
18+
* either express or implied. See the License for the specific
19+
* language governing permissions and limitations under the
20+
* License.
21+
*/
22+
package org.opennms.features.kafka.producer.shell;
23+
24+
import java.io.IOException;
25+
import java.util.Dictionary;
26+
import java.util.HashMap;
27+
import java.util.LinkedHashMap;
28+
import java.util.Map;
29+
import java.util.Properties;
30+
import java.util.Set;
31+
32+
import org.apache.kafka.clients.admin.AdminClientConfig;
33+
import org.apache.karaf.shell.api.action.Action;
34+
import org.apache.karaf.shell.api.action.Command;
35+
import org.apache.karaf.shell.api.action.Option;
36+
import org.apache.karaf.shell.api.action.lifecycle.Reference;
37+
import org.apache.karaf.shell.api.action.lifecycle.Service;
38+
import org.opennms.core.ipc.common.kafka.Utils;
39+
import org.opennms.features.kafka.producer.KafkaProducerManager;
40+
import org.osgi.service.cm.Configuration;
41+
import org.osgi.service.cm.ConfigurationAdmin;
42+
43+
/**
44+
* Karaf shell command that displays the status of all Kafka producer topics.
45+
* Shows topics with their configuration PID and existence status.
46+
* Supports per-topic Kafka configurations (different Kafka clusters per topic).
47+
*/
48+
@Command(scope = "opennms", name = "kafka-producer-topics", description = "Show status of all Kafka producer topics.")
49+
@Service
50+
public class KafkaProducerTopics implements Action {
51+
52+
private static final int DEFAULT_TIMEOUT = 5000;
53+
54+
// Status indicators
55+
private static final String STATUS_OK = "[OK]";
56+
private static final String STATUS_MISSING = "[MISSING]";
57+
private static final String STATUS_NOT_CONFIGURED = "[NOT CONFIGURED]";
58+
59+
// Producer configuration PID
60+
private static final String PRODUCER_CONFIG_PID = "org.opennms.features.kafka.producer";
61+
62+
// Topic property names (from blueprint-kafka-producer.xml)
63+
private static final String EVENT_TOPIC_PROP = "eventTopic";
64+
private static final String ALARM_TOPIC_PROP = "alarmTopic";
65+
private static final String NODE_TOPIC_PROP = "nodeTopic";
66+
private static final String METRIC_TOPIC_PROP = "metricTopic";
67+
private static final String TOPOLOGY_VERTEX_TOPIC_PROP = "topologyVertexTopic";
68+
private static final String TOPOLOGY_EDGE_TOPIC_PROP = "topologyEdgeTopic";
69+
private static final String ALARM_FEEDBACK_TOPIC_PROP = "alarmFeedbackTopic";
70+
71+
// Default topic names
72+
private static final String DEFAULT_EVENT_TOPIC = "events";
73+
private static final String DEFAULT_ALARM_TOPIC = "alarms";
74+
private static final String DEFAULT_NODE_TOPIC = "nodes";
75+
private static final String DEFAULT_METRIC_TOPIC = "metrics";
76+
private static final String DEFAULT_TOPOLOGY_VERTEX_TOPIC = "vertices";
77+
private static final String DEFAULT_TOPOLOGY_EDGE_TOPIC = "edges";
78+
private static final String DEFAULT_ALARM_FEEDBACK_TOPIC = "alarmFeedback";
79+
80+
@Reference
81+
private ConfigurationAdmin configAdmin;
82+
83+
@Option(name = "-t", aliases = "--timeout", description = "Connection timeout for Kafka Server (ms)")
84+
private int timeout;
85+
86+
@Override
87+
public Object execute() throws Exception {
88+
System.out.println("\nKafka Producer Topics Status");
89+
System.out.println("============================\n");
90+
91+
// Get producer topic configuration (optional - uses defaults if not present)
92+
Properties producerConfig = getProducerConfig();
93+
94+
// Build map of topic property -> topic name (use defaults if config not present)
95+
Map<String, String> topicNames = new LinkedHashMap<>();
96+
topicNames.put(EVENT_TOPIC_PROP, getTopicName(producerConfig, EVENT_TOPIC_PROP, DEFAULT_EVENT_TOPIC));
97+
topicNames.put(ALARM_TOPIC_PROP, getTopicName(producerConfig, ALARM_TOPIC_PROP, DEFAULT_ALARM_TOPIC));
98+
topicNames.put(NODE_TOPIC_PROP, getTopicName(producerConfig, NODE_TOPIC_PROP, DEFAULT_NODE_TOPIC));
99+
topicNames.put(METRIC_TOPIC_PROP, getTopicName(producerConfig, METRIC_TOPIC_PROP, DEFAULT_METRIC_TOPIC));
100+
topicNames.put(TOPOLOGY_VERTEX_TOPIC_PROP, getTopicName(producerConfig, TOPOLOGY_VERTEX_TOPIC_PROP, DEFAULT_TOPOLOGY_VERTEX_TOPIC));
101+
topicNames.put(TOPOLOGY_EDGE_TOPIC_PROP, getTopicName(producerConfig, TOPOLOGY_EDGE_TOPIC_PROP, DEFAULT_TOPOLOGY_EDGE_TOPIC));
102+
topicNames.put(ALARM_FEEDBACK_TOPIC_PROP, getTopicName(producerConfig, ALARM_FEEDBACK_TOPIC_PROP, DEFAULT_ALARM_FEEDBACK_TOPIC));
103+
104+
// Map topic property to its Kafka client PID
105+
Map<String, String> topicToPid = new LinkedHashMap<>();
106+
topicToPid.put(EVENT_TOPIC_PROP, getEffectivePid(KafkaProducerManager.EVENTS_KAFKA_CLIENT_PID));
107+
topicToPid.put(ALARM_TOPIC_PROP, getEffectivePid(KafkaProducerManager.ALARMS_KAFKA_CLIENT_PID));
108+
topicToPid.put(NODE_TOPIC_PROP, getEffectivePid(KafkaProducerManager.NODES_KAFKA_CLIENT_PID));
109+
topicToPid.put(METRIC_TOPIC_PROP, getEffectivePid(KafkaProducerManager.METRICS_KAFKA_CLIENT_PID));
110+
topicToPid.put(TOPOLOGY_VERTEX_TOPIC_PROP, getEffectivePid(KafkaProducerManager.TOPOLOGY_KAFKA_CLIENT_PID));
111+
topicToPid.put(TOPOLOGY_EDGE_TOPIC_PROP, getEffectivePid(KafkaProducerManager.TOPOLOGY_KAFKA_CLIENT_PID));
112+
topicToPid.put(ALARM_FEEDBACK_TOPIC_PROP, getEffectivePid(KafkaProducerManager.ALARM_FEEDBACK_KAFKA_CLIENT_PID));
113+
114+
// Get unique PIDs and check connectivity for each
115+
Map<String, Set<String>> pidToTopics = new HashMap<>();
116+
Map<String, Boolean> pidConnectivity = new HashMap<>();
117+
118+
for (String pid : topicToPid.values()) {
119+
if (pid != null && !pidToTopics.containsKey(pid)) {
120+
Properties kafkaConfig = getKafkaClientConfig(pid);
121+
if (kafkaConfig != null && kafkaConfig.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) {
122+
try {
123+
Set<String> existingTopics = Utils.getTopics(kafkaConfig);
124+
pidToTopics.put(pid, existingTopics);
125+
pidConnectivity.put(pid, true);
126+
} catch (Exception e) {
127+
pidToTopics.put(pid, Set.of());
128+
pidConnectivity.put(pid, false);
129+
}
130+
}
131+
}
132+
}
133+
134+
// Print Kafka connectivity status
135+
System.out.println("Kafka Connectivity (per configuration):");
136+
if (pidConnectivity.isEmpty()) {
137+
System.out.printf(" %-60s %s%n", "No Kafka client configuration found", STATUS_MISSING);
138+
return null;
139+
}
140+
for (Map.Entry<String, Boolean> entry : pidConnectivity.entrySet()) {
141+
String pid = entry.getKey();
142+
boolean connected = entry.getValue();
143+
String displayPid = pid.equals(KafkaProducerManager.GLOBAL_KAFKA_CLIENT_PID) ? pid + " (global)" : pid;
144+
System.out.printf(" %-60s %s%n", displayPid, connected ? STATUS_OK : STATUS_MISSING);
145+
}
146+
System.out.println();
147+
148+
// Print producer topics status
149+
System.out.println("Producer Topics:");
150+
int existCount = 0;
151+
int totalCount = 0;
152+
153+
for (Map.Entry<String, String> entry : topicNames.entrySet()) {
154+
String topicProp = entry.getKey();
155+
String topicName = entry.getValue();
156+
String pid = topicToPid.get(topicProp);
157+
158+
Set<String> existingTopics = pid != null ? pidToTopics.getOrDefault(pid, Set.of()) : Set.of();
159+
boolean exists = existingTopics.contains(topicName);
160+
String status = exists ? STATUS_OK : STATUS_NOT_CONFIGURED;
161+
162+
System.out.printf(" %-50s %s%n", topicName, status);
163+
164+
if (exists) {
165+
existCount++;
166+
}
167+
totalCount++;
168+
}
169+
170+
// Summary
171+
System.out.println();
172+
System.out.printf("Summary: %d of %d topics exist%n", existCount, totalCount);
173+
174+
return null;
175+
}
176+
177+
private Properties getProducerConfig() {
178+
try {
179+
Configuration config = configAdmin.getConfiguration(PRODUCER_CONFIG_PID, null);
180+
if (config != null && config.getProperties() != null) {
181+
Properties props = new Properties();
182+
Dictionary<String, Object> dict = config.getProperties();
183+
var keys = dict.keys();
184+
while (keys.hasMoreElements()) {
185+
String key = keys.nextElement();
186+
Object value = dict.get(key);
187+
if (value != null) {
188+
props.setProperty(key, value.toString());
189+
}
190+
}
191+
return props;
192+
}
193+
} catch (IOException e) {
194+
// Configuration not found
195+
}
196+
return null;
197+
}
198+
199+
private Properties getKafkaClientConfig(String pid) {
200+
try {
201+
Configuration config = configAdmin.getConfiguration(pid, null);
202+
if (config != null && config.getProperties() != null) {
203+
Properties props = new Properties();
204+
Dictionary<String, Object> dict = config.getProperties();
205+
var keys = dict.keys();
206+
while (keys.hasMoreElements()) {
207+
String key = keys.nextElement();
208+
Object value = dict.get(key);
209+
if (value != null) {
210+
props.setProperty(key, value.toString());
211+
}
212+
}
213+
214+
// Set timeout
215+
int effectiveTimeout = timeout > 0 ? timeout : DEFAULT_TIMEOUT;
216+
props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(effectiveTimeout));
217+
218+
return props;
219+
}
220+
} catch (IOException e) {
221+
// Configuration not found
222+
}
223+
return null;
224+
}
225+
226+
private String getEffectivePid(String topicSpecificPid) {
227+
// Check if topic-specific config exists with bootstrap.servers
228+
Properties topicConfig = getKafkaClientConfig(topicSpecificPid);
229+
if (topicConfig != null && topicConfig.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) {
230+
return topicSpecificPid;
231+
}
232+
233+
// Fall back to global config
234+
Properties globalConfig = getKafkaClientConfig(KafkaProducerManager.GLOBAL_KAFKA_CLIENT_PID);
235+
if (globalConfig != null && globalConfig.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) {
236+
return KafkaProducerManager.GLOBAL_KAFKA_CLIENT_PID;
237+
}
238+
239+
return null;
240+
}
241+
242+
private String getTopicName(Properties config, String property, String defaultValue) {
243+
if (config == null) {
244+
return defaultValue;
245+
}
246+
return config.getProperty(property, defaultValue);
247+
}
248+
}

0 commit comments

Comments
 (0)