@@ -10,6 +10,125 @@ import (
1010 "time"
1111)
1212
13+ func TestHandleRequest (t * testing.T ) {
14+ buf := make ([]byte , defaultRequestBufferSize )
15+
16+ tt := []struct {
17+ name string
18+ apiKey int16
19+ apiVersion int16
20+ hexInput string
21+
22+ mustReply bool
23+ }{
24+ {name : "Produce v2, kafka-client 0.10.2.2, acks=1" , apiKey : 0 , apiVersion : 2 ,
25+ hexInput : "00000086000000020000000500144b61666b614578616d706c6550726f647563657200010000753000000001000f746573742d6e6f2d68656164657273000000010000000000000041000000000000000000000035fe96cb720100000001734a61d94200000008000001734a61d8df0000001748656c6c6f204d6f6d2031353934363830373933333131" ,
26+ mustReply : true ,
27+ },
28+ {name : "Produce v2, kafka-client 0.10.2.2, acks=all" , apiKey : 0 , apiVersion : 2 ,
29+ hexInput : "00000086000000020000000500144b61666b614578616d706c6550726f6475636572ffff0000753000000001000f746573742d6e6f2d68656164657273000000010000000000000041000000000000000000000035f472a86d0100000001734a6349bb00000008000001734a6349600000001748656c6c6f204d6f6d2031353934363830383837363438" ,
30+ mustReply : true ,
31+ },
32+ {name : "Produce v2, kafka-client 0.10.2.2, acks=0" , apiKey : 0 , apiVersion : 2 ,
33+ hexInput : "00000086000000020000000500144b61666b614578616d706c6550726f647563657200000000753000000001000f746573742d6e6f2d68656164657273000000010000000000000041000000000000000000000035557b46590100000001734a64b31f00000008000001734a64b2be0000001748656c6c6f204d6f6d2031353934363830393830313538" ,
34+ mustReply : false ,
35+ },
36+ {name : "Produce v3, kafka-client 0.11.0.2, acks=1" , apiKey : 0 , apiVersion : 3 ,
37+ hexInput : "000000c2000000030000000500144b61666b614578616d706c6550726f6475636572ffff00010000753000000001000f746573742d6e6f2d6865616465727300000001000000000000007b00000000000000000000006fffffffff0231f7fe0e000000000000000001734a66bef6000001734a66bef6ffffffffffffffffffffffffffff000000017a00000010000001734a66be5f2e48656c6c6f204d6f6d203135393436383131313432303702146865616465722d6b6579186865616465722d76616c7565" ,
38+ mustReply : true ,
39+ },
40+ {name : "Produce v3, kafka-client 0.11.0.2, acks=all" , apiKey : 0 , apiVersion : 3 ,
41+ hexInput : "000000c2000000030000000500144b61666b614578616d706c6550726f6475636572ffffffff0000753000000001000f746573742d6e6f2d6865616465727300000001000000000000007b00000000000000000000006fffffffff028386cf33000000000000000001734a6836c1000001734a6836c1ffffffffffffffffffffffffffff000000017a00000010000001734a68363f2e48656c6c6f204d6f6d203135393436383132313034333102146865616465722d6b6579186865616465722d76616c7565" ,
42+ mustReply : true ,
43+ },
44+ {name : "Produce v3, kafka-client 0.11.0.2, acks=0" , apiKey : 0 , apiVersion : 3 ,
45+ hexInput : "000000c2000000030000000500144b61666b614578616d706c6550726f6475636572ffff00000000753000000001000f746573742d6e6f2d6865616465727300000001000000000000007b00000000000000000000006fffffffff0274ff21b4000000000000000001734a68c09a000001734a68c09affffffffffffffffffffffffffff000000017a00000010000001734a68c0162e48656c6c6f204d6f6d203135393436383132343537313802146865616465722d6b6579186865616465722d76616c7565" ,
46+ mustReply : false ,
47+ },
48+ {name : "Produce v3, kafka-client 2.5.0, acks=1" , apiKey : 0 , apiVersion : 8 ,
49+ hexInput : "000000c2000000080000000300144b61666b614578616d706c6550726f6475636572ffff00010000753000000001000f746573742d6e6f2d6865616465727300000001000000000000007b00000000000000000000006fffffffff02662a226b000000000000000001734a69dfbd000001734a69dfbdffffffffffffffffffffffffffff000000017a00000010000001734a69deba2e48656c6c6f204d6f6d203135393436383133313930393802146865616465722d6b6579186865616465722d76616c7565" ,
50+ mustReply : true ,
51+ },
52+ {name : "Produce v3, kafka-client 2.5.0, acks=all" , apiKey : 0 , apiVersion : 8 ,
53+ hexInput : "000000c2000000080000000300144b61666b614578616d706c6550726f6475636572ffffffff0000753000000001000f746573742d6e6f2d6865616465727300000001000000000000007b00000000000000000000006fffffffff025afc90c2000000000000000001734a6ae36d000001734a6ae36dffffffffffffffffffffffffffff000000017a00000010000001734a6ae2592e48656c6c6f204d6f6d203135393436383133383535363102146865616465722d6b6579186865616465722d76616c7565" ,
54+ mustReply : true ,
55+ },
56+ {name : "Produce v3, kafka-client 2.5.0, acks=0" , apiKey : 0 , apiVersion : 8 ,
57+ hexInput : "000000c2000000080000000300144b61666b614578616d706c6550726f6475636572ffff00000000753000000001000f746573742d6e6f2d6865616465727300000001000000000000007b00000000000000000000006fffffffff02021b145f000000000000000001734a6c14d6000001734a6c14d6ffffffffffffffffffffffffffff000000017a00000010000001734a6c13e42e48656c6c6f204d6f6d203135393436383134363337383002146865616465722d6b6579186865616465722d76616c7565" ,
58+ mustReply : false ,
59+ },
60+ {name : "ApiVersions v3, kafka-client 2.5.0" , apiKey : 18 , apiVersion : 3 ,
61+ hexInput : "00000038001200030000000000144b61666b614578616d706c6550726f647563657200126170616368652d6b61666b612d6a61766106322e352e3000" ,
62+ mustReply : true ,
63+ },
64+ {name : "Metadata v9, kafka-client 2.5.0, acks=0" , apiKey : 3 , apiVersion : 9 ,
65+ hexInput : "00000035000300090000000100144b61666b614578616d706c6550726f6475636572000210746573742d6e6f2d686561646572730001000000" ,
66+ mustReply : true ,
67+ },
68+ }
69+ for _ , tc := range tt {
70+ input , err := hex .DecodeString (tc .hexInput )
71+ if err != nil {
72+ t .Fatal (err )
73+ }
74+ output := bytes .NewBuffer (make ([]byte , 0 ))
75+ dst := & TestDeadlineWriter {
76+ Buffer : output ,
77+ }
78+ readBuffer := bytes .NewBuffer (input )
79+ writeBuffer := bytes .NewBuffer (make ([]byte , 0 ))
80+ src := & TestDeadlineReaderWriter {
81+ reader : readBuffer ,
82+ writer : writeBuffer ,
83+ }
84+
85+ openRequestsChannel := make (chan protocol.RequestKeyVersion , 1 )
86+ nextRequestHandlerChannel := make (chan RequestHandler , 1 )
87+ nextResponseHandlerChannel := make (chan ResponseHandler , 1 )
88+
89+ ctx := & RequestsLoopContext {
90+ openRequestsChannel : openRequestsChannel ,
91+ nextRequestHandlerChannel : nextRequestHandlerChannel ,
92+ nextResponseHandlerChannel : nextResponseHandlerChannel ,
93+ timeout : 1 * time .Second ,
94+ buf : buf ,
95+ localSasl : & LocalSasl {},
96+ }
97+
98+ a := assert .New (t )
99+ _ , err = defaultRequestHandler .handleRequest (dst , src , ctx )
100+ if err != nil {
101+ t .Fatal (err )
102+ }
103+ a .Equal (input , output .Bytes ()) // local sasl is not tested
104+ a .Empty (readBuffer .Bytes ()) // check all bytes from input has been read
105+
106+ select {
107+ case openRequest := <- openRequestsChannel :
108+ a .True (tc .mustReply )
109+ a .Equal (tc .apiKey , openRequest .ApiKey )
110+ a .Equal (tc .apiVersion , openRequest .ApiVersion )
111+ default :
112+ a .False (tc .mustReply )
113+ }
114+
115+ select {
116+ case nextRequestHandler := <- nextRequestHandlerChannel :
117+ a .Equal (defaultRequestHandler , nextRequestHandler )
118+ default :
119+ a .Fail ("Next request was not received" )
120+ }
121+
122+ select {
123+ case nextResponseHandler := <- nextResponseHandlerChannel :
124+ a .True (tc .mustReply )
125+ a .Equal (defaultResponseHandler , nextResponseHandler )
126+ default :
127+ a .False (tc .mustReply )
128+ }
129+ }
130+ }
131+
13132func TestHandleResponse (t * testing.T ) {
14133 netAddressMappingFunc := func (brokerHost string , brokerPort int32 ) (listenerHost string , listenerPort int32 , err error ) {
15134 if brokerHost == "localhost" {
@@ -32,6 +151,9 @@ func TestHandleResponse(t *testing.T) {
32151 hexInput string
33152 hexOutput string
34153 }{
154+ {name : "Produce v2, kafka-client 0.10.2.2" , apiKey : 0 , apiVersion : 2 ,
155+ hexInput : "000000370000000500000001000f746573742d6e6f2d6865616465727300000001000000000000000000000000000affffffffffffffff00000000" ,
156+ },
35157 {name : "Produce v5, kafka-client 1.1.1" , apiKey : 0 , apiVersion : 5 ,
36158 hexInput : "0000003f0000001000000001000f746573742d6e6f2d68656164657273000000010000000000000000000000000008ffffffffffffffff000000000000000000000000" ,
37159 },
@@ -143,8 +265,7 @@ func TestHandleResponse(t *testing.T) {
143265 ctx := & ResponsesLoopContext {openRequestsChannel : openRequestsChannel , timeout : 1 * time .Second , buf : buf , netAddressMappingFunc : netAddressMappingFunc }
144266
145267 a := assert .New (t )
146- handler := & DefaultResponseHandler {}
147- _ , err = handler .handleResponse (dst , src , ctx )
268+ _ , err = defaultResponseHandler .handleResponse (dst , src , ctx )
148269 if err != nil {
149270 t .Fatal (err )
150271 }
@@ -168,3 +289,27 @@ type TestDeadlineReader struct {
168289func (w * TestDeadlineReader ) SetReadDeadline (t time.Time ) error {
169290 return nil
170291}
292+
293+ type TestDeadlineReaderWriter struct {
294+ reader * bytes.Buffer
295+ writer * bytes.Buffer
296+ }
297+
298+ func (w * TestDeadlineReaderWriter ) SetReadDeadline (t time.Time ) error {
299+ return nil
300+ }
301+ func (w * TestDeadlineReaderWriter ) SetWriteDeadline (t time.Time ) error {
302+ return nil
303+ }
304+
305+ func (w * TestDeadlineReaderWriter ) SetDeadline (t time.Time ) error {
306+ return nil
307+ }
308+
309+ func (w * TestDeadlineReaderWriter ) Read (p []byte ) (n int , err error ) {
310+ return w .reader .Read (p )
311+ }
312+
313+ func (w * TestDeadlineReaderWriter ) Write (p []byte ) (n int , err error ) {
314+ return w .reader .Write (p )
315+ }
0 commit comments