Skip to content

Commit fa356ea

Browse files
authored
Merge pull request #592 from marle3003/develop
Develop
2 parents 7526d89 + ef96d19 commit fa356ea

File tree

6 files changed

+289
-199
lines changed

6 files changed

+289
-199
lines changed

media/contenttype_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ func TestParseContentType(t *testing.T) {
6868
require.False(t, ct.IsRange())
6969
},
7070
},
71+
{
72+
s: `application/xml;boundary="foo"`,
73+
validate: func(t *testing.T, ct ContentType) {
74+
require.Equal(t, "application", ct.Type)
75+
require.Equal(t, "xml", ct.Subtype)
76+
require.Equal(t, "foo", ct.Parameters["boundary"])
77+
require.False(t, ct.IsRange())
78+
},
79+
},
7180
}
7281

7382
for _, testcase := range testcases {

media/contentype.go

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package media
33
import (
44
"fmt"
55
log "github.com/sirupsen/logrus"
6+
"mime"
67
"reflect"
78
"strconv"
89
"strings"
@@ -22,32 +23,19 @@ type ContentType struct {
2223

2324
func ParseContentType(s string) ContentType {
2425
c := ContentType{raw: s, Parameters: make(map[string]string), Q: 1.0}
25-
a := strings.Split(s, ";")
26-
m := strings.Split(a[0], "/")
27-
c.Type = strings.ToLower(strings.TrimSpace(m[0]))
28-
if len(m) > 1 {
29-
c.Subtype = strings.ToLower(strings.TrimSpace(m[1]))
30-
}
31-
for _, p := range a[1:] {
32-
kv := strings.Split(p, "=")
33-
switch kv[0] {
34-
case "q":
35-
if len(kv) > 1 {
36-
var err error
37-
if c.Q, err = strconv.ParseFloat(kv[1], 64); err != nil {
38-
log.Debugf("invalid q parameter in %v", s)
39-
c.Q = 1.0
40-
}
41-
}
42-
default:
43-
if len(kv) > 1 {
44-
c.Parameters[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
45-
} else {
46-
c.Parameters[kv[0]] = ""
47-
}
26+
c.Type, c.Parameters, _ = mime.ParseMediaType(s)
27+
if q, ok := c.Parameters["q"]; ok {
28+
var err error
29+
if c.Q, err = strconv.ParseFloat(q, 64); err != nil {
30+
log.Debugf("invalid q parameter in %v", s)
31+
c.Q = 1.0
4832
}
4933
}
50-
34+
t := strings.Split(c.Type, "/")
35+
if len(t) > 1 {
36+
c.Type = strings.TrimSpace(t[0])
37+
c.Subtype = strings.TrimSpace(t[1])
38+
}
5139
return c
5240
}
5341

providers/asyncapi3/kafka/store/validation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (mv *messageValidator) Validate(record *kafka.Record) (*KafkaLog, error) {
170170
return r, err
171171
} else {
172172
b, _ := json.Marshal(k)
173-
r.Message.Value = string(b)
173+
r.Key.Value = string(b)
174174
}
175175
} else {
176176
r.Key.Binary = kafka.Read(record.Key)

providers/asyncapi3/kafka/store/validation_test.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ func TestValidation(t *testing.T) {
7676
asyncapi3test.WithKey(schematest.New("integer")),
7777
),
7878
asyncapi3test.WithKafkaChannelBinding(asyncapi3.TopicBindings{
79-
Partitions: 1,
8079
KeySchemaValidation: false,
80+
Partitions: 1,
8181
}),
8282
),
8383
),
@@ -92,6 +92,44 @@ func TestValidation(t *testing.T) {
9292
})
9393
require.NoError(t, err)
9494
require.Len(t, batch, 0)
95+
96+
e := events.GetEvents(events.NewTraits())
97+
require.Len(t, e, 1)
98+
require.Equal(t, []byte("foo"), e[0].Data.(*store.KafkaLog).Key.Binary)
99+
},
100+
},
101+
{
102+
name: "validating value and key",
103+
cfg: asyncapi3test.NewConfig(
104+
asyncapi3test.WithChannel("foo",
105+
asyncapi3test.WithMessage("foo",
106+
asyncapi3test.WithPayload(schematest.New("string")),
107+
asyncapi3test.WithKey(schematest.New("string")),
108+
),
109+
asyncapi3test.WithKafkaChannelBinding(asyncapi3.TopicBindings{
110+
KeySchemaValidation: true,
111+
ValueSchemaValidation: true,
112+
Partitions: 1,
113+
}),
114+
),
115+
),
116+
test: func(t *testing.T, s *store.Store) {
117+
p := s.Topic("foo").Partition(0)
118+
_, batch, err := p.Write(kafka.RecordBatch{
119+
Records: []*kafka.Record{
120+
{
121+
Key: kafka.NewBytes([]byte("12")),
122+
Value: kafka.NewBytes([]byte("foo")),
123+
},
124+
},
125+
})
126+
require.NoError(t, err)
127+
require.Len(t, batch, 0)
128+
129+
e := events.GetEvents(events.NewTraits())
130+
require.Len(t, e, 1)
131+
require.Equal(t, `"12"`, e[0].Data.(*store.KafkaLog).Key.Value)
132+
require.Equal(t, `"foo"`, e[0].Data.(*store.KafkaLog).Message.Value)
95133
},
96134
},
97135
{

0 commit comments

Comments
 (0)