Skip to content

Commit b6e622e

Browse files
authored
Merge pull request #572 from marle3003/develop
Develop
2 parents e4246d2 + 44ed962 commit b6e622e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+3495
-161
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ the risk of bugs in production.
3535
- **Multiple Provider support**: File, HTTP, GIT, NPM to gather configurations and scripts.
3636
- **Dashboard** to see what's going on.
3737

38+
## 🧩 OpenAPI & AsyncAPI Support
39+
40+
Mokapi supports mocking REST APIs using **OpenAPI 2.0 / 3.0 / 3.1** and event-driven systems using **AsyncAPI**.
41+
3842
## 🔧 Spin Up Mokapi
3943

4044
Install and start Mokapi using one of the following methods.

acceptance/petstore_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func (suite *PetStoreSuite) TestApi() {
6060
"description": "",
6161
"host": "127.0.0.1:19092",
6262
"name": "broker",
63+
"protocol": "kafka",
6364
},
6465
},
6566

api/handler_kafka.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type kafka struct {
3131
type kafkaServer struct {
3232
Name string `json:"name"`
3333
Host string `json:"host"`
34+
Protocol string `json:"protocol"`
3435
Description string `json:"description"`
3536
Tags []kafkaServerTag `json:"tags,omitempty"`
3637
}
@@ -173,6 +174,7 @@ func getKafka(info *runtime.KafkaInfo) kafka {
173174
Name: name,
174175
Host: s.Value.Host,
175176
Description: s.Value.Description,
177+
Protocol: s.Value.Protocol,
176178
}
177179
for _, r := range s.Value.Tags {
178180
if r.Value == nil {

api/handler_kafka_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestHandler_Kafka(t *testing.T) {
103103
}))
104104
},
105105
requestUrl: "http://foo.api/api/services/kafka/foo",
106-
responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","description":"bar"}]}`,
106+
responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","protocol":"kafka","description":"bar"}]}`,
107107
},
108108
{
109109
name: "server with tags",
@@ -122,7 +122,7 @@ func TestHandler_Kafka(t *testing.T) {
122122
}))
123123
},
124124
requestUrl: "http://foo.api/api/services/kafka/foo",
125-
responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","description":"bar","tags":[{"name":"env:test","description":"This environment is for running internal tests"}]}]}`,
125+
responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","protocol":"kafka","description":"bar","tags":[{"name":"env:test","description":"This environment is for running internal tests"}]}]}`,
126126
},
127127
{
128128
name: "get specific with topic",
@@ -193,7 +193,7 @@ func TestHandler_Kafka(t *testing.T) {
193193
return app
194194
},
195195
requestUrl: "http://foo.api/api/services/kafka/foo",
196-
responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","description":""}],"groups":[{"name":"foo","members":null,"coordinator":"foo.bar:9092","leader":"","state":"PreparingRebalance","protocol":"range","topics":null}]}`,
196+
responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","protocol":"kafka","description":""}],"groups":[{"name":"foo","members":null,"coordinator":"foo.bar:9092","leader":"","state":"PreparingRebalance","protocol":"range","topics":null}]}`,
197197
},
198198
{
199199
name: "get specific with group containing members",
@@ -249,7 +249,7 @@ func TestHandler_Kafka(t *testing.T) {
249249
return app
250250
},
251251
requestUrl: "http://foo.api/api/services/kafka/foo",
252-
responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","description":""}],"groups":[{"name":"foo","members":[{"name":"m1","addr":"192.168.0.100","clientSoftwareName":"mokapi","clientSoftwareVersion":"1.0","heartbeat":"2024-04-22T15:04:05+07:00","partitions":{"topic":[1,2,5]}},{"name":"m2","addr":"192.168.0.200","clientSoftwareName":"mokapi","clientSoftwareVersion":"1.0","heartbeat":"2024-04-22T15:04:10+07:00","partitions":{"topic":[3,4,6]}}],"coordinator":"foo.bar:9092","leader":"m1","state":"PreparingRebalance","protocol":"range","topics":null}]}`,
252+
responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","protocol":"kafka","description":""}],"groups":[{"name":"foo","members":[{"name":"m1","addr":"192.168.0.100","clientSoftwareName":"mokapi","clientSoftwareVersion":"1.0","heartbeat":"2024-04-22T15:04:05+07:00","partitions":{"topic":[1,2,5]}},{"name":"m2","addr":"192.168.0.200","clientSoftwareName":"mokapi","clientSoftwareVersion":"1.0","heartbeat":"2024-04-22T15:04:10+07:00","partitions":{"topic":[3,4,6]}}],"coordinator":"foo.bar:9092","leader":"m1","state":"PreparingRebalance","protocol":"range","topics":null}]}`,
253253
},
254254
{
255255
name: "get specific with topic and openapi schema",

buffer/page.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package buffer
2+
3+
import (
4+
"io"
5+
"sync"
6+
)
7+
8+
const pageSize = 65536
9+
10+
var (
11+
pagePool = sync.Pool{New: func() interface{} { return new(page) }}
12+
)
13+
14+
type page struct {
15+
offset int // absolute offset
16+
buffer [pageSize]byte
17+
length int // length of this page
18+
refs refCounter
19+
}
20+
21+
type fragment struct {
22+
pages []*page
23+
offset int
24+
cursor int
25+
length int
26+
}
27+
28+
func newPage(offset int) *page {
29+
p := pagePool.Get().(*page)
30+
p.offset = offset
31+
p.refs.inc()
32+
return p
33+
}
34+
35+
func (p *page) unref() {
36+
if p.refs.dec() {
37+
p.offset = 0
38+
p.length = 0
39+
pagePool.Put(p)
40+
}
41+
}
42+
43+
func (p *page) ReadAt(b []byte, offset int) (int, error) {
44+
if offset -= p.offset; offset < 0 || offset > pageSize {
45+
panic("offset out of range")
46+
}
47+
48+
if offset > p.length {
49+
return 0, nil
50+
}
51+
52+
n := copy(b, p.buffer[offset:p.length])
53+
return n, nil
54+
}
55+
56+
func (p *page) Write(b []byte) (n int, err error) {
57+
n = copy(p.buffer[p.length:], b)
58+
p.length += n
59+
return
60+
}
61+
62+
func (p *page) WriteAt(b []byte, offset int) (int, error) {
63+
if offset -= p.offset; offset < 0 || offset > pageSize {
64+
panic("offset out of range")
65+
}
66+
n := copy(p.buffer[offset:], b)
67+
if end := offset + n; end > p.length {
68+
p.length += end
69+
}
70+
return n, nil
71+
}
72+
73+
func (p *page) Size() int {
74+
return p.length
75+
}
76+
77+
func (p *page) slice(begin, end int) []byte {
78+
i, j := begin-p.offset, end-p.offset
79+
if i < 0 {
80+
i = 0
81+
}
82+
if j > len(p.buffer) {
83+
j = len(p.buffer)
84+
}
85+
return p.buffer[i:j]
86+
}
87+
88+
func (f *fragment) Read(b []byte) (int, error) {
89+
if end := f.offset + f.length; f.cursor >= end {
90+
return 0, io.EOF
91+
}
92+
if len(b) > f.length {
93+
b = b[:f.length]
94+
}
95+
read := 0
96+
for _, p := range f.pages {
97+
n, _ := p.ReadAt(b, f.cursor)
98+
b = b[n:]
99+
f.cursor += n
100+
read += n
101+
}
102+
if read == 0 {
103+
return 0, io.EOF
104+
}
105+
return read, nil
106+
}
107+
108+
func (f *fragment) Size() int {
109+
return f.length
110+
}
111+
112+
func (f *fragment) Seek(offset int64, whence int) (int64, error) {
113+
switch whence {
114+
case io.SeekCurrent:
115+
f.cursor += int(offset)
116+
case io.SeekStart:
117+
f.cursor = f.offset + int(offset)
118+
case io.SeekEnd:
119+
f.cursor = f.offset + f.length + int(offset)
120+
}
121+
return int64(f.cursor), nil
122+
}
123+
124+
func (f *fragment) ref() {
125+
for _, p := range f.pages {
126+
p.refs.inc()
127+
}
128+
}
129+
130+
func (f *fragment) unref() {
131+
for _, p := range f.pages {
132+
p.unref()
133+
}
134+
f.pages = nil
135+
f.length = 0
136+
f.cursor = 0
137+
f.offset = 0
138+
}
139+
140+
func (f *fragment) Close() error {
141+
f.unref()
142+
return nil
143+
}

buffer/pagebuffer.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package buffer
2+
3+
import (
4+
"encoding/binary"
5+
"io"
6+
"sync"
7+
"sync/atomic"
8+
)
9+
10+
var (
11+
pageBufferPool = sync.Pool{New: func() interface{} { return new(pageBuffer) }}
12+
)
13+
14+
type refCounter uint32
15+
16+
type pageBuffer struct {
17+
pages []*page
18+
length int
19+
cursor int
20+
refs refCounter
21+
}
22+
23+
type Writer interface {
24+
}
25+
26+
type Buffer interface {
27+
io.Writer
28+
WriteTo(io.Writer) (int, error)
29+
WriteAt([]byte, int)
30+
WriteSizeAt(size int, offset int)
31+
Scan(begin, end int, f func([]byte) bool)
32+
Size() int
33+
Slice(begin, end int) Fragment
34+
Unref()
35+
}
36+
37+
type Fragment interface {
38+
io.ReadCloser
39+
io.Seeker
40+
Size() int
41+
}
42+
43+
func (r *refCounter) inc() {
44+
atomic.AddUint32((*uint32)(r), 1)
45+
}
46+
47+
func (r *refCounter) dec() bool {
48+
i := atomic.AddUint32((*uint32)(r), ^uint32(0))
49+
return i == 0
50+
}
51+
52+
func NewPageBuffer() Buffer {
53+
pb := pageBufferPool.Get().(*pageBuffer)
54+
pb.refs.inc()
55+
return pb
56+
}
57+
58+
func (pb *pageBuffer) Unref() {
59+
if pb.refs.dec() {
60+
for _, p := range pb.pages {
61+
p.unref()
62+
}
63+
pb.length = 0
64+
pb.cursor = 0
65+
pb.pages = nil
66+
pageBufferPool.Put(pb)
67+
}
68+
}
69+
70+
func (pb *pageBuffer) Write(b []byte) (n int, err error) {
71+
n = len(b)
72+
if len(pb.pages) == 0 {
73+
pb.addPage()
74+
}
75+
76+
for len(b) != 0 {
77+
tail := pb.pages[len(pb.pages)-1]
78+
available := pageSize - tail.Size()
79+
80+
if len(b) <= available {
81+
tail.Write(b)
82+
pb.length += len(b)
83+
break
84+
}
85+
86+
tail.Write(b[:available])
87+
b = b[available:]
88+
pb.length += available
89+
pb.addPage()
90+
}
91+
92+
return
93+
}
94+
95+
func (pb *pageBuffer) WriteSizeAt(size int, offset int) {
96+
var b [4]byte
97+
binary.BigEndian.PutUint32(b[:], uint32(size))
98+
pb.WriteAt(b[:], offset)
99+
}
100+
101+
func (pb *pageBuffer) WriteAt(b []byte, offset int) {
102+
for _, p := range pb.slice(offset, offset+len(b)) {
103+
n, _ := p.WriteAt(b, offset)
104+
b = b[n:]
105+
offset += n
106+
}
107+
}
108+
109+
func (pb *pageBuffer) Size() int {
110+
return pb.length
111+
}
112+
113+
func (pb *pageBuffer) addPage() {
114+
p := newPage(pb.length)
115+
pb.pages = append(pb.pages, p)
116+
}
117+
118+
func (pb *pageBuffer) slice(begin, end int) []*page {
119+
i := begin / pageSize
120+
j := end / pageSize
121+
if j < len(pb.pages) {
122+
j++
123+
}
124+
return pb.pages[i:j]
125+
}
126+
127+
func (pb *pageBuffer) Scan(begin, end int, f func([]byte) bool) {
128+
for _, p := range pb.slice(begin, end) {
129+
if !f(p.slice(begin, end)) {
130+
return
131+
}
132+
}
133+
}
134+
135+
func (pb *pageBuffer) WriteTo(w io.Writer) (written int, err error) {
136+
pb.Scan(pb.cursor, pb.length, func(b []byte) bool {
137+
var n int
138+
n, err = w.Write(b)
139+
written += n
140+
return err == nil
141+
})
142+
pb.cursor += written
143+
return
144+
}
145+
146+
func (pb *pageBuffer) Slice(begin, end int) Fragment {
147+
pages := pb.slice(begin, end)
148+
f := &fragment{
149+
pages: pages,
150+
offset: begin,
151+
cursor: begin,
152+
length: end - begin,
153+
}
154+
f.ref()
155+
return f
156+
}

buffer/pagebuffer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package buffer

cmd/mokapi/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,13 @@ func createServer(cfg *static.Config) (*server.Server, error) {
113113
}
114114
http := server.NewHttpManager(scriptEngine, certStore, app)
115115
kafka := server.NewKafkaManager(scriptEngine, app)
116+
mqtt := server.NewMqttManager(scriptEngine, app)
116117
smtp := server.NewSmtpManager(app, scriptEngine, certStore)
117118
ldap := server.NewLdapDirectoryManager(scriptEngine, certStore, app)
118119

119120
watcher.AddListener(func(e dynamic.ConfigEvent) {
120121
kafka.UpdateConfig(e)
122+
mqtt.UpdateConfig(e)
121123
http.Update(e)
122124
smtp.UpdateConfig(e)
123125
ldap.UpdateConfig(e)

config/dynamic/asyncApi/convert_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestConfig_Convert(t *testing.T) {
3838
// Server
3939
require.Len(t, cfg3.Servers, 1)
4040
require.Equal(t, "test.mosquitto.org:{port}", cfg3.Servers["production"].Value.Host)
41-
require.Equal(t, "mqtt", cfg3.Servers["production"].Value.Protocol)
41+
require.Equal(t, "kafka", cfg3.Servers["production"].Value.Protocol)
4242
require.Equal(t, "Test broker", cfg3.Servers["production"].Value.Description)
4343

4444
// Channel

0 commit comments

Comments
 (0)