Skip to content

Commit 66c0049

Browse files
authored
Merge pull request #675 from marle3003/develop
Develop
2 parents 5ba082b + 74e6e6b commit 66c0049

File tree

130 files changed

+3362
-1007
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

130 files changed

+3362
-1007
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212
- name: Set up Go
1313
uses: actions/setup-go@v5
1414
with:
15-
go-version: 1.23.4
15+
go-version: 1.25.1
1616

1717
- name: Check out code
1818
uses: actions/checkout@v4

.github/workflows/release.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
password: ${{ secrets.DOCKER_PASSWORD }}
2727
- uses: actions/setup-go@v5
2828
with:
29-
go-version: 1.23.4
29+
go-version: 1.25.1
3030
- uses: actions/setup-node@v4
3131
with:
3232
node-version: 23
@@ -59,7 +59,7 @@ jobs:
5959
fetch-depth: 0
6060
- uses: actions/setup-go@v5
6161
with:
62-
go-version: 1.23.4
62+
go-version: 1.25.1
6363
- uses: actions/setup-node@v4
6464
with:
6565
node-version: 23

api/handler_kafka.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -209,17 +209,17 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
209209
} else if r.Method == "POST" {
210210
records, err := getProduceRecords(r)
211211
if err != nil {
212-
http.Error(w, err.Error(), http.StatusBadRequest)
212+
writeError(w, err, http.StatusBadRequest)
213213
return
214214
}
215215
c := store.NewClient(k.Store, h.app.Monitor.Kafka)
216216
ct := media.ParseContentType(r.Header.Get("Content-Type"))
217217
result, err := c.Write(topicName, records, &ct)
218218
if err != nil {
219219
if errors.Is(err, store.TopicNotFound) || errors.Is(err, store.PartitionNotFound) {
220-
http.Error(w, err.Error(), http.StatusNotFound)
220+
writeError(w, err, http.StatusNotFound)
221221
} else {
222-
http.Error(w, err.Error(), http.StatusBadRequest)
222+
writeError(w, err, http.StatusBadRequest)
223223
}
224224
}
225225
res := produceResponse{}
@@ -266,7 +266,7 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
266266
idValue := segments[7]
267267
id, err := strconv.Atoi(idValue)
268268
if err != nil {
269-
http.Error(w, "error partition ID is not an integer", http.StatusBadRequest)
269+
writeError(w, fmt.Errorf("error partition ID is not an integer"), http.StatusBadRequest)
270270
return
271271
}
272272
p := t.Partition(id)
@@ -280,7 +280,7 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
280280
} else {
281281
records, err := getProduceRecords(r)
282282
if err != nil {
283-
http.Error(w, err.Error(), http.StatusBadRequest)
283+
writeError(w, err, http.StatusBadRequest)
284284
return
285285
}
286286
for _, record := range records {
@@ -291,9 +291,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
291291
result, err := c.Write(topicName, records, &ct)
292292
if err != nil {
293293
if errors.Is(err, store.TopicNotFound) || errors.Is(err, store.PartitionNotFound) {
294-
http.Error(w, err.Error(), http.StatusNotFound)
294+
writeError(w, err, http.StatusNotFound)
295295
} else {
296-
http.Error(w, err.Error(), http.StatusBadRequest)
296+
writeError(w, err, http.StatusBadRequest)
297297
}
298298
}
299299
res := produceResponse{}
@@ -320,15 +320,15 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
320320
idValue := segments[7]
321321
id, err := strconv.Atoi(idValue)
322322
if err != nil {
323-
http.Error(w, fmt.Errorf("error partition ID is not an integer").Error(), http.StatusBadRequest)
323+
writeError(w, fmt.Errorf("error partition ID is not an integer"), http.StatusBadRequest)
324324
return
325325
}
326326
offsetValue := r.URL.Query().Get("offset")
327327
offset := -1
328328
if offsetValue != "" {
329329
offset, err = strconv.Atoi(offsetValue)
330330
if err != nil {
331-
http.Error(w, fmt.Errorf("error offset is not an integer").Error(), http.StatusBadRequest)
331+
writeError(w, fmt.Errorf("error offset is not an integer"), http.StatusBadRequest)
332332
return
333333
}
334334
}
@@ -338,9 +338,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
338338
records, err := c.Read(topicName, id, int64(offset), &ct)
339339
if err != nil {
340340
if errors.Is(err, store.TopicNotFound) || errors.Is(err, store.PartitionNotFound) {
341-
http.Error(w, err.Error(), http.StatusNotFound)
341+
writeError(w, err, http.StatusNotFound)
342342
} else {
343-
http.Error(w, err.Error(), http.StatusInternalServerError)
343+
writeError(w, err, http.StatusInternalServerError)
344344
}
345345
return
346346
}
@@ -358,13 +358,13 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
358358
idValue := segments[7]
359359
id, err := strconv.Atoi(idValue)
360360
if err != nil {
361-
http.Error(w, fmt.Errorf("error partition ID is not an integer").Error(), http.StatusBadRequest)
361+
writeError(w, fmt.Errorf("error partition ID is not an integer"), http.StatusBadRequest)
362362
return
363363
}
364364
offsetValue := segments[9]
365365
offset, err := strconv.Atoi(offsetValue)
366366
if err != nil {
367-
http.Error(w, fmt.Errorf("error offset is not an integer").Error(), http.StatusBadRequest)
367+
writeError(w, fmt.Errorf("error offset is not an integer"), http.StatusBadRequest)
368368
return
369369
}
370370

@@ -373,9 +373,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
373373
records, err := c.Read(topicName, id, int64(offset), &ct)
374374
if err != nil {
375375
if errors.Is(err, store.TopicNotFound) || errors.Is(err, store.PartitionNotFound) {
376-
http.Error(w, err.Error(), http.StatusNotFound)
376+
writeError(w, err, http.StatusNotFound)
377377
} else {
378-
http.Error(w, err.Error(), http.StatusBadRequest)
378+
writeError(w, err, http.StatusBadRequest)
379379
}
380380
return
381381
}

api/handler_search.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package api
22

33
import (
4+
"mokapi/runtime"
45
"mokapi/runtime/search"
56
"net/http"
67
"net/url"
8+
"slices"
79
"strings"
810
)
911

@@ -54,8 +56,10 @@ func getQueryParamInsensitive(values url.Values, key string) string {
5456
func getFacets(query url.Values) map[string]string {
5557
facets := make(map[string]string)
5658
for key, values := range query {
57-
if !searchFacetExcludeQueryParams[key] {
58-
facets[key] = values[0]
59+
if slices.Contains(runtime.SupportedFacets, key) {
60+
if !searchFacetExcludeQueryParams[key] {
61+
facets[key] = values[0]
62+
}
5963
}
6064
}
6165
return facets

api/handler_search_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,35 @@ func TestHandler_SearchQuery(t *testing.T) {
139139
_, err := app.Kafka.Add(toConfig(k), enginetest.NewEngine())
140140
require.NoError(t, err)
141141

142+
return app
143+
},
144+
},
145+
{
146+
name: "search with additional query parameter should not be used as facet",
147+
requestUrl: "/api/search/query?q=foo&refresh=20",
148+
response: []try.ResponseCondition{
149+
try.HasStatusCode(200),
150+
try.HasHeader("Content-Type", "application/json"),
151+
try.AssertBody(func(t *testing.T, body string) {
152+
var result search.Result
153+
err := json.Unmarshal([]byte(body), &result)
154+
require.NoError(t, err)
155+
require.Len(t, result.Facets, 1)
156+
require.Equal(t, []search.FacetValue{{Value: "HTTP", Count: 1}, {Value: "Kafka", Count: 1}}, result.Facets["type"])
157+
require.Len(t, result.Results, 2)
158+
}),
159+
},
160+
app: func() *runtime.App {
161+
app := runtime.New(&static.Config{Api: static.Api{Search: static.Search{
162+
Enabled: true,
163+
}}})
164+
165+
h := openapitest.NewConfig("3.0", openapitest.WithInfo("foo", "", ""))
166+
app.AddHttp(toConfig(h))
167+
k := asyncapitest.NewConfig(asyncapitest.WithInfo("foo", "", ""))
168+
_, err := app.Kafka.Add(toConfig(k), enginetest.NewEngine())
169+
require.NoError(t, err)
170+
142171
return app
143172
},
144173
},

buffer/page.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
const pageSize = 65536
99

1010
var (
11-
pagePool = sync.Pool{New: func() interface{} { return new(page) }}
11+
pagePool = sync.Pool{New: func() interface{} {
12+
return new(page)
13+
}}
1214
)
1315

1416
type page struct {

buffer/pagebuffer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
)
99

1010
var (
11-
pageBufferPool = sync.Pool{New: func() interface{} { return new(pageBuffer) }}
11+
pageBufferPool = sync.Pool{New: func() interface{} {
12+
return new(pageBuffer)
13+
}}
1214
)
1315

1416
type refCounter uint32
@@ -78,12 +80,12 @@ func (pb *pageBuffer) Write(b []byte) (n int, err error) {
7880
available := pageSize - tail.Size()
7981

8082
if len(b) <= available {
81-
tail.Write(b)
83+
_, _ = tail.Write(b)
8284
pb.length += len(b)
8385
break
8486
}
8587

86-
tail.Write(b[:available])
88+
_, _ = tail.Write(b[:available])
8789
b = b[available:]
8890
pb.length += available
8991
pb.addPage()

buffer/pagebuffer_test.go

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,109 @@
1-
package buffer
1+
package buffer_test

import (
	"bytes"
	"strings"
	"testing"

	"github.com/stretchr/testify/require"

	"mokapi/buffer"
)

// TestPageBuffer exercises the pooled page buffer: plain writes, random-access
// overwrites, pool reuse (measured via allocation counts), page-boundary
// handling, and slicing into fragments.
func TestPageBuffer(t *testing.T) {
	testcases := []struct {
		name string
		test func(t *testing.T)
	}{
		{
			name: "Write",
			test: func(t *testing.T) {
				pb := buffer.NewPageBuffer()
				n, err := pb.Write([]byte("hello world"))
				require.NoError(t, err)
				require.Equal(t, 11, n)
				require.Equal(t, 11, pb.Size())
			},
		},
		{
			name: "WriteAt",
			test: func(t *testing.T) {
				pb := buffer.NewPageBuffer()
				_, _ = pb.Write([]byte("hello world"))
				// Overwrite the byte at offset 6; the size must not change.
				pb.WriteAt([]byte("W"), 6)
				require.Equal(t, 11, pb.Size())
				b := new(bytes.Buffer)
				n, err := pb.WriteTo(b)
				require.NoError(t, err)
				require.Equal(t, 11, n)
				require.Equal(t, "hello World", b.String())
			},
		},
		{
			name: "should use pool",
			test: func(t *testing.T) {
				// Warm the pool so subsequent NewPageBuffer calls reuse pages.
				pb := buffer.NewPageBuffer()
				data := []byte("hello world")
				_, _ = pb.Write(data)

				n := testing.AllocsPerRun(10, func() {
					pb.Unref()
					pb = buffer.NewPageBuffer()
					_, _ = pb.Write(data)
				})

				require.Equal(t, float64(1), n, "expected 1 allocations per run (pages array)")
			},
		},
		{
			name: "test page size",
			test: func(t *testing.T) {
				// One byte more than a single page (65536) forces a second page.
				data := make([]byte, 65537)
				pb := buffer.NewPageBuffer()
				_, err := pb.Write(data)
				require.NoError(t, err)
				require.Equal(t, 65537, pb.Size())
			},
		},
		{
			name: "using pages with slice",
			test: func(t *testing.T) {
				data := make([]byte, 100000)
				pb := buffer.NewPageBuffer()
				_, err := pb.Write(data)
				require.NoError(t, err)
				// Slice spanning into the second page keeps that page referenced.
				f := pb.Slice(65537, 70000)
				require.Equal(t, 4463, f.Size())

				n := testing.AllocsPerRun(1, func() {
					pb.Unref()
					pb = buffer.NewPageBuffer()
					_, _ = pb.Write(data)
				})
				// Expected 4 allocations per run: 2 more than the pooled baseline,
				// presumably one for the page retained by the fragment and one for
				// that page's buffer array — TODO(review): confirm against the
				// buffer package's pool implementation.
				require.Equal(t, float64(4), n, "expected 4 allocations (2 additional for pages)")
			},
		},
		{
			name: "read from fragment",
			test: func(t *testing.T) {
				data := []byte(strings.Repeat("a", 100000))
				pb := buffer.NewPageBuffer()
				_, err := pb.Write(data)
				require.NoError(t, err)
				f := pb.Slice(65537, 70000)
				require.Equal(t, 4463, f.Size())
				b := make([]byte, 4463)
				n, err := f.Read(b)
				require.NoError(t, err)
				require.Equal(t, 4463, n)
			},
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			tc.test(t)
		})
	}
}

0 commit comments

Comments
 (0)