Skip to content

Commit 6d694d7

Browse files
committed
kafka 2.4.0 response header version 1
1 parent 9e71a5a commit 6d694d7

File tree

8 files changed

+498
-11
lines changed

8 files changed

+498
-11
lines changed

proxy/processor_default.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,16 +144,25 @@ func (handler *DefaultResponseHandler) handleResponse(dst DeadlineWriter, src De
144144
if err != nil {
145145
return true, err
146146
}
147+
responseHeaderTaggedFields, err := protocol.NewResponseHeaderTaggedFields(requestKeyVersion)
148+
if err != nil {
149+
return true, err
150+
}
151+
unknownTaggedFields, err := responseHeaderTaggedFields.MaybeRead(src)
152+
if err != nil {
153+
return true, err
154+
}
155+
readResponsesHeaderLength := int32(4 + len(unknownTaggedFields)) // 4 = Length + CorrelationID
147156

148157
responseModifier, err := protocol.GetResponseModifier(requestKeyVersion.ApiKey, requestKeyVersion.ApiVersion, ctx.netAddressMappingFunc)
149158
if err != nil {
150159
return true, err
151160
}
152161
if responseModifier != nil {
153-
if int32(responseHeader.Length) > protocol.MaxResponseSize {
162+
if responseHeader.Length > protocol.MaxResponseSize {
154163
return true, protocol.PacketDecodingError{Info: fmt.Sprintf("message of length %d too large", responseHeader.Length)}
155164
}
156-
resp := make([]byte, int(responseHeader.Length-4))
165+
resp := make([]byte, int(responseHeader.Length-readResponsesHeaderLength))
157166
if _, err = io.ReadFull(src, resp); err != nil {
158167
return true, err
159168
}
@@ -162,13 +171,16 @@ func (handler *DefaultResponseHandler) handleResponse(dst DeadlineWriter, src De
162171
return true, err
163172
}
164173
// add 4 bytes (CorrelationId) to the length
165-
newHeaderBuf, err := protocol.Encode(&protocol.ResponseHeader{Length: int32(len(newResponseBuf) + 4), CorrelationID: responseHeader.CorrelationID})
174+
newHeaderBuf, err := protocol.Encode(&protocol.ResponseHeader{Length: int32(len(newResponseBuf) + int(readResponsesHeaderLength)), CorrelationID: responseHeader.CorrelationID})
166175
if err != nil {
167176
return true, err
168177
}
169178
if _, err := dst.Write(newHeaderBuf); err != nil {
170179
return false, err
171180
}
181+
if _, err := dst.Write(unknownTaggedFields); err != nil {
182+
return false, err
183+
}
172184
if _, err := dst.Write(newResponseBuf); err != nil {
173185
return false, err
174186
}
@@ -177,8 +189,11 @@ func (handler *DefaultResponseHandler) handleResponse(dst DeadlineWriter, src De
177189
if _, err := dst.Write(responseHeaderBuf); err != nil {
178190
return false, err
179191
}
180-
// 4 bytes were written as responseHeaderBuf (CorrelationId)
181-
if readErr, err = myCopyN(dst, src, int64(responseHeader.Length-4), ctx.buf); err != nil {
192+
if _, err := dst.Write(unknownTaggedFields); err != nil {
193+
return false, err
194+
}
195+
// 4 bytes were written as responseHeaderBuf (CorrelationId) + tagged fields
196+
if readErr, err = myCopyN(dst, src, int64(responseHeader.Length-readResponsesHeaderLength), ctx.buf); err != nil {
182197
return readErr, err
183198
}
184199
}

proxy/protocol/prep_encoder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (pe *prepEncoder) putInt64(in int64) {
3030

3131
func (pe *prepEncoder) putVarint(in int64) {
3232
var buf [binary.MaxVarintLen64]byte
33-
pe.length += binary.PutVarint(buf[:], in)
33+
pe.length += binary.PutUvarint(buf[:], uint64(in))
3434
}
3535

3636
func (pe *prepEncoder) putArrayLength(in int) error {

proxy/protocol/real_decoder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (rd *realDecoder) getInt64() (int64, error) {
6161
}
6262

6363
func (rd *realDecoder) getVarint() (int64, error) {
64-
tmp, n := binary.Varint(rd.raw[rd.off:])
64+
tmp, n := binary.Uvarint(rd.raw[rd.off:])
6565
if n == 0 {
6666
rd.off = len(rd.raw)
6767
return -1, ErrInsufficientData
@@ -71,7 +71,7 @@ func (rd *realDecoder) getVarint() (int64, error) {
7171
return -1, errVarintOverflow
7272
}
7373
rd.off += n
74-
return tmp, nil
74+
return int64(tmp), nil
7575
}
7676

7777
func (rd *realDecoder) getArrayLength() (int, error) {

proxy/protocol/real_encoder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (re *realEncoder) putInt64(in int64) {
3232
}
3333

3434
func (re *realEncoder) putVarint(in int64) {
35-
re.off += binary.PutVarint(re.raw[re.off:], in)
35+
re.off += binary.PutUvarint(re.raw[re.off:], uint64(in))
3636
}
3737

3838
func (re *realEncoder) putArrayLength(in int) error {

proxy/protocol/request_key_version.go

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,225 @@ func (r *RequestKeyVersion) decode(pd packetDecoder) (err error) {
2323
r.ApiVersion, err = pd.getInt16()
2424
return err
2525
}
26+
27+
// Determine response header version. Function returns -1 for unknown api key.
28+
// See also public short responseHeaderVersion(short _version) in kafka/clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java
29+
func (r *RequestKeyVersion) ResponseHeaderVersion() int16 {
30+
switch r.ApiKey {
31+
case 0:
32+
return 0
33+
case 1:
34+
return 0
35+
case 2:
36+
return 0
37+
case 3:
38+
if r.ApiVersion >= 9 {
39+
return 1
40+
} else {
41+
return 0
42+
}
43+
case 4:
44+
if r.ApiVersion >= 4 {
45+
return 1
46+
} else {
47+
return 0
48+
}
49+
case 5:
50+
if r.ApiVersion >= 2 {
51+
return 1
52+
} else {
53+
return 0
54+
}
55+
case 6:
56+
if r.ApiVersion >= 6 {
57+
return 1
58+
} else {
59+
return 0
60+
}
61+
case 7:
62+
if r.ApiVersion >= 3 {
63+
return 1
64+
} else {
65+
return 0
66+
}
67+
case 8:
68+
if r.ApiVersion >= 8 {
69+
return 1
70+
} else {
71+
return 0
72+
}
73+
case 9:
74+
if r.ApiVersion >= 6 {
75+
return 1
76+
} else {
77+
return 0
78+
}
79+
case 10:
80+
if r.ApiVersion >= 3 {
81+
return 1
82+
} else {
83+
return 0
84+
}
85+
case 11:
86+
if r.ApiVersion >= 6 {
87+
return 1
88+
} else {
89+
return 0
90+
}
91+
case 12:
92+
if r.ApiVersion >= 4 {
93+
return 1
94+
} else {
95+
return 0
96+
}
97+
case 13:
98+
if r.ApiVersion >= 4 {
99+
return 1
100+
} else {
101+
return 0
102+
}
103+
case 14:
104+
if r.ApiVersion >= 4 {
105+
return 1
106+
} else {
107+
return 0
108+
}
109+
case 15:
110+
if r.ApiVersion >= 5 {
111+
return 1
112+
} else {
113+
return 0
114+
}
115+
case 16:
116+
if r.ApiVersion >= 3 {
117+
return 1
118+
} else {
119+
return 0
120+
}
121+
case 17:
122+
if r.ApiVersion >= 2 {
123+
return 1
124+
} else {
125+
return 0
126+
}
127+
case 18:
128+
return 0
129+
case 19:
130+
if r.ApiVersion >= 5 {
131+
return 1
132+
} else {
133+
return 0
134+
}
135+
case 20:
136+
if r.ApiVersion >= 4 {
137+
return 1
138+
} else {
139+
return 0
140+
}
141+
case 21:
142+
return 0
143+
case 22:
144+
if r.ApiVersion >= 2 {
145+
return 1
146+
} else {
147+
return 0
148+
}
149+
case 23:
150+
return 0
151+
case 24:
152+
return 0
153+
case 25:
154+
return 0
155+
case 26:
156+
return 0
157+
case 27:
158+
return 0
159+
case 28:
160+
if r.ApiVersion >= 3 {
161+
return 1
162+
} else {
163+
return 0
164+
}
165+
case 29:
166+
if r.ApiVersion >= 2 {
167+
return 1
168+
} else {
169+
return 0
170+
}
171+
case 30:
172+
return 0
173+
case 31:
174+
return 0
175+
case 32:
176+
return 0
177+
case 33:
178+
return 0
179+
case 34:
180+
return 0
181+
case 35:
182+
return 0
183+
case 36:
184+
if r.ApiVersion >= 2 {
185+
return 1
186+
} else {
187+
return 0
188+
}
189+
case 37:
190+
if r.ApiVersion >= 2 {
191+
return 1
192+
} else {
193+
return 0
194+
}
195+
case 38:
196+
if r.ApiVersion >= 2 {
197+
return 1
198+
} else {
199+
return 0
200+
}
201+
case 39:
202+
if r.ApiVersion >= 2 {
203+
return 1
204+
} else {
205+
return 0
206+
}
207+
case 40:
208+
if r.ApiVersion >= 2 {
209+
return 1
210+
} else {
211+
return 0
212+
}
213+
case 41:
214+
if r.ApiVersion >= 2 {
215+
return 1
216+
} else {
217+
return 0
218+
}
219+
case 42:
220+
if r.ApiVersion >= 2 {
221+
return 1
222+
} else {
223+
return 0
224+
}
225+
case 43:
226+
if r.ApiVersion >= 2 {
227+
return 1
228+
} else {
229+
return 0
230+
}
231+
case 44:
232+
if r.ApiVersion >= 1 {
233+
return 1
234+
} else {
235+
return 0
236+
}
237+
case 45:
238+
return 1
239+
case 46:
240+
return 1
241+
case 47:
242+
return 0
243+
default:
244+
// throw new UnsupportedVersionException("Unsupported API key " + apiKey);
245+
return -1
246+
}
247+
}

0 commit comments

Comments
 (0)