@@ -15,7 +15,8 @@ const (
1515 hostKeyName = "host"
1616 portKeyName = "port"
1717
18- coordinatorKeyName = "coordinator"
18+ coordinatorKeyName = "coordinator"
19+ coordinatorsKeyName = "coordinators"
1920)
2021
2122var (
@@ -144,6 +145,16 @@ func createMetadataResponseSchemaVersions() []Schema {
144145 & SchemaTaggedFields {Name : "topic_metadata_tagged_fields" },
145146 )
146147
148+ topicMetadataSchema12 := NewSchema ("topic_metadata_schema12" ,
149+ & Mfield {Name : "error_code" , Ty : TypeInt16 },
150+ & Mfield {Name : "name" , Ty : TypeCompactNullableStr },
151+ & Mfield {Name : "topic_id" , Ty : TypeUuid },
152+ & Mfield {Name : "is_internal" , Ty : TypeBool },
153+ & CompactArray {Name : "partition_metadata" , Ty : partitionMetadataSchema9 },
154+ & Mfield {Name : "topic_authorized_operations" , Ty : TypeInt32 },
155+ & SchemaTaggedFields {Name : "topic_metadata_tagged_fields" },
156+ )
157+
147158 metadataResponseV1 := NewSchema ("metadata_response_v1" ,
148159 & Array {Name : brokersKeyName , Ty : metadataBrokerV1 },
149160 & Mfield {Name : "controller_id" , Ty : TypeInt32 },
@@ -222,7 +233,17 @@ func createMetadataResponseSchemaVersions() []Schema {
222233 & CompactArray {Name : "topic_metadata" , Ty : topicMetadataSchema10 },
223234 & SchemaTaggedFields {Name : "response_tagged_fields" },
224235 )
225- return []Schema {metadataResponseV0 , metadataResponseV1 , metadataResponseV2 , metadataResponseV3 , metadataResponseV4 , metadataResponseV5 , metadataResponseV6 , metadataResponseV7 , metadataResponseV8 , metadataResponseV9 , metadataResponseV10 , metadataResponseV11 }
236+
237+ metadataResponseV12 := NewSchema ("metadata_response_v12" ,
238+ & Mfield {Name : "throttle_time_ms" , Ty : TypeInt32 },
239+ & CompactArray {Name : brokersKeyName , Ty : metadataBrokerSchema9 },
240+ & Mfield {Name : "cluster_id" , Ty : TypeCompactNullableStr },
241+ & Mfield {Name : "controller_id" , Ty : TypeInt32 },
242+ & CompactArray {Name : "topic_metadata" , Ty : topicMetadataSchema12 },
243+ & SchemaTaggedFields {Name : "response_tagged_fields" },
244+ )
245+
246+ return []Schema {metadataResponseV0 , metadataResponseV1 , metadataResponseV2 , metadataResponseV3 , metadataResponseV4 , metadataResponseV5 , metadataResponseV6 , metadataResponseV7 , metadataResponseV8 , metadataResponseV9 , metadataResponseV10 , metadataResponseV11 , metadataResponseV12 }
226247}
227248
228249func createFindCoordinatorResponseSchemaVersions () []Schema {
@@ -260,7 +281,19 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
260281 & SchemaTaggedFields {Name : "response_tagged_fields" },
261282 )
262283
263- return []Schema {findCoordinatorResponseV0 , findCoordinatorResponseV1 , findCoordinatorResponseV2 , findCoordinatorResponseV3 }
284+ findCoordinatorCoordinatorsSchema4 := NewSchema ("find_coordinator_coordinators_schema4" ,
285+ & Mfield {Name : "key" , Ty : TypeCompactStr },
286+ & Mfield {Name : coordinatorKeyName , Ty : findCoordinatorBrokerSchema9 },
287+ & Mfield {Name : "error_code" , Ty : TypeInt16 },
288+ & Mfield {Name : "error_message" , Ty : TypeCompactNullableStr },
289+ & SchemaTaggedFields {"coordinators_tagged_fields" },
290+ )
291+ findCoordinatorResponseV4 := NewSchema ("find_coordinator_response_v4" ,
292+ & Mfield {Name : "throttle_time_ms" , Ty : TypeInt32 },
293+ & CompactArray {Name : coordinatorsKeyName , Ty : findCoordinatorCoordinatorsSchema4 },
294+ & SchemaTaggedFields {Name : "response_tagged_fields" },
295+ )
296+ return []Schema {findCoordinatorResponseV0 , findCoordinatorResponseV1 , findCoordinatorResponseV2 , findCoordinatorResponseV3 , findCoordinatorResponseV4 }
264297}
265298
266299func modifyMetadataResponse (decodedStruct * Struct , fn config.NetAddressMappingFunc ) error {
@@ -316,6 +349,25 @@ func modifyFindCoordinatorResponse(decodedStruct *Struct, fn config.NetAddressMa
316349 if fn == nil {
317350 return errors .New ("net address mapper must not be nil" )
318351 }
352+ coordinators := decodedStruct .Get (coordinatorsKeyName )
353+ if coordinators != nil {
354+ coordinatorsArray , ok := coordinators .([]interface {})
355+ if ! ok {
356+ return errors .New ("coordinators list not found" )
357+ }
358+ for _ , coordinatorElement := range coordinatorsArray {
359+ coordinatorStruct := coordinatorElement .(* Struct )
360+ if err := modifyCoordinator (coordinatorStruct , fn ); err != nil {
361+ return err
362+ }
363+ }
364+ return nil
365+ } else {
366+ return modifyCoordinator (decodedStruct , fn )
367+ }
368+ }
369+
370+ func modifyCoordinator (decodedStruct * Struct , fn config.NetAddressMappingFunc ) error {
319371 coordinator , ok := decodedStruct .Get (coordinatorKeyName ).(* Struct )
320372 if ! ok {
321373 return errors .New ("coordinator not found" )
0 commit comments