2525import org .apache .kafka .clients .admin .NewTopic ;
2626import org .apache .kafka .common .errors .TopicExistsException ;
2727
28- import java .nio .charset .StandardCharsets ;
28+ import com .automq .events .RebalanceSummaryEvent ;
29+ import com .automq .events .RequestErrorEvent ;
30+
2931import java .util .Collections ;
3032import java .util .List ;
3133import java .util .Map ;
@@ -67,21 +69,30 @@ public static void main(String[] args) throws Exception {
6769
6870 // Step 2: publish test events via ClusterEventPublisher
6971 try (ClusterEventPublisher publisher = new ClusterEventPublisher (bootstrapServers , Map .of ())) {
72+ RebalanceSummaryEvent rebalanceEvent = RebalanceSummaryEvent .newBuilder ()
73+ .setRebalanceId ("test-001" )
74+ .setTriggerReason ("load_imbalance" )
75+ .setPartitionCount (5 )
76+ .build ();
7077 publisher .publishEvent (
7178 "com.automq.ops.rebalance.summary" ,
7279 "/automq/broker/0" ,
7380 "rebalance-test-001" ,
74- "com.automq.events.RebalanceSummaryEvent" ,
75- "{\" rebalance_id\" :\" test-001\" ,\" trigger_reason\" :\" load_imbalance\" ,\" partition_count\" :5}"
76- .getBytes (StandardCharsets .UTF_8 ));
77-
81+ RebalanceSummaryEvent .getDescriptor ().getFullName (),
82+ rebalanceEvent .toByteArray ());
83+
84+ RequestErrorEvent requestErrorEvent = RequestErrorEvent .newBuilder ()
85+ .setApiKey (0 )
86+ .setErrorCode (29 )
87+ .setResource ("test-topic" )
88+ .setRps (42.0 )
89+ .build ();
7890 publisher .publishEvent (
7991 "com.automq.risk.request_error" ,
8092 "/automq/broker/0" ,
8193 "PRODUCE:test-topic" ,
82- "com.automq.events.RequestErrorEvent" ,
83- "{\" api_key\" :0,\" error_code\" :29,\" resource\" :\" test-topic\" ,\" rps\" :42.0}"
84- .getBytes (StandardCharsets .UTF_8 ));
94+ RequestErrorEvent .getDescriptor ().getFullName (),
95+ requestErrorEvent .toByteArray ());
8596
8697 System .out .println ("✓ Published 2 test events" );
8798 }
@@ -97,6 +108,7 @@ public static void main(String[] args) throws Exception {
97108 for (CloudEvent e : events ) {
98109 System .out .printf (" type=%-45s source=%-25s subject=%s%n" ,
99110 e .getType (), e .getSource (), e .getSubject ());
111+ printEventData (e );
100112 }
101113
102114 List <CloudEvent > rebalanceEvents = admin .describeClusterEvents (
@@ -118,4 +130,21 @@ public static void main(String[] args) throws Exception {
118130 System .out .println ("\n ✓ All assertions passed" );
119131 }
120132 }
133+
134+ private static void printEventData (CloudEvent event ) throws Exception {
135+ io .cloudevents .CloudEventData data = event .getData ();
136+ if (data == null ) return ;
137+ byte [] bytes = data .toBytes ();
138+ if (bytes == null ) return ;
139+ String schema = event .getDataSchema () != null ? event .getDataSchema ().toString () : "" ;
140+ if (schema .endsWith ("RebalanceSummaryEvent" )) {
141+ RebalanceSummaryEvent msg = RebalanceSummaryEvent .parseFrom (bytes );
142+ System .out .printf (" rebalance_id=%s trigger_reason=%s partition_count=%d%n" ,
143+ msg .getRebalanceId (), msg .getTriggerReason (), msg .getPartitionCount ());
144+ } else if (schema .endsWith ("RequestErrorEvent" )) {
145+ RequestErrorEvent msg = RequestErrorEvent .parseFrom (bytes );
146+ System .out .printf (" api_key=%d error_code=%d resource=%s rps=%.1f%n" ,
147+ msg .getApiKey (), msg .getErrorCode (), msg .getResource (), msg .getRps ());
148+ }
149+ }
121150}
0 commit comments