Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 9b10e2b

Browse files
authored
Merge pull request #21 from inloco/fix/error-check-after-unmarshal
fix(encode_decode): check if unmarshal was successful before setting map
2 parents b8a93dd + 140d449 commit 9b10e2b

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

src/kafka/encode_decode.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ import (
1010

1111
"sync"
1212

13-
"github.com/inloco/kafka-elasticsearch-injector/src/models"
14-
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
1513
"github.com/Shopify/sarama"
1614
"github.com/inloco/goavro"
15+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
16+
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
1717
)
1818

1919
// DecodeMessageFunc extracts a user-domain request object from an Kafka
@@ -93,12 +93,13 @@ func makeTimestamp(timestamp time.Time) int64 {
9393
func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) {
9494
var jsonValue map[string]interface{}
9595
err := json.Unmarshal(msg.Value, &jsonValue)
96-
jsonValue[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)
9796

9897
if err != nil {
9998
return nil, err
10099
}
101100

101+
jsonValue[kafkaTimestampKey] = makeTimestamp(msg.Timestamp)
102+
102103
return &models.Record{
103104
Topic: msg.Topic,
104105
Partition: msg.Partition,

src/kafka/encode_decode_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,17 @@ func TestDecoder_JsonMessageToRecord(t *testing.T) {
3535
assert.Nil(t, err)
3636
assert.Equal(t, val, returnedVal)
3737
}
38+
39+
func TestDecoder_JsonMessageToRecord_MalformedJson(t *testing.T) {
40+
d := &Decoder{CodecCache: sync.Map{}}
41+
jsonBytes := []byte(`{"alo": 60"`)
42+
record, err := d.JsonMessageToRecord(context.Background(), &sarama.ConsumerMessage{
43+
Value: jsonBytes,
44+
Topic: "test",
45+
Partition: 1,
46+
Offset: 54,
47+
Timestamp: time.Now(),
48+
})
49+
assert.Nil(t, record)
50+
assert.NotNil(t, err)
51+
}

0 commit comments

Comments
 (0)