Skip to content

Commit 0c55763

Browse files
committed
Allow disabling produce required acks check
1 parent fec2378 commit 0c55763

File tree

6 files changed

+20
-3
lines changed

6 files changed

+20
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ See:
109109
--log-level-fieldname string Log level fieldname for json format (default "@level")
110110
--log-msg-fieldname string Message fieldname for json format (default "@message")
111111
--log-time-fieldname string Time fieldname for json format (default "@timestamp")
112+
--producer-acks-0-disabled Assume fire-and-forget is never sent by the producer. Enabling this parameter will increase performance
112113
--proxy-listener-ca-chain-cert-file string PEM encoded CA's certificate file. If provided, client certificate is required and verified
113114
--proxy-listener-cert-file string PEM encoded file with server certificate
114115
--proxy-listener-cipher-suites stringSlice List of supported cipher suites

cmd/kafka-proxy/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ func initFlags() {
142142
// http://kafka.apache.org/protocol.html#protocol_api_keys
143143
Server.Flags().IntSliceVar(&c.Kafka.ForbiddenApiKeys, "forbidden-api-keys", []int{}, "Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics")
144144

145+
Server.Flags().BoolVar(&c.Kafka.Producer.Acks0Disabled, "producer-acks-0-disabled", false, "Assume fire-and-forget is never sent by the producer. Enabling this parameter will increase performance")
146+
145147
// TLS
146148
Server.Flags().BoolVar(&c.Kafka.TLS.Enable, "tls-enable", false, "Whether or not to use TLS when connecting to the broker")
147149
Server.Flags().BoolVar(&c.Kafka.TLS.InsecureSkipVerify, "tls-insecure-skip-verify", false, "It controls whether a client verifies the server's certificate chain and host name")

config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ type Config struct {
141141
Timeout time.Duration
142142
}
143143
}
144+
Producer struct {
145+
Acks0Disabled bool
146+
}
144147
}
145148
ForwardProxy struct {
146149
Url string

proxy/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
155155
timeout: c.Auth.Gateway.Server.Timeout,
156156
tokenInfo: gatewayTokenInfo,
157157
},
158-
ForbiddenApiKeys: forbiddenApiKeys,
158+
ForbiddenApiKeys: forbiddenApiKeys,
159+
ProducerAcks0Disabled: c.Kafka.Producer.Acks0Disabled,
159160
},
160161
dialAddressMapping: dialAddressMapping,
161162
kafkaClientCert: kafkaClientCert,

proxy/processor.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type ProcessorConfig struct {
4141
LocalSasl *LocalSasl
4242
AuthServer *AuthServer
4343
ForbiddenApiKeys map[int16]struct{}
44+
ProducerAcks0Disabled bool
4445
}
4546

4647
type processor struct {
@@ -60,6 +61,8 @@ type processor struct {
6061
forbiddenApiKeys map[int16]struct{}
6162
// metrics
6263
brokerAddress string
64+
// producer will never send request with acks=0
65+
producerAcks0Disabled bool
6366
}
6467

6568
func newProcessor(cfg ProcessorConfig, brokerAddress string) *processor {
@@ -103,6 +106,7 @@ func newProcessor(cfg ProcessorConfig, brokerAddress string) *processor {
103106
localSasl: cfg.LocalSasl,
104107
authServer: cfg.AuthServer,
105108
forbiddenApiKeys: cfg.ForbiddenApiKeys,
109+
producerAcks0Disabled: cfg.ProducerAcks0Disabled,
106110
}
107111
}
108112

@@ -125,6 +129,7 @@ func (p *processor) RequestsLoop(dst DeadlineWriter, src DeadlineReaderWriter) (
125129
buf: make([]byte, p.requestBufferSize),
126130
localSasl: p.localSasl,
127131
localSaslDone: false, // sequential processing - mutex is required
132+
producerAcks0Disabled: p.producerAcks0Disabled,
128133
}
129134

130135
return ctx.requestsLoop(dst, src)
@@ -142,6 +147,8 @@ type RequestsLoopContext struct {
142147

143148
localSasl *LocalSasl
144149
localSaslDone bool
150+
151+
producerAcks0Disabled bool
145152
}
146153

147154
// used by local authentication

proxy/processor_default.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (handler *DefaultRequestHandler) handleRequest(dst DeadlineWriter, src Dead
8585
}
8686
}
8787

88-
mustReply, readBytes, err := handler.mustReply(requestKeyVersion, src)
88+
mustReply, readBytes, err := handler.mustReply(requestKeyVersion, src, ctx)
8989
if err != nil {
9090
return true, err
9191
}
@@ -133,8 +133,11 @@ func (handler *DefaultRequestHandler) handleRequest(dst DeadlineWriter, src Dead
133133
}
134134
}
135135

136-
func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.RequestKeyVersion, src io.Reader) (bool, []byte, error) {
136+
func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.RequestKeyVersion, src io.Reader, ctx *RequestsLoopContext) (bool, []byte, error) {
137137
if requestKeyVersion.ApiKey == apiKeyProduce {
138+
if ctx.producerAcks0Disabled {
139+
return true, nil, nil
140+
}
138141
// header version for produce [0..8] is 1 (request_api_key,request_api_version,correlation_id (INT32),client_id, NULLABLE_STRING )
139142
acksReader := protocol.RequestAcksReader{}
140143

0 commit comments

Comments
 (0)