Skip to content

Commit 9e71a5a

Browse files
committed
kafka 2.4.0 protocol changes
1 parent 6970b40 commit 9e71a5a

File tree

13 files changed

+825
-57
lines changed

13 files changed

+825
-57
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ sudo: false
33
language: go
44

55
go:
6-
- "1.12.x"
6+
- "1.13.x"
77

88
env:
99
global:

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM alpine:3.7
1+
FROM alpine:3.11
22

33
RUN apk add --no-cache ca-certificates
44

Dockerfile.build

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.12 as builder
1+
FROM golang:1.13 as builder
22

33
ARG GOOS=linux
44
ARG GOARCH=amd64
Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
package protocol
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"math"
6+
"math/rand"
7+
"testing"
8+
)
9+
10+
func TestEncodeDecodeCompactString(t *testing.T) {
11+
tt := []struct {
12+
name string
13+
lens []int
14+
}{
15+
{name: "empty string", lens: []int{0}},
16+
{name: "empty strings", lens: []int{0, 0}},
17+
{name: "len 1", lens: []int{1}},
18+
{name: "lens 1", lens: []int{1, 1}},
19+
{name: "len 2", lens: []int{2}},
20+
{name: "len 3", lens: []int{3}},
21+
{name: "len 4", lens: []int{4}},
22+
{name: "len 16", lens: []int{16}},
23+
{name: "len 63", lens: []int{63}},
24+
{name: "len 64", lens: []int{64}},
25+
{name: "len 128", lens: []int{128}},
26+
{name: "len 8191", lens: []int{8191}},
27+
{name: "len 8192", lens: []int{8192}},
28+
{name: "len 32767", lens: []int{math.MaxInt16}},
29+
{name: "lens 32767", lens: []int{math.MaxInt16, math.MaxInt16}},
30+
{name: "different values", lens: []int{0, 1, 2, 3, 4, 16, 64, 127, 128, 8191, 8192, 8191, 128, 127, 64, 16, 4, 3, 2, 1, 0}},
31+
}
32+
for _, tc := range tt {
33+
values := make([]string, 0)
34+
for _, l := range tc.lens {
35+
value := RandStringRunes(l)
36+
values = append(values, value)
37+
}
38+
request := &CompactStringsHolder{
39+
values: values,
40+
}
41+
buf, err := Encode(request)
42+
if err != nil {
43+
t.Fatal(err)
44+
}
45+
response := &CompactStringsHolder{}
46+
err = Decode(buf, response)
47+
if err != nil {
48+
t.Fatal(err)
49+
}
50+
if len(request.values) != len(response.values) {
51+
t.Fatalf("Values array lengths differ: expected %v, actual %v", request.values, response.values)
52+
}
53+
for i := range request.values {
54+
if request.values[i] != response.values[i] {
55+
t.Fatalf("Values differ: index %d, expected %v, actual %v", i, request.values[i], response.values[i])
56+
}
57+
}
58+
}
59+
}
60+
61+
func TestEncodeDecodeCompactNullableString(t *testing.T) {
62+
tt := []struct {
63+
name string
64+
lens []int
65+
}{
66+
{name: "nil ref", lens: []int{-1}},
67+
{name: "nil refs", lens: []int{-1, -1}},
68+
{name: "empty string", lens: []int{0}},
69+
{name: "empty strings", lens: []int{0, 0}},
70+
{name: "len 1", lens: []int{1}},
71+
{name: "lens 1", lens: []int{1, 1}},
72+
{name: "len 2", lens: []int{2}},
73+
{name: "len 3", lens: []int{3}},
74+
{name: "len 4", lens: []int{4}},
75+
{name: "len 16", lens: []int{16}},
76+
{name: "len 63", lens: []int{63}},
77+
{name: "len 64", lens: []int{64}},
78+
{name: "len 128", lens: []int{128}},
79+
{name: "len 8191", lens: []int{8191}},
80+
{name: "len 8192", lens: []int{8192}},
81+
{name: "len 32767", lens: []int{math.MaxInt16}},
82+
{name: "lens 32767", lens: []int{math.MaxInt16, math.MaxInt16}},
83+
{name: "different values", lens: []int{0, 1, 2, 3, 4, -1, 16, 64, 127, 128, 8191, 8192, 8191, 128, 127, -1, 64, 16, 4, 3, 2, 1, -1, 0}},
84+
}
85+
for _, tc := range tt {
86+
values := make([]*string, 0)
87+
for _, l := range tc.lens {
88+
if l >= 0 {
89+
value := RandStringRunes(l)
90+
values = append(values, &value)
91+
} else {
92+
values = append(values, nil)
93+
}
94+
}
95+
request := &CompactNullableStringsHolder{
96+
values: values,
97+
}
98+
buf, err := Encode(request)
99+
if err != nil {
100+
t.Fatal(err)
101+
}
102+
response := &CompactNullableStringsHolder{}
103+
err = Decode(buf, response)
104+
if err != nil {
105+
t.Fatal(err)
106+
}
107+
if len(request.values) != len(response.values) {
108+
t.Fatalf("Values array lengths differ: expected %v, actual %v", request.values, response.values)
109+
}
110+
for i := range request.values {
111+
if (request.values[i] != nil) != (response.values[i] != nil) {
112+
t.Errorf("is nil comperison differ: expected %v, actual %v", request.values[i] != nil, response.values[i] != nil)
113+
}
114+
if request.values[i] != nil && *request.values[i] != *response.values[i] {
115+
t.Fatalf("Values differ: index %d, expected %v, actual %v", i, request.values[i], response.values[i])
116+
}
117+
}
118+
}
119+
}
120+
121+
func TestEncodeDecodeCompactArray(t *testing.T) {
122+
compactArray := &compactArray{name: "strings", ty: typeStr}
123+
tt := []struct {
124+
name string
125+
lens []int
126+
}{
127+
{name: "empty string", lens: []int{0}},
128+
{name: "empty strings", lens: []int{0, 0}},
129+
{name: "len 1", lens: []int{1}},
130+
{name: "lens 1", lens: []int{1, 1}},
131+
{name: "len 2", lens: []int{2}},
132+
{name: "len 3", lens: []int{3}},
133+
{name: "len 4", lens: []int{4}},
134+
{name: "len 16", lens: []int{16}},
135+
{name: "len 63", lens: []int{63}},
136+
{name: "len 64", lens: []int{64}},
137+
{name: "len 128", lens: []int{128}},
138+
{name: "len 8191", lens: []int{8191}},
139+
{name: "len 8192", lens: []int{8192}},
140+
{name: "len 32767", lens: []int{math.MaxInt16}},
141+
{name: "lens 32767", lens: []int{math.MaxInt16, math.MaxInt16}},
142+
{name: "different values", lens: []int{0, 1, 2, 3, 4, 16, 64, 127, 128, 8191, 8192, 8191, 128, 127, 64, 16, 4, 3, 2, 1, 0}},
143+
}
144+
for _, tc := range tt {
145+
values := make([]interface{}, 0)
146+
for _, l := range tc.lens {
147+
value := RandStringRunes(l)
148+
values = append(values, value)
149+
}
150+
request := &CompactArrayHolder{
151+
values: values,
152+
array: compactArray,
153+
}
154+
buf, err := Encode(request)
155+
if err != nil {
156+
t.Fatal(err)
157+
}
158+
response := &CompactArrayHolder{
159+
array: compactArray,
160+
}
161+
err = Decode(buf, response)
162+
if err != nil {
163+
t.Fatal(err)
164+
}
165+
if len(request.values) != len(response.values) {
166+
t.Fatalf("Values array lengths differ: expected %v, actual %v", request.values, response.values)
167+
}
168+
for i := range request.values {
169+
if (request.values[i] != nil) != (response.values[i] != nil) {
170+
t.Errorf("is nil comperison differ: expected %v, actual %v", request.values[i] != nil, response.values[i] != nil)
171+
}
172+
if request.values[i] != nil && request.values[i] != response.values[i] {
173+
t.Fatalf("Values differ: index %d, expected %v, actual %v", i, request.values[i], response.values[i])
174+
}
175+
}
176+
}
177+
}
178+
179+
func TestEncodeDecodeCompactNullableArray(t *testing.T) {
180+
array := &compactNullableArray{name: "strings", ty: typeNullableStr}
181+
tt := []struct {
182+
name string
183+
lens []int
184+
}{
185+
{name: "nil ref", lens: []int{-1}},
186+
{name: "nil refs", lens: []int{-1, -1}},
187+
{name: "empty string", lens: []int{0}},
188+
{name: "empty strings", lens: []int{0, 0}},
189+
{name: "len 1", lens: []int{1}},
190+
{name: "lens 1", lens: []int{1, 1}},
191+
{name: "len 2", lens: []int{2}},
192+
{name: "len 3", lens: []int{3}},
193+
{name: "len 4", lens: []int{4}},
194+
{name: "len 16", lens: []int{16}},
195+
{name: "len 63", lens: []int{63}},
196+
{name: "len 64", lens: []int{64}},
197+
{name: "len 128", lens: []int{128}},
198+
{name: "len 8191", lens: []int{8191}},
199+
{name: "len 8192", lens: []int{8192}},
200+
{name: "len 32767", lens: []int{math.MaxInt16}},
201+
{name: "lens 32767", lens: []int{math.MaxInt16, math.MaxInt16}},
202+
{name: "different values", lens: []int{0, 1, 2, 3, 4, -1, 16, 64, 127, 128, 8191, 8192, 8191, 128, 127, -1, 64, 16, 4, 3, 2, 1, -1, 0}},
203+
}
204+
for _, tc := range tt {
205+
values := make([]interface{}, 0)
206+
for _, l := range tc.lens {
207+
if l >= 0 {
208+
value := RandStringRunes(l)
209+
values = append(values, &value)
210+
} else {
211+
values = append(values, new(string))
212+
}
213+
}
214+
request := &CompactNullableArrayHolder{
215+
values: values,
216+
array: array,
217+
}
218+
buf, err := Encode(request)
219+
if err != nil {
220+
t.Fatal(err)
221+
}
222+
response := &CompactNullableArrayHolder{
223+
array: array,
224+
}
225+
err = Decode(buf, response)
226+
if err != nil {
227+
t.Fatal(err)
228+
}
229+
if len(request.values) != len(response.values) {
230+
t.Fatalf("Values array lengths differ: expected %v, actual %v", request.values, response.values)
231+
}
232+
for i := range request.values {
233+
if (request.values[i].(*string) != nil) != (response.values[i].(*string) != nil) {
234+
t.Errorf("is nil comperison differ: expected %v, actual %v", request.values[i] != nil, response.values[i] != nil)
235+
}
236+
if request.values[i].(*string) != nil && *request.values[i].(*string) != *response.values[i].(*string) {
237+
t.Fatalf("Values differ: index %d, expected %v, actual %v", i, request.values[i].(*string), response.values[i].(*string))
238+
}
239+
}
240+
}
241+
}
242+
243+
type CompactStringsHolder struct {
244+
values []string
245+
}
246+
247+
func (r *CompactStringsHolder) encode(pe packetEncoder) (err error) {
248+
for _, value := range r.values {
249+
err = pe.putCompactString(value)
250+
if err != nil {
251+
return err
252+
}
253+
}
254+
return
255+
}
256+
257+
func (r *CompactStringsHolder) decode(pd packetDecoder) (err error) {
258+
r.values = make([]string, 0)
259+
var value string
260+
for ok := true; ok; ok = pd.remaining() > 0 {
261+
if value, err = pd.getCompactString(); err != nil {
262+
return err
263+
}
264+
r.values = append(r.values, value)
265+
}
266+
if pd.remaining() != 0 {
267+
return errors.Errorf("remaining bytes %d", pd.remaining())
268+
}
269+
return
270+
}
271+
272+
type CompactNullableStringsHolder struct {
273+
values []*string
274+
}
275+
276+
func (r *CompactNullableStringsHolder) encode(pe packetEncoder) (err error) {
277+
for _, value := range r.values {
278+
err = pe.putCompactNullableString(value)
279+
if err != nil {
280+
return err
281+
}
282+
}
283+
return
284+
}
285+
286+
func (r *CompactNullableStringsHolder) decode(pd packetDecoder) (err error) {
287+
r.values = make([]*string, 0)
288+
var value *string
289+
for ok := true; ok; ok = pd.remaining() > 0 {
290+
if value, err = pd.getCompactNullableString(); err != nil {
291+
return err
292+
}
293+
r.values = append(r.values, value)
294+
}
295+
if pd.remaining() != 0 {
296+
return errors.Errorf("remaining bytes %d", pd.remaining())
297+
}
298+
return
299+
}
300+
301+
type CompactArrayHolder struct {
302+
values []interface{}
303+
array *compactArray
304+
}
305+
306+
func (r *CompactArrayHolder) encode(pe packetEncoder) (err error) {
307+
return r.array.encode(pe, r.values)
308+
}
309+
310+
func (r *CompactArrayHolder) decode(pd packetDecoder) (err error) {
311+
vs, err := r.array.decode(pd)
312+
if err != nil {
313+
return err
314+
}
315+
in, ok := vs.([]interface{})
316+
if !ok {
317+
return errors.New("decoded value not a []interface{}")
318+
}
319+
r.values = in
320+
321+
if pd.remaining() != 0 {
322+
return errors.Errorf("remaining bytes %d", pd.remaining())
323+
}
324+
return
325+
}
326+
327+
type CompactNullableArrayHolder struct {
328+
values []interface{}
329+
array *compactNullableArray
330+
}
331+
332+
func (r *CompactNullableArrayHolder) encode(pe packetEncoder) (err error) {
333+
return r.array.encode(pe, r.values)
334+
}
335+
336+
func (r *CompactNullableArrayHolder) decode(pd packetDecoder) (err error) {
337+
vs, err := r.array.decode(pd)
338+
if err != nil {
339+
return err
340+
}
341+
in, ok := vs.([]interface{})
342+
if !ok {
343+
return errors.New("decoded value not a []interface{}")
344+
}
345+
r.values = in
346+
347+
if pd.remaining() != 0 {
348+
return errors.Errorf("remaining bytes %d", pd.remaining())
349+
}
350+
return
351+
}
352+
353+
func RandStringRunes(n int) string {
354+
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
355+
b := make([]rune, n)
356+
for i := range b {
357+
b[i] = letterRunes[rand.Intn(len(letterRunes))]
358+
}
359+
return string(b)
360+
}

proxy/protocol/packet_decoder.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ type packetDecoder interface {
2020
getInt64Array() ([]int64, error)
2121
getStringArray() ([]string, error)
2222

23+
getVarintBytes() ([]byte, error)
24+
25+
getCompactString() (string, error)
26+
getCompactNullableString() (*string, error)
27+
getCompactArrayLength() (int, error)
28+
getCompactNullableArrayLength() (int, error)
29+
2330
// Subsets
2431
remaining() int
2532
}

0 commit comments

Comments
 (0)