@@ -207,18 +207,20 @@ func formatByteRecord(schemaID int, valueBytes []byte) []byte {
207207 return recordValue
208208}
209209
210- func assertValueSerialized (t * testing.T , act []byte , valJSON []byte , schema * srclient.Schema ) {
211- require .NotEqual (t , act , valJSON )
210+ func assertValueSerialized (t * testing.T , act []byte , expJSON []byte , schema * srclient.Schema ) {
211+ require .NotEqual (t , expJSON , act )
212212
213213 actSchemaID := int (binary .BigEndian .Uint32 (act [1 :5 ]))
214214 codec , _ := goavro .NewCodecForStandardJSONFull (schema .Schema ())
215215 native , _ , _ := codec .NativeFromBinary (act [5 :])
216216 actJSON , _ := codec .TextualFromNative (nil , native )
217217 var actMap map [string ]any
218218 json .Unmarshal (actJSON , & actMap )
219+ var expMap map [string ]any
220+ json .Unmarshal (expJSON , & expMap )
219221
220222 require .Equal (t , schema .ID (), actSchemaID )
221- require .Equal (t , testValue1 , actMap )
223+ require .Equal (t , expMap , actMap )
222224}
223225
224226func TestSerializeValueCachingDisabled (t * testing.T ) {
@@ -340,6 +342,78 @@ func TestSerializeValueCachingEnabled(t *testing.T) {
340342 assertValueSerialized (t , act , valJSON , schema )
341343 require .NoError (t , err )
342344 })
345+
346+ t .Run ("serialize with complex avro schema" , func (t * testing.T ) {
347+ testSchemaOcr := `{
348+ "type": "record",
349+ "name": "ocr_requested",
350+ "namespace": "foo.cmd.image_processing",
351+ "fields": [
352+ {
353+ "name": "id",
354+ "type": "string",
355+ "doc": "Idempotency key"
356+ },
357+ {
358+ "name": "document_metadata",
359+ "type": {
360+ "type": "record",
361+ "name": "DocumentMetadata",
362+ "fields": [
363+ {
364+ "name": "content_type",
365+ "type": "string"
366+ },
367+ {
368+ "name": "original_filename",
369+ "type": ["null", "string"],
370+ "default": null
371+ },
372+ {
373+ "name": "source",
374+ "type": {
375+ "type": "enum",
376+ "name": "DocumentSource",
377+ "symbols": [
378+ "Unknown",
379+ "Import",
380+ "PatientUpload",
381+ "UserUpload",
382+ "UserUploadFax"
383+ ]
384+ }
385+ },
386+ {
387+ "name": "type",
388+ "type": {
389+ "type": "enum",
390+ "name": "DocumentType",
391+ "symbols": ["Unknown", "InsuranceCard", "MiscReport"]
392+ }
393+ }
394+ ]
395+ }
396+ },
397+ {
398+ "name": "s3_path",
399+ "type": {
400+ "type": "record",
401+ "name": "S3Path",
402+ "fields": [
403+ { "name": "bucket", "type": "string" },
404+ { "name": "key", "type": "string" }
405+ ]
406+ }
407+ }
408+ ]
409+ }`
410+ schemaOcr , _ := registry .CreateSchema ("my-ocr-topic-value" , testSchemaOcr , srclient .Avro )
411+ valueOcr := map [string ]any {"id" : "123" , "document_metadata" : map [string ]any {"content_type" : "application/pdf" , "original_filename" : nil , "source" : "UserUpload" , "type" : "InsuranceCard" }, "s3_path" : map [string ]any {"bucket" : "test-bucket" , "key" : "test-key" }}
412+ valJSONOcr , _ := json .Marshal (valueOcr )
413+ act , err := k .SerializeValue ("my-ocr-topic" , valJSONOcr , map [string ]string {"valueSchemaType" : "Avro" })
414+ assertValueSerialized (t , act , valJSONOcr , schemaOcr )
415+ require .NoError (t , err )
416+ })
343417}
344418
345419func TestLatestSchemaCaching (t * testing.T ) {
0 commit comments