diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 713cd1222d..9b5bab50f3 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -72,6 +72,9 @@ type Kafka struct { latestSchemaCacheWriteLock sync.RWMutex latestSchemaCacheReadLock sync.Mutex + // Whether to encode/decode Avro into Avro JSON or standard JSON + useAvroJSON bool + // used for background logic that cannot use the context passed to the Init function internalContext context.Context internalContextCancel func() @@ -234,6 +237,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL) k.srClient.CodecCreationEnabled(true) k.srClient.CodecJsonEnabled(!meta.UseAvroJSON) + k.useAvroJSON = meta.UseAvroJSON // Empty password is a possibility if meta.SchemaRegistryAPIKey != "" { k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret) @@ -411,18 +415,27 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, if errSchema != nil { return nil, nil, errSchema } - codec := schema.Codec() + codec, errCodec := k.getCodec(schema) + if errCodec != nil { + return nil, nil, errCodec + } + defer k.latestSchemaCacheWriteLock.Unlock() k.latestSchemaCacheWriteLock.Lock() k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, codec: codec, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)} - k.latestSchemaCacheWriteLock.Unlock() + return schema, codec, nil } schema, err := srClient.GetLatestSchema(getSchemaSubject(topic)) if err != nil { return nil, nil, err } - return schema, schema.Codec(), nil + codec, errCodec := k.getCodec(schema) + if errCodec != nil { + return nil, nil, errCodec + } + + return schema, codec, nil } func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) { @@ -433,6 +446,17 @@ func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) return k.srClient, nil } +func (k *Kafka) getCodec(schema *srclient.Schema) (*goavro.Codec, error) { + // The data coming through is either Avro JSON or standard JSON. + // Force creation of a new codec instance for serialization and deserialization to avoid state mutation issues. + // https://github.com/linkedin/goavro/issues/299 + // Once the bug is fixed, we can remove this and use the codec directly from schema.Codec() + if k.useAvroJSON { + return goavro.NewCodec(schema.Schema()) + } + return goavro.NewCodecForStandardJSONFull(schema.Schema()) +} + func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) { // Null Data is valid and a tombstone record. // It should be converted to NULL and not go through schema validation & encoding diff --git a/common/component/kafka/kafka_test.go b/common/component/kafka/kafka_test.go index e4f258195f..2647e91109 100644 --- a/common/component/kafka/kafka_test.go +++ b/common/component/kafka/kafka_test.go @@ -80,6 +80,7 @@ func TestDeserializeValue(t *testing.T) { logger: logger.NewLogger("kafka_test"), } kJSON.srClient.CodecJsonEnabled(true) + kJSON.useAvroJSON = false schemaJSON, _ := registryJSON.CreateSchema("my-topic-value", testSchema1, srclient.Avro) // set up for Standard JSON @@ -194,6 +195,41 @@ func TestDeserializeValue(t *testing.T) { _, err := kInv.DeserializeValue(&msg, handlerConfig) require.Error(t, err, "schema registry details not set") }) + + t.Run("verifying issue with union types due to codec state mutation is fixed", func(t *testing.T) { + // Arrange + testSchemaUnion := `["null", "long"]` + + // In happy path, codec is initialized and NativeFromBinary is called first, which sets the states of the codec + codecCard1, err := goavro.NewCodecForStandardJSONFull(testSchemaUnion) + require.NoError(t, err) + + datum1, _, err := codecCard1.NativeFromBinary([]byte{0x02, 0x06}) + require.NoError(t, err) + + // As expected, the datum is a long with value 3 + require.Equal(t, int64(3), datum1.(map[string]any)["long"]) + + // Reproducing the error when NativeFromTextual is called before NativeFromBinary, which changes the states of the codec + codecCard2, err := goavro.NewCodecForStandardJSONFull(testSchemaUnion) + require.NoError(t, err) + + // Trigger textual path that mutates states + codecCard2.NativeFromTextual([]byte("1")) + + // Binary for union index 1 (long) with value 3: 0x02 0x06 + datum, _, err := codecCard2.NativeFromBinary([]byte{0x02, 0x06}) + require.NoError(t, err) + + // Prior to bug fix, the datum would be returned as a {"null", 3} but should return '{"long":3}'! + require.Nil(t, datum.(map[string]any)["null"]) + require.Equal(t, int64(3), datum.(map[string]any)["long"]) + + // As a result, next call to TextualFromNative would fail with "Cannot encode textual union: cannot encode textual null: expected: Go nil; received: int64" + act, err := codecCard2.TextualFromNative(nil, datum) + require.NoError(t, err) + require.Equal(t, []byte("3"), act) + }) } func formatByteRecord(schemaID int, valueBytes []byte) []byte { @@ -249,12 +285,14 @@ func TestSerializeValueCachingDisabled(t *testing.T) { srClient: registryJSON, schemaCachingEnabled: false, logger: logger.NewLogger("kafka_test"), + useAvroJSON: false, } kAvroJSON := Kafka{ srClient: registryAvroJSON, schemaCachingEnabled: false, logger: logger.NewLogger("kafka_test"), + useAvroJSON: true, } t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) { @@ -327,6 +365,7 @@ func TestSerializeValueCachingEnabled(t *testing.T) { latestSchemaCache: make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: time.Minute * 5, logger: logger.NewLogger("kafka_test"), + useAvroJSON: false, } t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) { diff --git a/go.mod b/go.mod index 0d24d1cb13..8de9280b59 100644 --- a/go.mod +++ b/go.mod @@ -92,7 +92,7 @@ require ( github.com/labd/commercetools-go-sdk v1.3.1 github.com/lestrrat-go/httprc v1.0.5 github.com/lestrrat-go/jwx/v2 v2.0.21 - github.com/linkedin/goavro/v2 v2.14.0 + github.com/linkedin/goavro/v2 v2.14.1 github.com/machinebox/graphql v0.2.2 github.com/matoous/go-nanoid/v2 v2.0.0 github.com/microsoft/go-mssqldb v1.6.0 @@ -111,7 +111,7 @@ require ( github.com/puzpuzpuz/xsync/v3 v3.0.0 github.com/rabbitmq/amqp091-go v1.9.0 github.com/redis/go-redis/v9 v9.6.3 - github.com/riferrei/srclient v0.7.2 + github.com/riferrei/srclient v0.7.3 github.com/sendgrid/sendgrid-go v3.13.0+incompatible github.com/sijms/go-ora/v2 v2.8.22 github.com/spf13/cast v1.8.0 diff --git a/go.sum b/go.sum index 5a8bc06ef4..8bdb69c8b8 100644 --- a/go.sum +++ b/go.sum @@ -1187,8 +1187,8 @@ github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPp github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= -github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI= -github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= +github.com/linkedin/goavro/v2 v2.14.1 h1:/8VjDpd38PRsy02JS0jflAu7JZPfJcGTwqWgMkFS2iI= +github.com/linkedin/goavro/v2 v2.14.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= @@ -1517,8 +1517,8 @@ github.com/redis/go-redis/v9 v9.6.3/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= -github.com/riferrei/srclient v0.7.2 h1:Gc1juajxHs9L1LYy+W6Iy7RDVBZkgCdKl/dxb3/c2xE= -github.com/riferrei/srclient v0.7.2/go.mod h1:byIzLF4UNZzclmzQXXr++Oe1GEH/hNFahUOSTXc7uSc= +github.com/riferrei/srclient v0.7.3 h1:JRR6jgfINWUcYZhBRHEg/NAFv7giVmjkoouRbWbakgw= +github.com/riferrei/srclient v0.7.3/go.mod h1:byIzLF4UNZzclmzQXXr++Oe1GEH/hNFahUOSTXc7uSc= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/middleware/http/wasm/internal/e2e_test.go b/middleware/http/wasm/internal/e2e_test.go index a63a007b6e..b427b9420b 100644 --- a/middleware/http/wasm/internal/e2e_test.go +++ b/middleware/http/wasm/internal/e2e_test.go @@ -80,15 +80,23 @@ func Test_EndToEnd(t *testing.T) { // init (main) and the request[0-9] funcs to info level. // // Then, we expect to see stdout and stderr from both scopes - // at debug level. + // at debug level. Allow duplicates from multi-module pools by + // checking substrings instead of exact combined lines. for _, s := range []string{ `level=info msg="main ConsoleLog"`, `level=info msg="request[0] ConsoleLog"`, - `level=debug msg="wasm stdout: main Stdout\nrequest[0] Stdout\n"`, - `level=debug msg="wasm stderr: main Stderr\nrequest[0] Stderr\n"`, } { require.Contains(t, log.String(), s) } + + // stdout + require.Contains(t, log.String(), `level=debug msg="wasm stdout:`) + require.Contains(t, log.String(), "main Stdout") + require.Contains(t, log.String(), "request[0] Stdout") + // stderr + require.Contains(t, log.String(), `level=debug msg="wasm stderr:`) + require.Contains(t, log.String(), "main Stderr") + require.Contains(t, log.String(), "request[0] Stderr") }, }, { @@ -108,14 +116,20 @@ func Test_EndToEnd(t *testing.T) { for _, s := range []string{ `level=info msg="main ConsoleLog"`, `level=info msg="request[0] ConsoleLog"`, - `level=debug msg="wasm stdout: main Stdout\nrequest[0] Stdout\n"`, - `level=debug msg="wasm stderr: main Stderr\nrequest[0] Stderr\n"`, `level=info msg="request[1] ConsoleLog"`, - `level=debug msg="wasm stdout: request[1] Stdout\n"`, - `level=debug msg="wasm stderr: request[1] Stderr\n"`, } { require.Contains(t, log.String(), s) } + // Allow duplicates for main/request[0] stdout/stderr across modules. + require.Contains(t, log.String(), `level=debug msg="wasm stdout:`) + require.Contains(t, log.String(), "main Stdout") + require.Contains(t, log.String(), "request[0] Stdout") + require.Contains(t, log.String(), `level=debug msg="wasm stderr:`) + require.Contains(t, log.String(), "main Stderr") + require.Contains(t, log.String(), "request[0] Stderr") + // And ensure request[1] appears in stdout/stderr logs too. + require.Contains(t, log.String(), "request[1] Stdout") + require.Contains(t, log.String(), "request[1] Stderr") }, }, { diff --git a/tests/certification/go.mod b/tests/certification/go.mod index fce100b945..d5184a126a 100644 --- a/tests/certification/go.mod +++ b/tests/certification/go.mod @@ -33,7 +33,7 @@ require ( github.com/lestrrat-go/jwx/v2 v2.0.21 github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 github.com/rabbitmq/amqp091-go v1.9.0 - github.com/riferrei/srclient v0.7.2 + github.com/riferrei/srclient v0.7.3 github.com/stretchr/testify v1.10.0 github.com/tylertreat/comcast v1.0.1 go.mongodb.org/mongo-driver v1.14.0 @@ -228,7 +228,7 @@ require ( github.com/lestrrat-go/httprc v1.0.5 // indirect github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.1 // indirect - github.com/linkedin/goavro/v2 v2.14.0 // indirect + github.com/linkedin/goavro/v2 v2.14.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect diff --git a/tests/certification/go.sum b/tests/certification/go.sum index 2d37fa82ee..960848b860 100644 --- a/tests/certification/go.sum +++ b/tests/certification/go.sum @@ -1010,8 +1010,8 @@ github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPp github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= -github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI= -github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= +github.com/linkedin/goavro/v2 v2.14.1 h1:/8VjDpd38PRsy02JS0jflAu7JZPfJcGTwqWgMkFS2iI= +github.com/linkedin/goavro/v2 v2.14.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= @@ -1278,8 +1278,8 @@ github.com/redis/go-redis/v9 v9.6.3/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= -github.com/riferrei/srclient v0.7.2 h1:Gc1juajxHs9L1LYy+W6Iy7RDVBZkgCdKl/dxb3/c2xE= -github.com/riferrei/srclient v0.7.2/go.mod h1:byIzLF4UNZzclmzQXXr++Oe1GEH/hNFahUOSTXc7uSc= +github.com/riferrei/srclient v0.7.3 h1:JRR6jgfINWUcYZhBRHEg/NAFv7giVmjkoouRbWbakgw= +github.com/riferrei/srclient v0.7.3/go.mod h1:byIzLF4UNZzclmzQXXr++Oe1GEH/hNFahUOSTXc7uSc= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=