Skip to content

Commit 381eac9

Browse files
authored
Merge pull request #517 from marle3003/develop
v0.14.4
2 parents 8918c6d + f272ed1 commit 381eac9

File tree

14 files changed

+185
-49
lines changed

14 files changed

+185
-49
lines changed

acceptance/petstore_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func (suite *PetStoreSuite) TestApi() {
6363
},
6464

6565
"topics": []interface{}{map[string]interface{}{
66+
"bindings": map[string]interface{}{"partitions": float64(2), "segmentMs": float64(30000), "valueSchemaValidation": true},
6667
"description": "",
6768
"messages": map[string]interface{}{
6869
"#/components/messages/order": map[string]interface{}{

api/handler_kafka.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type topic struct {
7070
Description string `json:"description"`
7171
Partitions []partition `json:"partitions"`
7272
Messages map[string]messageConfig `json:"messages,omitempty"`
73+
Bindings bindings `json:"bindings,omitempty"`
7374
}
7475

7576
type partition struct {
@@ -96,6 +97,16 @@ type messageConfig struct {
9697
ContentType string `json:"contentType"`
9798
}
9899

100+
type bindings struct {
101+
Partitions int `json:"partitions,omitempty"`
102+
RetentionBytes int64 `json:"retentionBytes,omitempty"`
103+
RetentionMs int64 `json:"retentionMs,omitempty"`
104+
SegmentBytes int64 `json:"segmentBytes,omitempty"`
105+
SegmentMs int64 `json:"segmentMs,omitempty"`
106+
ValueSchemaValidation bool `json:"valueSchemaValidation,omitempty"`
107+
KeySchemaValidation bool `json:"keySchemaValidation,omitempty"`
108+
}
109+
99110
func getKafkaServices(store *runtime.KafkaStore, m *monitor.Monitor) []interface{} {
100111
list := store.List()
101112
result := make([]interface{}, 0, len(list))
@@ -188,7 +199,7 @@ func getKafka(info *runtime.KafkaInfo) kafka {
188199
addr = name
189200
}
190201
t := info.Store.Topic(addr)
191-
k.Topics = append(k.Topics, newTopic(info.Store, t, ch.Value, info.DefaultContentType))
202+
k.Topics = append(k.Topics, newTopic(info.Store, t, ch.Value, info.Config))
192203
}
193204
sort.Slice(k.Topics, func(i, j int) bool {
194205
return strings.Compare(k.Topics[i].Name, k.Topics[j].Name) < 0
@@ -206,7 +217,7 @@ func getKafka(info *runtime.KafkaInfo) kafka {
206217
return k
207218
}
208219

209-
func newTopic(s *store.Store, t *store.Topic, config *asyncapi3.Channel, defaultContentType string) topic {
220+
func newTopic(s *store.Store, t *store.Topic, ch *asyncapi3.Channel, cfg *asyncapi3.Config) topic {
210221
var partitions []partition
211222
for _, p := range t.Partitions {
212223
partitions = append(partitions, newPartition(s, p))
@@ -217,11 +228,20 @@ func newTopic(s *store.Store, t *store.Topic, config *asyncapi3.Channel, default
217228

218229
result := topic{
219230
Name: t.Name,
220-
Description: config.Description,
231+
Description: ch.Description,
221232
Partitions: partitions,
233+
Bindings: bindings{
234+
Partitions: t.Config.Bindings.Kafka.Partitions,
235+
RetentionBytes: t.Config.Bindings.Kafka.RetentionBytes,
236+
RetentionMs: t.Config.Bindings.Kafka.RetentionMs,
237+
SegmentBytes: t.Config.Bindings.Kafka.SegmentBytes,
238+
SegmentMs: t.Config.Bindings.Kafka.SegmentMs,
239+
ValueSchemaValidation: t.Config.Bindings.Kafka.ValueSchemaValidation,
240+
KeySchemaValidation: t.Config.Bindings.Kafka.KeySchemaValidation,
241+
},
222242
}
223243

224-
for messageId, ref := range config.Messages {
244+
for messageId, ref := range ch.Messages {
225245
if ref.Value == nil {
226246
continue
227247
}
@@ -243,7 +263,7 @@ func newTopic(s *store.Store, t *store.Topic, config *asyncapi3.Channel, default
243263
}
244264

245265
if m.ContentType == "" {
246-
m.ContentType = defaultContentType
266+
m.ContentType = cfg.DefaultContentType
247267
}
248268

249269
if msg.Bindings.Kafka.Key != nil {

api/handler_kafka_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func TestHandler_Kafka(t *testing.T) {
145145
}))
146146
},
147147
requestUrl: "http://foo.api/api/services/kafka/foo",
148-
responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"payload":{"schema":{"type":"string"}},"contentType":"application/json"}}}]}`,
148+
responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"payload":{"schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`,
149149
},
150150
{
151151
name: "get specific with topic and multi schema format",
@@ -168,7 +168,7 @@ func TestHandler_Kafka(t *testing.T) {
168168
}))
169169
},
170170
requestUrl: "http://foo.api/api/services/kafka/foo",
171-
responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"payload":{"format":"foo","schema":{"type":"string"}},"contentType":"application/json"}}}]}`,
171+
responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"payload":{"format":"foo","schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`,
172172
},
173173
{
174174
name: "get specific with group",
@@ -268,7 +268,7 @@ func TestHandler_Kafka(t *testing.T) {
268268
return app
269269
},
270270
requestUrl: "http://foo.api/api/services/kafka/foo",
271-
responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"payload":{"format":"foo","schema":{"type":"string"}},"contentType":"application/json"}}}]}`,
271+
responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"payload":{"format":"foo","schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`,
272272
},
273273
}
274274

engine/kafka_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ func TestKafkaClient_Produce(t *testing.T) {
8282
require.NotNil(t, b)
8383
require.Equal(t, "XidZuoWq ", kafka.BytesToString(b.Records[0].Key))
8484
require.Equal(t, "\"\"", kafka.BytesToString(b.Records[0].Value))
85+
86+
require.Equal(t, float64(1), app.Monitor.Kafka.Messages.Sum())
8587
},
8688
},
8789
{
@@ -110,6 +112,34 @@ func TestKafkaClient_Produce(t *testing.T) {
110112
require.Equal(t, []byte("1.0"), b.Records[0].Headers[0].Value)
111113
},
112114
},
115+
{
116+
name: "multiple messages",
117+
test: func(t *testing.T, app *runtime.App, s *store.Store, engine *engine.Engine) {
118+
err := engine.AddScript(newScript("test.js", `
119+
import { produce } from 'mokapi/kafka'
120+
export default function() {
121+
const result = produce({
122+
topic: 'foo',
123+
messages: [
124+
{ key: 'key1', data: 'foo'},
125+
{ key: 'key2', data: 'bar'}
126+
],
127+
})
128+
console.log(result)
129+
}
130+
`))
131+
require.NoError(t, err)
132+
b, errCode := s.Topic("foo").Partition(0).Read(0, 1000)
133+
require.Equal(t, kafka.None, errCode)
134+
require.NotNil(t, b)
135+
require.Equal(t, "key1", kafka.BytesToString(b.Records[0].Key))
136+
require.Equal(t, `"foo"`, kafka.BytesToString(b.Records[0].Value))
137+
require.Equal(t, "key2", kafka.BytesToString(b.Records[1].Key))
138+
require.Equal(t, `"bar"`, kafka.BytesToString(b.Records[1].Value))
139+
140+
require.Equal(t, float64(2), app.Monitor.Kafka.Messages.Sum())
141+
},
142+
},
113143
{
114144
name: "to partition 5",
115145
test: func(t *testing.T, app *runtime.App, s *store.Store, engine *engine.Engine) {

examples/mokapi/common.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ components:
8989
type: string
9090
binary:
9191
type: string
92+
deleted:
93+
type: boolean
9294
KafkaValue:
9395
type: object
9496
properties:

examples/mokapi/kafka.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,15 @@ export let clusters = [
133133
payload: { schema: Product },
134134
contentType: 'application/json'
135135
}
136+
},
137+
bindings: {
138+
partitions: 3,
139+
retentionBytes: 1500,
140+
retentionMs: 30000,
141+
segmentBytes: 30000,
142+
segmentMs: 1500,
143+
valueSchemaValidation: true,
144+
keySchemaValidation: false
136145
}
137146
},
138147
{
@@ -310,7 +319,8 @@ export let events = [
310319
})
311320
},
312321
partition: 1,
313-
messageId: 'shopOrder'
322+
messageId: 'shopOrder',
323+
deleted: true
314324
}
315325
},
316326
{

examples/mokapi/kafka.yml

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ components:
6565
type: object
6666
additionalProperties:
6767
$ref: '#/components/schemas/MessageConfig'
68+
bindings:
69+
$ref: '#/components/schemas/TopicBindings'
6870
Partition:
6971
type: object
7072
properties:
@@ -143,4 +145,19 @@ components:
143145
header:
144146
$ref: 'schema.yml#/components/schemas/Schema'
145147
contentType:
146-
type: string
148+
type: string
149+
TopicBindings:
150+
partitions:
151+
type: integer
152+
retentionBytes:
153+
type: integer
154+
retentionMs:
155+
type: integer
156+
segmentBytes:
157+
type: integer
158+
segmentMs:
159+
type: integer
160+
valueSchemaValidation:
161+
type: boolean
162+
keySchemaValidation:
163+
type: boolean

providers/asyncapi3/kafka/store/log.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type KafkaLog struct {
1414
MessageId string `json:"messageId"`
1515
Partition int `json:"partition"`
1616
Headers map[string]LogValue `json:"headers"`
17+
Deleted bool `json:"deleted"`
1718
}
1819

1920
type LogValue struct {

providers/asyncapi3/kafka/store/partition.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,18 @@ type Partition struct {
3232
type Segment struct {
3333
Head int64
3434
Tail int64
35-
Log []*kafka.Record
35+
Log []*record
3636
Size int
3737
Opened time.Time
3838
Closed time.Time
3939
LastWritten time.Time
4040
}
4141

42+
type record struct {
43+
Data *kafka.Record
44+
Log *KafkaLog
45+
}
46+
4247
type WriteOptions func(args *WriteArgs)
4348

4449
type WriteArgs struct {
@@ -154,7 +159,7 @@ func (p *Partition) Write(batch kafka.RecordBatch) (baseOffset int64, records []
154159
segment = p.addSegment()
155160
}
156161

157-
segment.Log = append(segment.Log, r)
162+
segment.Log = append(segment.Log, &record{Data: r, Log: result})
158163
segment.Tail++
159164
segment.LastWritten = now
160165
segment.Size += r.Size(baseOffset, baseTime)
@@ -255,7 +260,7 @@ func newSegment(offset int64) *Segment {
255260
return &Segment{
256261
Head: offset,
257262
Tail: offset,
258-
Log: make([]*kafka.Record, 0),
263+
Log: make([]*record, 0),
259264
Opened: time.Now(),
260265
}
261266
}
@@ -266,13 +271,14 @@ func (s *Segment) contains(offset int64) bool {
266271

267272
func (s *Segment) record(offset int64) *kafka.Record {
268273
index := offset - s.Head
269-
return s.Log[index]
274+
return s.Log[index].Data
270275
}
271276

272277
func (s *Segment) delete() {
273278
for _, r := range s.Log {
274-
log.Debugf("delete record: %v", r.Offset)
275-
r.Key.Close()
276-
r.Value.Close()
279+
log.Debugf("delete record: %v", r.Data.Offset)
280+
r.Data.Key.Close()
281+
r.Data.Value.Close()
282+
r.Log.Deleted = true
277283
}
278284
}

runtime/runtime_kafka.go

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,22 @@ type KafkaStore struct {
2626
type KafkaInfo struct {
2727
*asyncapi3.Config
2828
*store.Store
29-
configs map[string]*dynamic.Config
30-
seenTopics map[string]bool
29+
configs map[string]*dynamic.Config
30+
seenTopics map[string]bool
31+
updateEventAndMetrics func(k *KafkaInfo)
3132
}
3233

3334
type KafkaHandler struct {
3435
kafka *monitor.Kafka
3536
next kafka.Handler
3637
}
3738

38-
func NewKafkaInfo(c *dynamic.Config, store *store.Store) *KafkaInfo {
39+
func NewKafkaInfo(c *dynamic.Config, store *store.Store, updateEventAndMetrics func(info *KafkaInfo)) *KafkaInfo {
3940
hc := &KafkaInfo{
40-
configs: map[string]*dynamic.Config{},
41-
Store: store,
42-
seenTopics: map[string]bool{},
41+
configs: map[string]*dynamic.Config{},
42+
Store: store,
43+
seenTopics: map[string]bool{},
44+
updateEventAndMetrics: updateEventAndMetrics,
4345
}
4446
hc.AddConfig(c)
4547
return hc
@@ -88,32 +90,15 @@ func (s *KafkaStore) Add(c *dynamic.Config, emitter common.EventEmitter) (*Kafka
8890
}
8991

9092
if !ok {
91-
ki = NewKafkaInfo(c, store.New(cfg, emitter))
92-
s.infos[cfg.Info.Name] = ki
93-
9493
events.ResetStores(events.NewTraits().WithNamespace("kafka").WithName(cfg.Info.Name))
9594
events.SetStore(int(eventStore.Size), events.NewTraits().WithNamespace("kafka").WithName(cfg.Info.Name))
95+
96+
ki = NewKafkaInfo(c, store.New(cfg, emitter), s.updateEventStore)
97+
s.infos[cfg.Info.Name] = ki
9698
} else {
9799
ki.AddConfig(c)
98100
}
99101

100-
for topicName, topic := range cfg.Channels {
101-
if topic.Value == nil {
102-
continue
103-
}
104-
if topic.Value.Address != "" {
105-
topicName = topic.Value.Address
106-
}
107-
if _, ok := ki.seenTopics[topicName]; ok {
108-
continue
109-
}
110-
s.monitor.Kafka.Messages.WithLabel(cfg.Info.Name, topicName)
111-
s.monitor.Kafka.LastMessage.WithLabel(cfg.Info.Name, topicName)
112-
traits := events.NewTraits().WithNamespace("kafka").WithName(cfg.Info.Name).With("topic", topicName)
113-
events.SetStore(int(eventStore.Size), traits)
114-
ki.seenTopics[topicName] = true
115-
}
116-
117102
return ki, nil
118103
}
119104

@@ -189,6 +174,7 @@ func (c *KafkaInfo) update() {
189174
}
190175

191176
c.Config = cfg
177+
c.updateEventAndMetrics(c)
192178
c.Store.Update(cfg)
193179
}
194180

@@ -238,3 +224,27 @@ func getKafkaConfig(c *dynamic.Config) (*asyncapi3.Config, error) {
238224
return old.Convert()
239225
}
240226
}
227+
228+
func (c *KafkaStore) updateEventStore(k *KafkaInfo) {
229+
eventStore, hasStoreConfig := c.cfg.Event.Store[k.Config.Info.Name]
230+
if !hasStoreConfig {
231+
eventStore = c.cfg.Event.Store["default"]
232+
}
233+
234+
for topicName, topic := range k.Config.Channels {
235+
if topic.Value == nil {
236+
continue
237+
}
238+
if topic.Value.Address != "" {
239+
topicName = topic.Value.Address
240+
}
241+
if _, ok := k.seenTopics[topicName]; ok {
242+
continue
243+
}
244+
c.monitor.Kafka.Messages.WithLabel(k.Config.Info.Name, topicName)
245+
c.monitor.Kafka.LastMessage.WithLabel(k.Config.Info.Name, topicName)
246+
traits := events.NewTraits().WithNamespace("kafka").WithName(k.Config.Info.Name).With("topic", topicName)
247+
events.SetStore(int(eventStore.Size), traits)
248+
k.seenTopics[topicName] = true
249+
}
250+
}

0 commit comments

Comments
 (0)