Merged
25 commits
a665713
Bump github.com/go-git/go-git/v5 from 5.16.3 to 5.16.4
dependabot[bot] Nov 28, 2025
422ff26
Bump @ssthouse/vue3-tree-chart from 0.2.6 to 0.3.0 in /webui
dependabot[bot] Nov 28, 2025
4dac90d
Merge pull request #756 from marle3003/dependabot/go_modules/develop/…
github-actions[bot] Nov 28, 2025
fed060d
Merge branch 'develop' into dependabot/npm_and_yarn/webui/develop/sst…
marle3003 Nov 28, 2025
8ed84ef
Merge pull request #760 from marle3003/dependabot/npm_and_yarn/webui/…
github-actions[bot] Nov 28, 2025
26dc55b
Bump @playwright/test from 1.56.1 to 1.57.0 in /webui
dependabot[bot] Nov 28, 2025
2a75b51
Merge pull request #758 from marle3003/dependabot/npm_and_yarn/webui/…
github-actions[bot] Nov 28, 2025
e06c333
Bump eslint-plugin-vue from 10.5.1 to 10.6.2 in /webui
dependabot[bot] Nov 28, 2025
63ed8d4
Merge pull request #755 from marle3003/dependabot/npm_and_yarn/webui/…
github-actions[bot] Nov 28, 2025
a986000
Bump vue from 3.5.24 to 3.5.25 in /webui
dependabot[bot] Nov 28, 2025
817c75d
Merge pull request #759 from marle3003/dependabot/npm_and_yarn/webui/…
github-actions[bot] Nov 28, 2025
ad85ecb
Bump vue-tsc from 3.1.4 to 3.1.5 in /webui
dependabot[bot] Nov 28, 2025
a780f9d
Merge pull request #757 from marle3003/dependabot/npm_and_yarn/webui/…
github-actions[bot] Nov 28, 2025
f0fde7e
add support for idempotent Kafka producer
marle3003 Nov 29, 2025
2aa380a
Merge remote-tracking branch 'origin/develop' into develop
marle3003 Nov 29, 2025
bfb3e07
add Kafka record producer info to dashboard
marle3003 Nov 29, 2025
08e9fda
fix time zone issue in tests
marle3003 Nov 29, 2025
e7aad4b
fix time zone issue in tests
marle3003 Nov 29, 2025
7e01451
fix AsyncAPI using references to schemas in components
marle3003 Dec 1, 2025
066014a
improve Kafka client to support different content types
marle3003 Dec 1, 2025
1e5058b
fix XML parsing
marle3003 Dec 1, 2025
e8a1ad1
fix test
marle3003 Dec 1, 2025
0af659c
improve Kafka client to read data
marle3003 Dec 2, 2025
6f8043e
fix JavaScript file not being executed when loaded from the Git provider
marle3003 Dec 2, 2025
8fb2348
Merge branch 'main' into develop
marle3003 Dec 2, 2025
14 changes: 11 additions & 3 deletions api/handler.go
@@ -1,9 +1,9 @@
 package api
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
-	log "github.com/sirupsen/logrus"
 	"io/fs"
 	"mokapi/config/static"
 	"mokapi/runtime"
@@ -15,6 +15,8 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+
+	log "github.com/sirupsen/logrus"
 )
 
 type handler struct {
@@ -220,12 +222,18 @@ func (h *handler) getInfo(w http.ResponseWriter, _ *http.Request) {
 	writeJsonBody(w, i)
 }
 
-func writeJsonBody(w http.ResponseWriter, i interface{}) {
-	b, err := json.Marshal(i)
+func writeJsonBody(w http.ResponseWriter, v interface{}) {
+	var buf bytes.Buffer
+	enc := json.NewEncoder(&buf)
+	enc.SetEscapeHTML(false)
+	err := enc.Encode(v) // includes newline
+
 	if err != nil {
 		writeError(w, err, http.StatusInternalServerError)
 		return
 	}
+
+	b := bytes.TrimSuffix(buf.Bytes(), []byte("\n"))
 	_, err = w.Write(b)
 	if err != nil {
 		log.Errorf("write response body failed: %v", err)
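Note: this is the behavioral core of the PR's JSON change. json.Marshal escapes &, <, and > as \u0026, \u003c, and \u003e by default, while an encoder with SetEscapeHTML(false) writes them verbatim and appends a newline that must be trimmed. A minimal standalone sketch of the difference, independent of mokapi's handler code:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	v := map[string]string{"url": "https://git.bar?file=/foo/foo.json&ref=main"}

	// json.Marshal escapes HTML-sensitive characters by default.
	b, _ := json.Marshal(v)
	fmt.Println(string(b)) // ...foo.json\u0026ref=main...

	// A json.Encoder with SetEscapeHTML(false) leaves & untouched.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	_ = enc.Encode(v) // Encode always appends '\n'
	fmt.Println(string(bytes.TrimSuffix(buf.Bytes(), []byte("\n"))))
}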
2 changes: 1 addition & 1 deletion api/handler_config_test.go
@@ -248,7 +248,7 @@ func TestHandler_Config(t *testing.T) {
 			test: []try.ResponseCondition{
 				try.HasStatusCode(http.StatusOK),
 				try.HasHeader("Content-Type", "application/json"),
-				try.HasBody(`{"id":"61373430-3061-3131-6663-326332386638","url":"https://git.bar?file=/foo/foo.json\u0026ref=main","provider":"git","time":"2023-12-27T13:01:30Z"}`),
+				try.HasBody(`{"id":"61373430-3061-3131-6663-326332386638","url":"https://git.bar?file=/foo/foo.json&ref=main","provider":"git","time":"2023-12-27T13:01:30Z"}`),
 			},
 		},
 		{
125 changes: 125 additions & 0 deletions api/handler_events_test.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"mokapi/config/static"
+	"mokapi/providers/asyncapi3/kafka/store"
 	"mokapi/providers/openapi"
 	"mokapi/runtime"
 	"mokapi/runtime/events"
@@ -208,3 +209,127 @@ func TestHandler_Events(t *testing.T) {
 		})
 	}
 }
+
+func TestHandler_KafkaEvents(t *testing.T) {
+	testcases := []struct {
+		name string
+		fn   func(t *testing.T, h http.Handler, sm *events.StoreManager)
+	}{
+		{
+			name: "empty kafka events",
+			fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) {
+				try.Handler(t,
+					http.MethodGet,
+					"http://foo.api/api/events?namespace=kafka",
+					nil,
+					"",
+					h,
+					try.HasStatusCode(200),
+					try.HasHeader("Content-Type", "application/json"),
+					try.HasBody(`[]`))
+			},
+		},
+		{
+			name: "with kafka events",
+			fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) {
+				sm.SetStore(1, events.NewTraits().WithNamespace("kafka"))
+				err := sm.Push(&eventstest.Event{Name: "foo"}, events.NewTraits().WithNamespace("kafka"))
+				event := sm.GetEvents(events.NewTraits())[0]
+				require.NoError(t, err)
+				try.Handler(t,
+					http.MethodGet,
+					"http://foo.api/api/events?namespace=kafka",
+					nil,
+					"",
+					h,
+					try.HasStatusCode(200),
+					try.HasHeader("Content-Type", "application/json"),
+					try.HasBody(fmt.Sprintf(`[{"id":"%v","traits":{"namespace":"kafka"},"data":{"Name":"foo","api":""},"time":"%v"}]`,
+						event.Id,
+						event.Time.Format(time.RFC3339Nano))))
+			},
+		},
+		{
+			name: "get specific event",
+			fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) {
+				sm.SetStore(1, events.NewTraits().WithNamespace("kafka"))
+				err := sm.Push(&eventstest.Event{Name: "foo"}, events.NewTraits().WithNamespace("kafka"))
+				event := sm.GetEvents(events.NewTraits())[0]
+				require.NoError(t, err)
+				try.Handler(t,
+					http.MethodGet,
+					"http://foo.api/api/events/"+event.Id,
+					nil,
+					"",
+					h,
+					try.HasStatusCode(200),
+					try.HasHeader("Content-Type", "application/json"),
+					try.HasBody(fmt.Sprintf(`{"id":"%v","traits":{"namespace":"kafka"},"data":{"Name":"foo","api":""},"time":"%v"}`,
+						event.Id,
+						event.Time.Format(time.RFC3339Nano))))
+			},
+		},
+		{
+			name: "get kafka with producerId",
+			fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) {
+				sm.SetStore(1, events.NewTraits().WithNamespace("kafka"))
+
+				err := sm.Push(&store.KafkaLog{
+					Offset:         123,
+					Key:            store.LogValue{},
+					Message:        store.LogValue{},
+					MessageId:      "foo-1",
+					Partition:      1,
+					ProducerId:     3,
+					ProducerEpoch:  1,
+					SequenceNumber: 2,
+				}, events.NewTraits().WithNamespace("kafka"))
+				require.NoError(t, err)
+
+				try.Handler(t,
+					http.MethodGet,
+					"http://foo.api/api/events?namespace=kafka",
+					nil,
+					"",
+					h,
+					try.HasStatusCode(200),
+					try.AssertBody(func(t *testing.T, body string) {
+						var m []map[string]any
+						require.NoError(t, json.Unmarshal([]byte(body), &m))
+						require.Equal(t, map[string]any{
+							"api":     "",
+							"deleted": false,
+							"headers": interface{}(nil),
+							"key": map[string]interface{}{
+								"binary": interface{}(nil),
+								"value":  "",
+							},
+							"message": map[string]interface{}{
+								"binary": interface{}(nil),
+								"value":  "",
+							},
+							"messageId":      "foo-1",
+							"offset":         float64(123),
+							"partition":      float64(1),
+							"producerEpoch":  float64(1),
+							"producerId":     float64(3),
+							"schemaId":       float64(0),
+							"sequenceNumber": float64(2),
+						},
+							m[0]["data"])
+					}))
+			},
+		},
+	}
+
+	for _, tc := range testcases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			cfg := &static.Config{}
+			app := runtime.New(cfg)
+
+			h := New(app, static.Api{})
+			tc.fn(t, h, app.Events)
+		})
+	}
+}
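The new test pins down the JSON shape of a Kafka event, including the idempotent-producer fields producerId, producerEpoch, and sequenceNumber added in this PR. A rough sketch of how a client of the dashboard API might read those fields; the endpoint path and field names come from the test above, but the base URL and this client code are illustrative, not part of the PR:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// kafkaEventData models only the producer-related subset of the
// "data" object asserted in the test above.
type kafkaEventData struct {
	MessageId      string `json:"messageId"`
	Offset         int64  `json:"offset"`
	Partition      int    `json:"partition"`
	ProducerId     int64  `json:"producerId"`
	ProducerEpoch  int    `json:"producerEpoch"`
	SequenceNumber int32  `json:"sequenceNumber"`
}

type kafkaEvent struct {
	Id   string         `json:"id"`
	Data kafkaEventData `json:"data"`
}

func main() {
	// Hypothetical base URL; the path matches the handler under test.
	res, err := http.Get("http://localhost:8080/api/events?namespace=kafka")
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()

	var evts []kafkaEvent
	if err := json.NewDecoder(res.Body).Decode(&evts); err != nil {
		panic(err)
	}
	for _, e := range evts {
		fmt.Printf("%s: producer=%d epoch=%d seq=%d offset=%d\n",
			e.Data.MessageId, e.Data.ProducerId, e.Data.ProducerEpoch,
			e.Data.SequenceNumber, e.Data.Offset)
	}
}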
37 changes: 18 additions & 19 deletions api/handler_kafka.go
@@ -120,11 +120,11 @@ type produceRequest struct {
 	Records []store.Record `json:"records"`
 }
 
-type produceResponse struct {
-	Offsets []recordResult `json:"offsets"`
+type ProduceResponse struct {
+	Offsets []RecordResult `json:"offsets"`
 }
 
-type recordResult struct {
+type RecordResult struct {
 	Partition int
 	Offset    int64
 	Error     string
@@ -222,9 +222,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
 				writeError(w, err, http.StatusBadRequest)
 			}
 		}
-		res := produceResponse{}
+		res := ProduceResponse{}
 		for _, rec := range result {
-			res.Offsets = append(res.Offsets, recordResult{
+			res.Offsets = append(res.Offsets, RecordResult{
 				Partition: rec.Partition,
 				Offset:    rec.Offset,
 				Error:     rec.Error,
@@ -247,7 +247,7 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
 			w.WriteHeader(http.StatusNotFound)
 		} else {
 			w.Header().Set("Content-Type", "application/json")
-			writeJsonBody(w, getPartitions(k, t))
+			writeJsonBody(w, getPartitions(t))
 		}
 		return
 	// /api/services/kafka/{cluster}/topics/{topic}/partitions/{id}
@@ -276,7 +276,7 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
 		}
 		if r.Method == "GET" {
 			w.Header().Set("Content-Type", "application/json")
-			writeJsonBody(w, newPartition(k.Store, p))
+			writeJsonBody(w, newPartition(p))
 		} else {
 			records, err := getProduceRecords(r)
 			if err != nil {
@@ -296,9 +296,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
 				writeError(w, err, http.StatusBadRequest)
 			}
 		}
-		res := produceResponse{}
+		res := ProduceResponse{}
 		for _, rec := range result {
-			res.Offsets = append(res.Offsets, recordResult{
+			res.Offsets = append(res.Offsets, RecordResult{
 				Partition: rec.Partition,
 				Offset:    rec.Offset,
 				Error:     rec.Error,
@@ -458,7 +458,7 @@ func getTopics(info *runtime.KafkaInfo) []topic {
 			addr = name
 		}
 		t := info.Store.Topic(addr)
-		topics = append(topics, newTopic(info.Store, t, ch.Value, info.Config))
+		topics = append(topics, newTopic(t, ch.Value, info.Config))
 	}
 	sort.Slice(topics, func(i, j int) bool {
 		return strings.Compare(topics[i].Name, topics[j].Name) < 0
@@ -477,18 +477,18 @@ func getTopic(info *runtime.KafkaInfo, name string) *topic {
 		}
 		if addr == name {
 			t := info.Store.Topic(addr)
-			r := newTopic(info.Store, t, ch.Value, info.Config)
+			r := newTopic(t, ch.Value, info.Config)
 			return &r
 		}
 
 	}
 	return nil
 }
 
-func newTopic(s *store.Store, t *store.Topic, ch *asyncapi3.Channel, cfg *asyncapi3.Config) topic {
+func newTopic(t *store.Topic, ch *asyncapi3.Channel, cfg *asyncapi3.Config) topic {
 	var partitions []partition
 	for _, p := range t.Partitions {
-		partitions = append(partitions, newPartition(s, p))
+		partitions = append(partitions, newPartition(p))
 	}
 	sort.Slice(partitions, func(i, j int) bool {
 		return partitions[i].Id < partitions[j].Id
@@ -549,10 +549,10 @@ func newTopic(s *store.Store, t *store.Topic, ch *asyncapi3.Channel, cfg *asyncapi3.Config) topic {
 	return result
 }
 
-func getPartitions(info *runtime.KafkaInfo, t *store.Topic) []partition {
+func getPartitions(t *store.Topic) []partition {
 	var partitions []partition
 	for _, p := range t.Partitions {
-		partitions = append(partitions, newPartition(info.Store, p))
+		partitions = append(partitions, newPartition(p))
 	}
 	sort.Slice(partitions, func(i, j int) bool {
 		return partitions[i].Id < partitions[j].Id
@@ -594,13 +594,12 @@ func newGroup(g *store.Group) group {
 	return grp
 }
 
-func newPartition(s *store.Store, p *store.Partition) partition {
-	leader, _ := s.Broker(p.Leader)
+func newPartition(p *store.Partition) partition {
 	return partition{
 		Id:          p.Index,
 		StartOffset: p.StartOffset(),
 		Offset:      p.Offset(),
-		Leader:      newBroker(leader),
+		Leader:      newBroker(p.Leader),
 		Segments:    len(p.Segments),
 	}
 }
@@ -649,7 +648,7 @@ func getProduceRecords(r *http.Request) ([]store.Record, error) {
 	return pr.Records, nil
 }
 
-func (r *recordResult) MarshalJSON() ([]byte, error) {
+func (r *RecordResult) MarshalJSON() ([]byte, error) {
 	aux := &struct {
 		Partition int `json:"partition"`
 		Offset    int64 `json:"offset"`
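A side note on the rename from produceResponse/recordResult to the exported ProduceResponse/RecordResult: it lets code outside the api package work with the produce endpoint's reply. A hypothetical client-side decode under that assumption; the "partition" and "offset" tags are visible in the truncated MarshalJSON above, while the tag for Error is assumed here:

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Mirrors api.RecordResult's JSON form. The "error" tag is an
// assumption; only "partition" and "offset" appear in the diff.
type recordResult struct {
	Partition int    `json:"partition"`
	Offset    int64  `json:"offset"`
	Error     string `json:"error,omitempty"`
}

type produceResponse struct {
	Offsets []recordResult `json:"offsets"`
}

func main() {
	body := `{"offsets":[{"partition":1,"offset":42}]}`
	var res produceResponse
	if err := json.NewDecoder(strings.NewReader(body)).Decode(&res); err != nil {
		panic(err)
	}
	for _, o := range res.Offsets {
		fmt.Printf("partition=%d offset=%d err=%q\n", o.Partition, o.Offset, o.Error)
	}
}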