Skip to content

Commit 6da01c8

Browse files
authored
Merge pull request #762 from marle3003/develop
Develop
2 parents 08a6214 + 8fb2348 commit 6da01c8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+1961
-620
lines changed

api/handler.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package api
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"fmt"
6-
log "github.com/sirupsen/logrus"
77
"io/fs"
88
"mokapi/config/static"
99
"mokapi/runtime"
@@ -15,6 +15,8 @@ import (
1515
"slices"
1616
"strconv"
1717
"strings"
18+
19+
log "github.com/sirupsen/logrus"
1820
)
1921

2022
type handler struct {
@@ -220,12 +222,18 @@ func (h *handler) getInfo(w http.ResponseWriter, _ *http.Request) {
220222
writeJsonBody(w, i)
221223
}
222224

223-
func writeJsonBody(w http.ResponseWriter, i interface{}) {
224-
b, err := json.Marshal(i)
225+
func writeJsonBody(w http.ResponseWriter, v interface{}) {
226+
var buf bytes.Buffer
227+
enc := json.NewEncoder(&buf)
228+
enc.SetEscapeHTML(false)
229+
err := enc.Encode(v) // includes newline
230+
225231
if err != nil {
226232
writeError(w, err, http.StatusInternalServerError)
227233
return
228234
}
235+
236+
b := bytes.TrimSuffix(buf.Bytes(), []byte("\n"))
229237
_, err = w.Write(b)
230238
if err != nil {
231239
log.Errorf("write response body failed: %v", err)

api/handler_config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func TestHandler_Config(t *testing.T) {
248248
test: []try.ResponseCondition{
249249
try.HasStatusCode(http.StatusOK),
250250
try.HasHeader("Content-Type", "application/json"),
251-
try.HasBody(`{"id":"61373430-3061-3131-6663-326332386638","url":"https://git.bar?file=/foo/foo.json\u0026ref=main","provider":"git","time":"2023-12-27T13:01:30Z"}`),
251+
try.HasBody(`{"id":"61373430-3061-3131-6663-326332386638","url":"https://git.bar?file=/foo/foo.json&ref=main","provider":"git","time":"2023-12-27T13:01:30Z"}`),
252252
},
253253
},
254254
{

api/handler_events_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"mokapi/config/static"
8+
"mokapi/providers/asyncapi3/kafka/store"
89
"mokapi/providers/openapi"
910
"mokapi/runtime"
1011
"mokapi/runtime/events"
@@ -208,3 +209,127 @@ func TestHandler_Events(t *testing.T) {
208209
})
209210
}
210211
}
212+
213+
func TestHandler_KafkaEvents(t *testing.T) {
214+
testcases := []struct {
215+
name string
216+
fn func(t *testing.T, h http.Handler, sm *events.StoreManager)
217+
}{
218+
{
219+
name: "empty kafka events",
220+
fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) {
221+
try.Handler(t,
222+
http.MethodGet,
223+
"http://foo.api/api/events?namespace=kafka",
224+
nil,
225+
"",
226+
h,
227+
try.HasStatusCode(200),
228+
try.HasHeader("Content-Type", "application/json"),
229+
try.HasBody(`[]`))
230+
},
231+
},
232+
{
233+
name: "with kafka events",
234+
fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) {
235+
sm.SetStore(1, events.NewTraits().WithNamespace("kafka"))
236+
err := sm.Push(&eventstest.Event{Name: "foo"}, events.NewTraits().WithNamespace("kafka"))
237+
event := sm.GetEvents(events.NewTraits())[0]
238+
require.NoError(t, err)
239+
try.Handler(t,
240+
http.MethodGet,
241+
"http://foo.api/api/events?namespace=kafka",
242+
nil,
243+
"",
244+
h,
245+
try.HasStatusCode(200),
246+
try.HasHeader("Content-Type", "application/json"),
247+
try.HasBody(fmt.Sprintf(`[{"id":"%v","traits":{"namespace":"kafka"},"data":{"Name":"foo","api":""},"time":"%v"}]`,
248+
event.Id,
249+
event.Time.Format(time.RFC3339Nano))))
250+
},
251+
},
252+
{
253+
name: "get specific event",
254+
fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) {
255+
sm.SetStore(1, events.NewTraits().WithNamespace("kafka"))
256+
err := sm.Push(&eventstest.Event{Name: "foo"}, events.NewTraits().WithNamespace("kafka"))
257+
event := sm.GetEvents(events.NewTraits())[0]
258+
require.NoError(t, err)
259+
try.Handler(t,
260+
http.MethodGet,
261+
"http://foo.api/api/events/"+event.Id,
262+
nil,
263+
"",
264+
h,
265+
try.HasStatusCode(200),
266+
try.HasHeader("Content-Type", "application/json"),
267+
try.HasBody(fmt.Sprintf(`{"id":"%v","traits":{"namespace":"kafka"},"data":{"Name":"foo","api":""},"time":"%v"}`,
268+
event.Id,
269+
event.Time.Format(time.RFC3339Nano))))
270+
},
271+
},
272+
{
273+
name: "get kafka with producerId",
274+
fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) {
275+
sm.SetStore(1, events.NewTraits().WithNamespace("kafka"))
276+
277+
err := sm.Push(&store.KafkaLog{
278+
Offset: 123,
279+
Key: store.LogValue{},
280+
Message: store.LogValue{},
281+
MessageId: "foo-1",
282+
Partition: 1,
283+
ProducerId: 3,
284+
ProducerEpoch: 1,
285+
SequenceNumber: 2,
286+
}, events.NewTraits().WithNamespace("kafka"))
287+
require.NoError(t, err)
288+
289+
try.Handler(t,
290+
http.MethodGet,
291+
"http://foo.api/api/events?namespace=kafka",
292+
nil,
293+
"",
294+
h,
295+
try.HasStatusCode(200),
296+
try.AssertBody(func(t *testing.T, body string) {
297+
var m []map[string]any
298+
require.NoError(t, json.Unmarshal([]byte(body), &m))
299+
require.Equal(t, map[string]any{
300+
"api": "",
301+
"deleted": false,
302+
"headers": interface{}(nil),
303+
"key": map[string]interface{}{
304+
"binary": interface{}(nil),
305+
"value": "",
306+
},
307+
"message": map[string]interface{}{
308+
"binary": interface{}(nil),
309+
"value": "",
310+
},
311+
"messageId": "foo-1",
312+
"offset": float64(123),
313+
"partition": float64(1),
314+
"producerEpoch": float64(1),
315+
"producerId": float64(3),
316+
"schemaId": float64(0),
317+
"sequenceNumber": float64(2),
318+
},
319+
m[0]["data"])
320+
}))
321+
},
322+
},
323+
}
324+
325+
for _, tc := range testcases {
326+
tc := tc
327+
t.Run(tc.name, func(t *testing.T) {
328+
cfg := &static.Config{}
329+
app := runtime.New(cfg)
330+
331+
h := New(app, static.Api{})
332+
tc.fn(t, h, app.Events)
333+
})
334+
}
335+
}

api/handler_kafka.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,11 @@ type produceRequest struct {
120120
Records []store.Record `json:"records"`
121121
}
122122

123-
type produceResponse struct {
124-
Offsets []recordResult `json:"offsets"`
123+
type ProduceResponse struct {
124+
Offsets []RecordResult `json:"offsets"`
125125
}
126126

127-
type recordResult struct {
127+
type RecordResult struct {
128128
Partition int
129129
Offset int64
130130
Error string
@@ -222,9 +222,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
222222
writeError(w, err, http.StatusBadRequest)
223223
}
224224
}
225-
res := produceResponse{}
225+
res := ProduceResponse{}
226226
for _, rec := range result {
227-
res.Offsets = append(res.Offsets, recordResult{
227+
res.Offsets = append(res.Offsets, RecordResult{
228228
Partition: rec.Partition,
229229
Offset: rec.Offset,
230230
Error: rec.Error,
@@ -247,7 +247,7 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
247247
w.WriteHeader(http.StatusNotFound)
248248
} else {
249249
w.Header().Set("Content-Type", "application/json")
250-
writeJsonBody(w, getPartitions(k, t))
250+
writeJsonBody(w, getPartitions(t))
251251
}
252252
return
253253
// /api/services/kafka/{cluster}/topics/{topic}/partitions/{id}
@@ -276,7 +276,7 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
276276
}
277277
if r.Method == "GET" {
278278
w.Header().Set("Content-Type", "application/json")
279-
writeJsonBody(w, newPartition(k.Store, p))
279+
writeJsonBody(w, newPartition(p))
280280
} else {
281281
records, err := getProduceRecords(r)
282282
if err != nil {
@@ -296,9 +296,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
296296
writeError(w, err, http.StatusBadRequest)
297297
}
298298
}
299-
res := produceResponse{}
299+
res := ProduceResponse{}
300300
for _, rec := range result {
301-
res.Offsets = append(res.Offsets, recordResult{
301+
res.Offsets = append(res.Offsets, RecordResult{
302302
Partition: rec.Partition,
303303
Offset: rec.Offset,
304304
Error: rec.Error,
@@ -458,7 +458,7 @@ func getTopics(info *runtime.KafkaInfo) []topic {
458458
addr = name
459459
}
460460
t := info.Store.Topic(addr)
461-
topics = append(topics, newTopic(info.Store, t, ch.Value, info.Config))
461+
topics = append(topics, newTopic(t, ch.Value, info.Config))
462462
}
463463
sort.Slice(topics, func(i, j int) bool {
464464
return strings.Compare(topics[i].Name, topics[j].Name) < 0
@@ -477,18 +477,18 @@ func getTopic(info *runtime.KafkaInfo, name string) *topic {
477477
}
478478
if addr == name {
479479
t := info.Store.Topic(addr)
480-
r := newTopic(info.Store, t, ch.Value, info.Config)
480+
r := newTopic(t, ch.Value, info.Config)
481481
return &r
482482
}
483483

484484
}
485485
return nil
486486
}
487487

488-
func newTopic(s *store.Store, t *store.Topic, ch *asyncapi3.Channel, cfg *asyncapi3.Config) topic {
488+
func newTopic(t *store.Topic, ch *asyncapi3.Channel, cfg *asyncapi3.Config) topic {
489489
var partitions []partition
490490
for _, p := range t.Partitions {
491-
partitions = append(partitions, newPartition(s, p))
491+
partitions = append(partitions, newPartition(p))
492492
}
493493
sort.Slice(partitions, func(i, j int) bool {
494494
return partitions[i].Id < partitions[j].Id
@@ -549,10 +549,10 @@ func newTopic(s *store.Store, t *store.Topic, ch *asyncapi3.Channel, cfg *asynca
549549
return result
550550
}
551551

552-
func getPartitions(info *runtime.KafkaInfo, t *store.Topic) []partition {
552+
func getPartitions(t *store.Topic) []partition {
553553
var partitions []partition
554554
for _, p := range t.Partitions {
555-
partitions = append(partitions, newPartition(info.Store, p))
555+
partitions = append(partitions, newPartition(p))
556556
}
557557
sort.Slice(partitions, func(i, j int) bool {
558558
return partitions[i].Id < partitions[j].Id
@@ -594,13 +594,12 @@ func newGroup(g *store.Group) group {
594594
return grp
595595
}
596596

597-
func newPartition(s *store.Store, p *store.Partition) partition {
598-
leader, _ := s.Broker(p.Leader)
597+
func newPartition(p *store.Partition) partition {
599598
return partition{
600599
Id: p.Index,
601600
StartOffset: p.StartOffset(),
602601
Offset: p.Offset(),
603-
Leader: newBroker(leader),
602+
Leader: newBroker(p.Leader),
604603
Segments: len(p.Segments),
605604
}
606605
}
@@ -649,7 +648,7 @@ func getProduceRecords(r *http.Request) ([]store.Record, error) {
649648
return pr.Records, nil
650649
}
651650

652-
func (r *recordResult) MarshalJSON() ([]byte, error) {
651+
func (r *RecordResult) MarshalJSON() ([]byte, error) {
653652
aux := &struct {
654653
Partition int `json:"partition"`
655654
Offset int64 `json:"offset"`

0 commit comments

Comments
 (0)