Skip to content

Commit c23e006

Browse files
authored
Merge branch 'main' into dapr-state-store-clickhouse
2 parents fa0f160 + de92aa7 commit c23e006

File tree

11 files changed

+209
-85
lines changed

11 files changed

+209
-85
lines changed

bindings/kafka/metadata.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,13 @@ metadata:
352352
It allows sending headers with special characters that are usually not allowed in HTTP headers.
353353
example: "true"
354354
default: "false"
355+
- name: useAvroJSON
356+
type: bool
357+
required: false
358+
description: |
359+
Enables Avro JSON schema for serialization. Only applicable when the subscription uses valueSchemaType=Avro
360+
example: "true"
361+
default: "false"
355362
- name: compression
356363
type: string
357364
required: false

common/component/kafka/kafka.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,10 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
223223
}
224224
k.consumeRetryEnabled = meta.ConsumeRetryEnabled
225225
k.consumeRetryInterval = meta.ConsumeRetryInterval
226-
227226
if meta.SchemaRegistryURL != "" {
228227
k.logger.Infof("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL)
229228
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
229+
k.srClient.CodecJsonEnabled(!meta.UseAvroJSON)
230230
// Empty password is a possibility
231231
if meta.SchemaRegistryAPIKey != "" {
232232
k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
@@ -364,12 +364,7 @@ func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config Subscri
364364
if err != nil {
365365
return nil, err
366366
}
367-
// The data coming through is standard JSON. The version currently supported by srclient doesn't support this yet
368-
// Use this specific codec instead.
369-
codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
370-
if err != nil {
371-
return nil, err
372-
}
367+
codec := schema.Codec() // The value returned in Avro JSON format
373368
native, _, err := codec.NativeFromBinary(message.Value[5:])
374369
if err != nil {
375370
return nil, err
@@ -405,12 +400,8 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec,
405400
if errSchema != nil {
406401
return nil, nil, errSchema
407402
}
408-
// New JSON standard serialization/Deserialization is not integrated in srclient yet.
409-
// Since standard json is passed from dapr, it is needed.
410-
codec, errCodec := goavro.NewCodecForStandardJSONFull(schema.Schema())
411-
if errCodec != nil {
412-
return nil, nil, errCodec
413-
}
403+
codec := schema.Codec()
404+
414405
k.latestSchemaCacheWriteLock.Lock()
415406
k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, codec: codec, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)}
416407
k.latestSchemaCacheWriteLock.Unlock()
@@ -420,12 +411,7 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec,
420411
if err != nil {
421412
return nil, nil, err
422413
}
423-
codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
424-
if err != nil {
425-
return nil, nil, err
426-
}
427-
428-
return schema, codec, nil
414+
return schema, schema.Codec(), nil
429415
}
430416

431417
func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) {

common/component/kafka/kafka_test.go

Lines changed: 117 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/binary"
55
"encoding/json"
66
"errors"
7+
"math/big"
78
"testing"
89
"time"
910

@@ -51,57 +52,93 @@ func TestGetValueSchemaType(t *testing.T) {
5152
}
5253

5354
var (
54-
testSchema1 = `{"type": "record", "name": "cupcake", "fields": [{"name": "flavor", "type": "string"}, {"name": "created_date", "type": ["null",{"type": "long","logicalType": "timestamp-millis"}],"default": null}]}`
55-
testValue1 = map[string]interface{}{"flavor": "chocolate", "created_date": float64(time.Now().UnixMilli())}
56-
invValue = map[string]string{"xxx": "chocolate"}
55+
now = time.Now()
56+
testSchema1 = `{
57+
"type": "record",
58+
"name": "cupcake",
59+
"fields": [
60+
{"name": "flavor", "type": "string"},
61+
{"name": "weight", "type": {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 2}},
62+
{"name": "created_date", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}
63+
]
64+
}`
65+
testValue1 = map[string]any{"flavor": "chocolate", "weight": "100.24", "created_date": float64(now.UnixMilli())}
66+
testValue1AvroJSON = map[string]any{"flavor": "chocolate", "weight": big.NewRat(10024, 100), "created_date": goavro.Union("long.timestamp-millis", float64(now.UnixMilli()))}
67+
invValue = map[string]string{"xxx": "chocolate"}
5768
)
5869

5970
func TestDeserializeValue(t *testing.T) {
60-
registry := srclient.CreateMockSchemaRegistryClient("http://localhost:8081")
61-
schema, _ := registry.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
6271
handlerConfig := SubscriptionHandlerConfig{
6372
IsBulkSubscribe: false,
6473
ValueSchemaType: Avro,
6574
}
66-
k := Kafka{
67-
srClient: registry,
75+
76+
registryJSON := srclient.CreateMockSchemaRegistryClient("http://localhost:8081")
77+
kJSON := Kafka{
78+
srClient: registryJSON,
6879
schemaCachingEnabled: true,
6980
logger: logger.NewLogger("kafka_test"),
7081
}
82+
kJSON.srClient.CodecJsonEnabled(true)
83+
schemaJSON, _ := registryJSON.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
7184

72-
schemaIDBytes := make([]byte, 4)
73-
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID())) //nolint:gosec
74-
75-
valJSON, _ := json.Marshal(testValue1)
85+
// set up for Standard JSON
7686
codec, _ := goavro.NewCodecForStandardJSONFull(testSchema1)
87+
valJSON, _ := json.Marshal(testValue1)
7788
native, _, _ := codec.NativeFromTextual(valJSON)
78-
valueBytes, _ := codec.BinaryFromNative(nil, native)
89+
valueBytesFromJSON, _ := codec.BinaryFromNative(nil, native)
90+
recordValueFromJSON := formatByteRecord(schemaJSON.ID(), valueBytesFromJSON)
7991

80-
var recordValue []byte
81-
recordValue = append(recordValue, byte(0))
82-
recordValue = append(recordValue, schemaIDBytes...)
83-
recordValue = append(recordValue, valueBytes...)
92+
// setup for Avro JSON
93+
registryAvroJSON := srclient.CreateMockSchemaRegistryClient("http://localhost:8081")
94+
kAvroJSON := Kafka{
95+
srClient: registryAvroJSON,
96+
schemaCachingEnabled: true,
97+
logger: logger.NewLogger("kafka_test"),
98+
}
99+
kAvroJSON.srClient.CodecJsonEnabled(false)
100+
schemaAvroJSON, _ := registryAvroJSON.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
101+
102+
codecAvroJSON, _ := goavro.NewCodec(testSchema1)
103+
valueBytesFromAvroJSON, _ := codecAvroJSON.BinaryFromNative(nil, testValue1AvroJSON)
104+
valueAvroJSON, _ := codecAvroJSON.TextualFromNative(nil, testValue1AvroJSON)
105+
recordValueFromAvroJSON := formatByteRecord(schemaAvroJSON.ID(), valueBytesFromAvroJSON)
84106

85107
t.Run("Schema found, return value", func(t *testing.T) {
86108
msg := sarama.ConsumerMessage{
87109
Key: []byte("my_key"),
88-
Value: recordValue,
110+
Value: recordValueFromJSON,
89111
Topic: "my-topic",
90112
}
91-
act, err := k.DeserializeValue(&msg, handlerConfig)
113+
act, err := kJSON.DeserializeValue(&msg, handlerConfig)
92114
var actMap map[string]any
93115
json.Unmarshal(act, &actMap)
94116
require.Equal(t, testValue1, actMap)
95117
require.NoError(t, err)
96118
})
97119

120+
t.Run("Schema found and useAvroJson, return value as Avro Json", func(t *testing.T) {
121+
msg := sarama.ConsumerMessage{
122+
Key: []byte("my_key"),
123+
Value: recordValueFromAvroJSON,
124+
Topic: "my-topic",
125+
}
126+
127+
actText, err := kAvroJSON.DeserializeValue(&msg, handlerConfig)
128+
// test if flaky if comparing the textual version. Convert to native for comparison
129+
exp, _, _ := codecAvroJSON.NativeFromTextual(valueAvroJSON)
130+
act, _, _ := codecAvroJSON.NativeFromTextual(actText)
131+
require.Equal(t, exp, act)
132+
require.NoError(t, err)
133+
})
134+
98135
t.Run("Data null, return as JSON null", func(t *testing.T) {
99136
msg := sarama.ConsumerMessage{
100137
Key: []byte("my_key"),
101138
Value: nil,
102139
Topic: "my-topic",
103140
}
104-
act, err := k.DeserializeValue(&msg, handlerConfig)
141+
act, err := kJSON.DeserializeValue(&msg, handlerConfig)
105142
require.Equal(t, []byte("null"), act)
106143
require.NoError(t, err)
107144
})
@@ -112,7 +149,7 @@ func TestDeserializeValue(t *testing.T) {
112149
Value: []byte("xxxx"),
113150
Topic: "my-topic",
114151
}
115-
_, err := k.DeserializeValue(&msg, handlerConfig)
152+
_, err := kJSON.DeserializeValue(&msg, handlerConfig)
116153

117154
require.Error(t, err)
118155
})
@@ -123,23 +160,23 @@ func TestDeserializeValue(t *testing.T) {
123160
Value: []byte("xxxxx"),
124161
Topic: "my-topic",
125162
}
126-
_, err := k.DeserializeValue(&msg, handlerConfig)
163+
_, err := kJSON.DeserializeValue(&msg, handlerConfig)
127164

128165
require.Error(t, err)
129166
})
130167

131168
t.Run("Invalid data, return error", func(t *testing.T) {
132169
var invalidVal []byte
133170
invalidVal = append(invalidVal, byte(0))
134-
invalidVal = append(invalidVal, schemaIDBytes...)
171+
invalidVal = append(invalidVal, recordValueFromJSON[0:5]...)
135172
invalidVal = append(invalidVal, []byte("xxx")...)
136173

137174
msg := sarama.ConsumerMessage{
138175
Key: []byte("my_key"),
139176
Value: invalidVal,
140177
Topic: "my-topic",
141178
}
142-
_, err := k.DeserializeValue(&msg, handlerConfig)
179+
_, err := kJSON.DeserializeValue(&msg, handlerConfig)
143180

144181
require.Error(t, err)
145182
})
@@ -151,14 +188,25 @@ func TestDeserializeValue(t *testing.T) {
151188
}
152189
msg := sarama.ConsumerMessage{
153190
Key: []byte("my_key"),
154-
Value: recordValue,
191+
Value: recordValueFromJSON,
155192
Topic: "my-topic",
156193
}
157194
_, err := kInv.DeserializeValue(&msg, handlerConfig)
158195
require.Error(t, err, "schema registry details not set")
159196
})
160197
}
161198

199+
func formatByteRecord(schemaID int, valueBytes []byte) []byte {
200+
schemaIDBytes := make([]byte, 4)
201+
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schemaID)) //nolint:gosec
202+
203+
var recordValue []byte
204+
recordValue = append(recordValue, byte(0))
205+
recordValue = append(recordValue, schemaIDBytes...)
206+
recordValue = append(recordValue, valueBytes...)
207+
return recordValue
208+
}
209+
162210
func assertValueSerialized(t *testing.T, act []byte, valJSON []byte, schema *srclient.Schema) {
163211
require.NotEqual(t, act, valJSON)
164212

@@ -174,81 +222,101 @@ func assertValueSerialized(t *testing.T, act []byte, valJSON []byte, schema *src
174222
}
175223

176224
func TestSerializeValueCachingDisabled(t *testing.T) {
177-
registry := srclient.CreateMockSchemaRegistryClient("http://localhost:8081")
178-
schema, _ := registry.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
225+
registryJSON := srclient.CreateMockSchemaRegistryClient("http://localhost:8081")
226+
registryJSON.CodecJsonEnabled(true)
227+
schemaJSON, _ := registryJSON.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
179228

180-
k := Kafka{
181-
srClient: registry,
229+
schemaIDBytes := make([]byte, 4)
230+
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schemaJSON.ID())) //nolint:gosec
231+
232+
// set up for Avro JSON
233+
registryAvroJSON := srclient.CreateMockSchemaRegistryClient("http://localhost:8081")
234+
registryAvroJSON.CodecJsonEnabled(false)
235+
codecAvroJSON, _ := goavro.NewCodec(testSchema1)
236+
schemaAvroJSON, _ := registryAvroJSON.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
237+
schemaAvroJSONIDBytes := make([]byte, 4)
238+
binary.BigEndian.PutUint32(schemaAvroJSONIDBytes, uint32(schemaAvroJSON.ID())) //nolint:gosec
239+
valueBytesFromAvroJSON, _ := codecAvroJSON.BinaryFromNative(nil, testValue1AvroJSON)
240+
valueAvroJSON, _ := codecAvroJSON.TextualFromNative(nil, testValue1AvroJSON)
241+
var recordValueFromAvroJSON []byte
242+
recordValueFromAvroJSON = append(recordValueFromAvroJSON, byte(0))
243+
recordValueFromAvroJSON = append(recordValueFromAvroJSON, schemaAvroJSONIDBytes...)
244+
recordValueFromAvroJSON = append(recordValueFromAvroJSON, valueBytesFromAvroJSON...)
245+
246+
kJSON := Kafka{
247+
srClient: registryJSON,
248+
schemaCachingEnabled: false,
249+
logger: logger.NewLogger("kafka_test"),
250+
}
251+
252+
kAvroJSON := Kafka{
253+
srClient: registryAvroJSON,
182254
schemaCachingEnabled: false,
183255
logger: logger.NewLogger("kafka_test"),
184256
}
185257

186258
t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {
187259
valJSON, _ := json.Marshal(testValue1)
188-
189-
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{})
190-
260+
act, err := kJSON.SerializeValue("my-topic", valJSON, map[string]string{})
191261
require.JSONEq(t, string(valJSON), string(act))
192262
require.NoError(t, err)
193263
})
194264

195265
t.Run("valueSchemaType set to None, leave value as is", func(t *testing.T) {
196266
valJSON, _ := json.Marshal(testValue1)
197-
198-
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "None"})
199-
267+
act, err := kJSON.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "None"})
200268
require.JSONEq(t, string(valJSON), string(act))
201269
require.NoError(t, err)
202270
})
203271

204272
t.Run("valueSchemaType set to None, leave value as is", func(t *testing.T) {
205273
valJSON, _ := json.Marshal(testValue1)
206-
207-
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "NONE"})
208-
274+
act, err := kJSON.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "NONE"})
209275
require.JSONEq(t, string(valJSON), string(act))
210276
require.NoError(t, err)
211277
})
212278

213279
t.Run("valueSchemaType invalid, return error", func(t *testing.T) {
214280
valJSON, _ := json.Marshal(testValue1)
215-
216-
_, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "xx"})
217-
281+
_, err := kJSON.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "xx"})
218282
require.Error(t, err, "error parsing schema type. 'xx' is not a supported value")
219283
})
220284

221285
t.Run("schema found, serialize value as Avro binary", func(t *testing.T) {
222286
valJSON, _ := json.Marshal(testValue1)
223-
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
224-
assertValueSerialized(t, act, valJSON, schema)
287+
act, err := kJSON.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
288+
assertValueSerialized(t, act, valJSON, schemaJSON)
225289
require.NoError(t, err)
226290
})
227291

228-
t.Run("value published 'null', no error", func(t *testing.T) {
229-
act, err := k.SerializeValue("my-topic", []byte("null"), map[string]string{"valueSchemaType": "Avro"})
292+
t.Run("schema found, useAvroJson=true, serialize value as Avro binary", func(t *testing.T) {
293+
act, err := kAvroJSON.SerializeValue("my-topic", valueAvroJSON, map[string]string{"valueSchemaType": "Avro"})
294+
require.Equal(t, act[:5], recordValueFromAvroJSON[:5])
295+
require.NoError(t, err)
296+
})
230297

298+
t.Run("value published 'null', no error", func(t *testing.T) {
299+
act, err := kJSON.SerializeValue("my-topic", []byte("null"), map[string]string{"valueSchemaType": "Avro"})
231300
require.Nil(t, act)
232301
require.NoError(t, err)
233302
})
234303

235304
t.Run("value published nil, no error", func(t *testing.T) {
236-
act, err := k.SerializeValue("my-topic", nil, map[string]string{"valueSchemaType": "Avro"})
237-
305+
act, err := kJSON.SerializeValue("my-topic", nil, map[string]string{"valueSchemaType": "Avro"})
238306
require.Nil(t, act)
239307
require.NoError(t, err)
240308
})
241309

242310
t.Run("invalid data, return error", func(t *testing.T) {
243311
valJSON, _ := json.Marshal(invValue)
244-
_, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
245-
312+
_, err := kJSON.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
246313
require.Error(t, err, "cannot decode textual record \"cupcake\": cannot decode textual map: cannot determine codec: \"xxx\"")
247314
})
248315
}
249316

250317
func TestSerializeValueCachingEnabled(t *testing.T) {
251318
registry := srclient.CreateMockSchemaRegistryClient("http://localhost:8081")
319+
registry.CodecJsonEnabled(true)
252320
schema, _ := registry.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
253321

254322
k := Kafka{
@@ -278,6 +346,7 @@ func TestLatestSchemaCaching(t *testing.T) {
278346
ctrl := gomock.NewController(t)
279347
defer ctrl.Finish()
280348
registry := srclient.CreateMockSchemaRegistryClient("http://locahost:8081")
349+
registry.CodecJsonEnabled(true)
281350
m := mock_srclient.NewMockISchemaRegistryClient(ctrl)
282351
schema, _ := registry.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
283352

@@ -339,9 +408,7 @@ func TestLatestSchemaCaching(t *testing.T) {
339408
}
340409

341410
m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2)
342-
343411
valJSON, _ := json.Marshal(testValue1)
344-
345412
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
346413

347414
assertValueSerialized(t, act, valJSON, schema)

0 commit comments

Comments
 (0)