Skip to content

Commit 4fcfa7f

Browse files
nzkeithpassuied
andauthored
Fix regression in pubsub.kafka Avro message publication (dapr#4026)
Signed-off-by: Patrick Assuied <[email protected]> Signed-off-by: Keith Smith <[email protected]> Co-authored-by: Patrick Assuied <[email protected]>
1 parent d08a61a commit 4fcfa7f

File tree

6 files changed

+84
-7
lines changed

6 files changed

+84
-7
lines changed

common/component/kafka/kafka.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
226226
if meta.SchemaRegistryURL != "" {
227227
k.logger.Infof("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL)
228228
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
229+
k.srClient.CodecCreationEnabled(true)
229230
k.srClient.CodecJsonEnabled(!meta.UseAvroJSON)
230231
// Empty password is a possibility
231232
if meta.SchemaRegistryAPIKey != "" {

common/component/kafka/kafka_test.go

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,18 +207,20 @@ func formatByteRecord(schemaID int, valueBytes []byte) []byte {
207207
return recordValue
208208
}
209209

210-
func assertValueSerialized(t *testing.T, act []byte, valJSON []byte, schema *srclient.Schema) {
211-
require.NotEqual(t, act, valJSON)
210+
func assertValueSerialized(t *testing.T, act []byte, expJSON []byte, schema *srclient.Schema) {
211+
require.NotEqual(t, expJSON, act)
212212

213213
actSchemaID := int(binary.BigEndian.Uint32(act[1:5]))
214214
codec, _ := goavro.NewCodecForStandardJSONFull(schema.Schema())
215215
native, _, _ := codec.NativeFromBinary(act[5:])
216216
actJSON, _ := codec.TextualFromNative(nil, native)
217217
var actMap map[string]any
218218
json.Unmarshal(actJSON, &actMap)
219+
var expMap map[string]any
220+
json.Unmarshal(expJSON, &expMap)
219221

220222
require.Equal(t, schema.ID(), actSchemaID)
221-
require.Equal(t, testValue1, actMap)
223+
require.Equal(t, expMap, actMap)
222224
}
223225

224226
func TestSerializeValueCachingDisabled(t *testing.T) {
@@ -340,6 +342,78 @@ func TestSerializeValueCachingEnabled(t *testing.T) {
340342
assertValueSerialized(t, act, valJSON, schema)
341343
require.NoError(t, err)
342344
})
345+
346+
t.Run("serialize with complex avro schema", func(t *testing.T) {
347+
testSchemaOcr := `{
348+
"type": "record",
349+
"name": "ocr_requested",
350+
"namespace": "foo.cmd.image_processing",
351+
"fields": [
352+
{
353+
"name": "id",
354+
"type": "string",
355+
"doc": "Idempotency key"
356+
},
357+
{
358+
"name": "document_metadata",
359+
"type": {
360+
"type": "record",
361+
"name": "DocumentMetadata",
362+
"fields": [
363+
{
364+
"name": "content_type",
365+
"type": "string"
366+
},
367+
{
368+
"name": "original_filename",
369+
"type": ["null", "string"],
370+
"default": null
371+
},
372+
{
373+
"name": "source",
374+
"type": {
375+
"type": "enum",
376+
"name": "DocumentSource",
377+
"symbols": [
378+
"Unknown",
379+
"Import",
380+
"PatientUpload",
381+
"UserUpload",
382+
"UserUploadFax"
383+
]
384+
}
385+
},
386+
{
387+
"name": "type",
388+
"type": {
389+
"type": "enum",
390+
"name": "DocumentType",
391+
"symbols": ["Unknown", "InsuranceCard", "MiscReport"]
392+
}
393+
}
394+
]
395+
}
396+
},
397+
{
398+
"name": "s3_path",
399+
"type": {
400+
"type": "record",
401+
"name": "S3Path",
402+
"fields": [
403+
{ "name": "bucket", "type": "string" },
404+
{ "name": "key", "type": "string" }
405+
]
406+
}
407+
}
408+
]
409+
}`
410+
schemaOcr, _ := registry.CreateSchema("my-ocr-topic-value", testSchemaOcr, srclient.Avro)
411+
valueOcr := map[string]any{"id": "123", "document_metadata": map[string]any{"content_type": "application/pdf", "original_filename": nil, "source": "UserUpload", "type": "InsuranceCard"}, "s3_path": map[string]any{"bucket": "test-bucket", "key": "test-key"}}
412+
valJSONOcr, _ := json.Marshal(valueOcr)
413+
act, err := k.SerializeValue("my-ocr-topic", valJSONOcr, map[string]string{"valueSchemaType": "Avro"})
414+
assertValueSerialized(t, act, valJSONOcr, schemaOcr)
415+
require.NoError(t, err)
416+
})
343417
}
344418

345419
func TestLatestSchemaCaching(t *testing.T) {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ require (
9292
github.com/labd/commercetools-go-sdk v1.3.1
9393
github.com/lestrrat-go/httprc v1.0.5
9494
github.com/lestrrat-go/jwx/v2 v2.0.21
95-
github.com/linkedin/goavro/v2 v2.13.1
95+
github.com/linkedin/goavro/v2 v2.14.0
9696
github.com/machinebox/graphql v0.2.2
9797
github.com/matoous/go-nanoid/v2 v2.0.0
9898
github.com/microsoft/go-mssqldb v1.6.0

go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1186,8 +1186,9 @@ github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod
11861186
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
11871187
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
11881188
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
1189-
github.com/linkedin/goavro/v2 v2.13.1 h1:4qZ5M0QzQFDRqccsroJlgOJznqAS/TpdvXg55h429+I=
11901189
github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
1190+
github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI=
1191+
github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
11911192
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
11921193
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
11931194
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

tests/certification/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ require (
226226
github.com/lestrrat-go/httprc v1.0.5 // indirect
227227
github.com/lestrrat-go/iter v1.0.2 // indirect
228228
github.com/lestrrat-go/option v1.0.1 // indirect
229-
github.com/linkedin/goavro/v2 v2.13.1 // indirect
229+
github.com/linkedin/goavro/v2 v2.14.0 // indirect
230230
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
231231
github.com/magiconair/properties v1.8.7 // indirect
232232
github.com/mailru/easyjson v0.7.7 // indirect

tests/certification/go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1009,8 +1009,9 @@ github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod
10091009
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
10101010
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
10111011
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
1012-
github.com/linkedin/goavro/v2 v2.13.1 h1:4qZ5M0QzQFDRqccsroJlgOJznqAS/TpdvXg55h429+I=
10131012
github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
1013+
github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI=
1014+
github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
10141015
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
10151016
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
10161017
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

0 commit comments

Comments
 (0)