
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -68,12 +79,26 @@ public class KafkaEventPublisher extends ThreadPoolExecutor {
    private static final String TOPIC_HOST_EVENTS = "opencue.host.events";
    private static final String TOPIC_PROC_EVENTS = "opencue.proc.events";

+    // All topics managed by this publisher
+    private static final List<String> ALL_TOPICS = Arrays.asList(TOPIC_JOB_EVENTS,
+            TOPIC_LAYER_EVENTS, TOPIC_FRAME_EVENTS, TOPIC_HOST_EVENTS, TOPIC_PROC_EVENTS);
+
+    // Default topic configuration
+    private static final int DEFAULT_NUM_PARTITIONS = 3;
+    private static final short DEFAULT_REPLICATION_FACTOR = 1;
+    private static final String DEFAULT_RETENTION_MS = "604800000"; // 7 days
+    private static final String DEFAULT_CLEANUP_POLICY = "delete";
+    private static final String DEFAULT_SEGMENT_MS = "86400000"; // 1 day
+    private static final String DEFAULT_SEGMENT_BYTES = "1073741824"; // 1GB
+
    @Autowired
    private Environment env;

    private KafkaProducer<String, String> producer;
+    private AdminClient adminClient;
    private JsonFormat.Printer jsonPrinter;
    private String sourceCuebot;
+    private String bootstrapServers;
    private boolean enabled = false;

    public KafkaEventPublisher() {
@@ -96,20 +121,88 @@ public void initialize() {
            sourceCuebot = "unknown";
        }

+        bootstrapServers = env.getProperty("monitoring.kafka.bootstrap.servers", "localhost:9092");
+
        jsonPrinter =
                JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames();

+        // Initialize admin client and create topics before starting the producer
+        initializeAdminClient();
+        createTopics();
        initializeKafkaProducer();

        logger.info("Kafka event publishing initialized, source cuebot: {}", sourceCuebot);
    }

+    private void initializeAdminClient() {
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,
+                env.getProperty("monitoring.kafka.admin.timeout.ms", Integer.class, 30000));
+        adminClient = AdminClient.create(props);
+        logger.info("Kafka AdminClient initialized");
+    }
+
+    /**
+     * Creates all monitoring topics with proper configuration. Topics that already exist are
+     * skipped.
+     */
+    private void createTopics() {
+        int numPartitions = env.getProperty("monitoring.kafka.topic.partitions", Integer.class,
+                DEFAULT_NUM_PARTITIONS);
+        short replicationFactor = env.getProperty("monitoring.kafka.topic.replication.factor",
+                Short.class, DEFAULT_REPLICATION_FACTOR);
+        String retentionMs = env.getProperty("monitoring.kafka.topic.retention.ms",
+                DEFAULT_RETENTION_MS);
+        String cleanupPolicy = env.getProperty("monitoring.kafka.topic.cleanup.policy",
+                DEFAULT_CLEANUP_POLICY);
+        String segmentMs =
+                env.getProperty("monitoring.kafka.topic.segment.ms", DEFAULT_SEGMENT_MS);
+        String segmentBytes =
+                env.getProperty("monitoring.kafka.topic.segment.bytes", DEFAULT_SEGMENT_BYTES);
+
+        // Topic configuration
+        Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("retention.ms", retentionMs);
+        topicConfig.put("cleanup.policy", cleanupPolicy);
+        topicConfig.put("segment.ms", segmentMs);
+        topicConfig.put("segment.bytes", segmentBytes);
+
+        for (String topicName : ALL_TOPICS) {
+            createTopic(topicName, numPartitions, replicationFactor, topicConfig);
+        }
+    }
+
+    /**
+     * Creates a single topic with the specified configuration.
+     */
+    private void createTopic(String topicName, int numPartitions, short replicationFactor,
+            Map<String, String> config) {
+        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
+        newTopic.configs(config);
+
+        CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
+
+        try {
+            result.values().get(topicName).get();
+            logger.info("Topic '{}' created successfully with {} partitions, replication={}",
+                    topicName, numPartitions, replicationFactor);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TopicExistsException) {
+                logger.info("Topic '{}' already exists", topicName);
+            } else {
+                logger.error("Failed to create topic '{}': {}", topicName, e.getMessage());
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("Interrupted while creating topic '{}': {}", topicName, e.getMessage());
+        }
+    }
+
    private void initializeKafkaProducer() {
        Properties props = new Properties();

        // Kafka broker configuration
-        String bootstrapServers =
-                env.getProperty("monitoring.kafka.bootstrap.servers", "localhost:9092");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // Serialization
@@ -143,6 +236,9 @@ public void shutdown() {
            producer.flush();
            producer.close();
        }
+        if (adminClient != null) {
+            adminClient.close();
+        }
        shutdownNow();
        logger.info("Kafka event publisher shut down");
    }
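
For reference, a minimal sketch of the properties this change reads, with the defaults taken from the code above. The file location is an assumption (whichever properties file backs Cuebot's Spring Environment in your deployment); omit any key to fall back to the default shown:

    # Kafka connection and admin-client timeout
    monitoring.kafka.bootstrap.servers=localhost:9092
    monitoring.kafka.admin.timeout.ms=30000
    # Topic layout applied to all opencue.*.events topics at startup
    monitoring.kafka.topic.partitions=3
    monitoring.kafka.topic.replication.factor=1
    monitoring.kafka.topic.retention.ms=604800000
    monitoring.kafka.topic.cleanup.policy=delete
    monitoring.kafka.topic.segment.ms=86400000
    monitoring.kafka.topic.segment.bytes=1073741824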