Skip to content

Commit fec2378

Browse files
committed
Do not wait for produce response when acks is 0
1 parent e5d29a5 commit fec2378

File tree

3 files changed

+139
-10
lines changed

3 files changed

+139
-10
lines changed

proxy/processor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const (
1616
defaultReadTimeout = 30 * time.Second
1717
minOpenRequests = 16
1818

19+
apiKeyProduce = int16(0)
1920
apiKeySaslHandshake = int16(17)
2021
apiKeyApiApiVersions = int16(18)
2122

proxy/processor_default.go

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package proxy
22

33
import (
4+
"bytes"
45
"errors"
56
"fmt"
67
"github.com/grepplabs/kafka-proxy/proxy/protocol"
@@ -20,8 +21,12 @@ func (handler *DefaultRequestHandler) handleRequest(dst DeadlineWriter, src Dead
2021
// logrus.Println("Await Kafka request")
2122

2223
// waiting for first bytes or EOF - reset deadlines
23-
src.SetReadDeadline(time.Time{})
24-
dst.SetWriteDeadline(time.Time{})
24+
if err = src.SetReadDeadline(time.Time{}); err != nil {
25+
return true, err
26+
}
27+
if err = dst.SetWriteDeadline(time.Time{}); err != nil {
28+
return true, err
29+
}
2530

2631
keyVersionBuf := make([]byte, 8) // Size => int32 + ApiKey => int16 + ApiVersion => int16
2732

@@ -67,8 +72,9 @@ func (handler *DefaultRequestHandler) handleRequest(dst DeadlineWriter, src Dead
6772
return true, fmt.Errorf("only saslHandshake version 0 and 1 are supported, got version %d", requestKeyVersion.ApiVersion)
6873
}
6974
ctx.localSaslDone = true
70-
src.SetDeadline(time.Time{})
71-
75+
if err = src.SetDeadline(time.Time{}); err != nil {
76+
return false, err
77+
}
7278
// defaultRequestHandler was consumed but due to local handling enqueued defaultResponseHandler will not be.
7379
return false, ctx.putNextRequestHandler(defaultRequestHandler)
7480
case apiKeyApiApiVersions:
@@ -79,11 +85,18 @@ func (handler *DefaultRequestHandler) handleRequest(dst DeadlineWriter, src Dead
7985
}
8086
}
8187

82-
// send inFlightRequest to channel before myCopyN to prevent race condition in proxyResponses
83-
if err = sendRequestKeyVersion(ctx.openRequestsChannel, openRequestSendTimeout, requestKeyVersion); err != nil {
88+
mustReply, readBytes, err := handler.mustReply(requestKeyVersion, src)
89+
if err != nil {
8490
return true, err
8591
}
8692

93+
// send inFlightRequest to channel before myCopyN to prevent race condition in proxyResponses
94+
if mustReply {
95+
if err = sendRequestKeyVersion(ctx.openRequestsChannel, openRequestSendTimeout, requestKeyVersion); err != nil {
96+
return true, err
97+
}
98+
}
99+
87100
requestDeadline := time.Now().Add(ctx.timeout)
88101
err = dst.SetWriteDeadline(requestDeadline)
89102
if err != nil {
@@ -98,24 +111,79 @@ func (handler *DefaultRequestHandler) handleRequest(dst DeadlineWriter, src Dead
98111
if _, err = dst.Write(keyVersionBuf); err != nil {
99112
return false, err
100113
}
114+
// write - send to broker
115+
if len(readBytes) > 0 {
116+
if _, err = dst.Write(readBytes); err != nil {
117+
return false, err
118+
}
119+
}
101120
// 4 bytes were written as keyVersionBuf (ApiKey, ApiVersion)
102-
if readErr, err = myCopyN(dst, src, int64(requestKeyVersion.Length-4), ctx.buf); err != nil {
121+
if readErr, err = myCopyN(dst, src, int64(requestKeyVersion.Length-int32(4+len(readBytes))), ctx.buf); err != nil {
103122
return readErr, err
104123
}
105124
if requestKeyVersion.ApiKey == apiKeySaslHandshake {
106125
if requestKeyVersion.ApiVersion == 0 {
107126
return false, ctx.putNextHandlers(saslAuthV0RequestHandler, saslAuthV0ResponseHandler)
108127
}
109128
}
110-
return false, ctx.putNextHandlers(defaultRequestHandler, defaultResponseHandler)
129+
if mustReply {
130+
return false, ctx.putNextHandlers(defaultRequestHandler, defaultResponseHandler)
131+
} else {
132+
return false, ctx.putNextRequestHandler(defaultRequestHandler)
133+
}
134+
}
135+
136+
func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.RequestKeyVersion, src io.Reader) (bool, []byte, error) {
137+
if requestKeyVersion.ApiKey == apiKeyProduce {
138+
// header version for produce [0..8] is 1 (request_api_key,request_api_version,correlation_id (INT32),client_id, NULLABLE_STRING )
139+
acksReader := protocol.RequestAcksReader{}
140+
141+
var (
142+
acks int16
143+
err error
144+
)
145+
var bufferRead bytes.Buffer
146+
reader := io.TeeReader(src, &bufferRead)
147+
switch requestKeyVersion.ApiVersion {
148+
case 0, 1, 2:
149+
// CorrelationID + ClientID
150+
if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
151+
return false, nil, err
152+
}
153+
// acks (INT16)
154+
acks, err = acksReader.ReadAndDiscardProduceAcks(reader)
155+
if err != nil {
156+
return false, nil, err
157+
}
158+
159+
case 3, 4, 5, 6, 7, 8:
160+
// CorrelationID + ClientID
161+
if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
162+
return false, nil, err
163+
}
164+
// transactional_id (NULLABLE_STRING),acks (INT16)
165+
acks, err = acksReader.ReadAndDiscardProduceTxnAcks(reader)
166+
if err != nil {
167+
return false, nil, err
168+
}
169+
default:
170+
return false, nil, fmt.Errorf("produce version %d is not supported", requestKeyVersion.ApiVersion)
171+
}
172+
return acks != 0, bufferRead.Bytes(), nil
173+
}
174+
return true, nil, nil
111175
}
112176

113177
func (handler *DefaultResponseHandler) handleResponse(dst DeadlineWriter, src DeadlineReader, ctx *ResponsesLoopContext) (readErr bool, err error) {
114178
//logrus.Println("Await Kafka response")
115179

116180
// waiting for first bytes or EOF - reset deadlines
117-
src.SetReadDeadline(time.Time{})
118-
dst.SetWriteDeadline(time.Time{})
181+
if err = src.SetReadDeadline(time.Time{}); err != nil {
182+
return true, err
183+
}
184+
if err = dst.SetWriteDeadline(time.Time{}); err != nil {
185+
return true, err
186+
}
119187

120188
responseHeaderBuf := make([]byte, 8) // Size => int32, CorrelationId => int32
121189
if _, err = io.ReadFull(src, responseHeaderBuf); err != nil {
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package protocol
2+
3+
import (
4+
"encoding/binary"
5+
"io"
6+
"io/ioutil"
7+
)
8+
9+
type RequestAcksReader struct {
10+
}
11+
12+
func (r RequestAcksReader) readAndDiscardNullableString(reader io.Reader) (err error) {
13+
var length int16
14+
if err = binary.Read(reader, binary.BigEndian, &length); err != nil {
15+
return err
16+
}
17+
if length < -1 {
18+
return errInvalidStringLength
19+
}
20+
if length > 0 {
21+
if _, err = io.CopyN(ioutil.Discard, reader, int64(length)); err != nil {
22+
return err
23+
}
24+
}
25+
return nil
26+
}
27+
28+
func (r RequestAcksReader) ReadAndDiscardHeaderV1Part(reader io.Reader) (err error) {
29+
// CorrelationID int32
30+
var correlationID int32
31+
if err = binary.Read(reader, binary.BigEndian, &correlationID); err != nil {
32+
return err
33+
}
34+
// ClientID *string
35+
if err = r.readAndDiscardNullableString(reader); err != nil {
36+
return err
37+
}
38+
return nil
39+
}
40+
41+
func (r RequestAcksReader) ReadAndDiscardProduceAcks(reader io.Reader) (acks int16, err error) {
42+
// Acks int16
43+
if err = binary.Read(reader, binary.BigEndian, &acks); err != nil {
44+
return 0, err
45+
}
46+
return acks, nil
47+
}
48+
49+
func (r RequestAcksReader) ReadAndDiscardProduceTxnAcks(reader io.Reader) (acks int16, err error) {
50+
// TransactionalId *string
51+
if err = r.readAndDiscardNullableString(reader); err != nil {
52+
return 0, err
53+
}
54+
55+
// Acks int16
56+
if err = binary.Read(reader, binary.BigEndian, &acks); err != nil {
57+
return 0, err
58+
}
59+
return acks, nil
60+
}

0 commit comments

Comments
 (0)