55 "fmt"
66 "os"
77 "path/filepath"
8+ "strconv"
89 "strings"
910 "sync"
1011 "testing"
@@ -22,13 +23,35 @@ type kafkaMessage struct {
2223 CommonStructs []kafkaStruct `json:"commonStructs"`
2324}
2425
26+ // flexInt unmarshals a JSON value that may be either a number or a string
27+ // containing a number. Kafka sometimes uses "1" instead of 1 for tag fields.
28+ type flexInt int
29+
30+ func (f * flexInt ) UnmarshalJSON (data []byte ) error {
31+ var n int
32+ if err := json .Unmarshal (data , & n ); err == nil {
33+ * f = flexInt (n )
34+ return nil
35+ }
36+ var s string
37+ if err := json .Unmarshal (data , & s ); err != nil {
38+ return fmt .Errorf ("cannot unmarshal %s into int or string" , string (data ))
39+ }
40+ n , err := strconv .Atoi (s )
41+ if err != nil {
42+ return fmt .Errorf ("cannot parse %q as int: %v" , s , err )
43+ }
44+ * f = flexInt (n )
45+ return nil
46+ }
47+
2548type kafkaField struct {
2649 Name string `json:"name"`
2750 Type string `json:"type"`
2851 Versions string `json:"versions"`
2952 NullableVersions string `json:"nullableVersions"`
3053 TaggedVersions string `json:"taggedVersions"`
31- Tag * int `json:"tag"`
54+ Tag * flexInt `json:"tag"`
3255 Default any `json:"default"`
3356 Fields []kafkaField `json:"fields"`
3457}
@@ -134,7 +157,7 @@ func initDSL(t *testing.T) {
134157 const enumsFile = "enums"
135158
136159 path := filepath .Join (dir , enumsFile )
137- f , err := os .ReadFile (path )
160+ f , err := os .ReadFile (path ) //nolint:gosec // reading known definitions directory
138161 if err != nil {
139162 t .Fatalf ("reading enums: %v" , err )
140163 }
@@ -149,7 +172,7 @@ func initDSL(t *testing.T) {
149172 continue
150173 }
151174 path := filepath .Join (dir , ent .Name ())
152- f , err := os .ReadFile (path )
175+ f , err := os .ReadFile (path ) //nolint:gosec // reading known definitions directory
153176 if err != nil {
154177 t .Fatalf ("reading %s: %v" , path , err )
155178 }
@@ -293,10 +316,7 @@ func resolvedJSONFields(f kafkaField, commons map[string]kafkaStruct) []kafkaFie
293316 return f .Fields
294317 }
295318 jt := f .Type
296- name := jt
297- if strings .HasPrefix (name , "[]" ) {
298- name = name [2 :]
299- }
319+ name := strings .TrimPrefix (jt , "[]" )
300320 if cs , ok := commons [name ]; ok {
301321 return cs .Fields
302322 }
@@ -310,7 +330,7 @@ func TestValidateDSLAgainstKafkaJSON(t *testing.T) {
310330 }
311331
312332 jsonDir := filepath .Join (kafkaDir , "clients" , "src" , "main" , "resources" , "common" , "message" )
313- if _ , err := os .Stat (jsonDir ); err != nil {
333+ if _ , err := os .Stat (jsonDir ); err != nil { //nolint:gosec // path from trusted env var
314334 t .Fatalf ("Kafka message dir not found at %s: %v" , jsonDir , err )
315335 }
316336
@@ -354,7 +374,7 @@ func TestValidateDSLAgainstKafkaJSON(t *testing.T) {
354374 if ! strings .HasSuffix (ent .Name (), ".json" ) {
355375 continue
356376 }
357- data , err := os .ReadFile (filepath .Join (jsonDir , ent .Name ()))
377+ data , err := os .ReadFile (filepath .Join (jsonDir , ent .Name ())) //nolint:gosec // reading from trusted KAFKA_DIR
358378 if err != nil {
359379 t .Fatalf ("reading %s: %v" , ent .Name (), err )
360380 }
@@ -477,7 +497,7 @@ func compareFieldsAtVersion(t *testing.T, msgName string, version, flexibleAt in
477497 if jf .Tag != nil {
478498 tvr := parseVersionRange (jf .TaggedVersions )
479499 if tvr .contains (version ) {
480- jsonTagged [* jf .Tag ] = jf
500+ jsonTagged [int ( * jf .Tag ) ] = jf
481501 continue
482502 }
483503 }
0 commit comments