@@ -138,13 +138,8 @@ impl KafkaConsumer {
138
138
. expect ( "create_consumer task panicked" ) ?;
139
139
140
140
loop {
141
- // FIXME(jplatte): I don't know if StreamConsumer::recv has an internal buffer.
142
- // Overall, rdkafka seems to be doing a bunch of background magic so maybe it does.
143
- // In that case, it's likely already doing batching reads (which don't seem to be
144
- // a thing in the public API) internally.
145
- // If not, we should likely do some sort of batching ourselves, e.g. have two separate
146
- // tokio tasks, one which pulls messages from Kafka and one that processes them, with
147
- // a bounded channel in between for backpressure.
141
+ // It's fine to pull messages one-by-one without any buffering in our own code because
142
+ // rdkafka buffers messages internally through a background task / thread.
148
143
let msg = consumer. recv ( ) . await ?;
149
144
tracing:: debug!( "Received a message" ) ;
150
145
@@ -178,12 +173,11 @@ impl KafkaConsumer {
178
173
}
179
174
}
180
175
181
- // FIXME(jplatte): Unlike recv above, this seems less likely to be auto-coalesced
182
- // internally in rdkafka so maybe we should introduce our own logic to only commit
183
- // after N messages to reduce unnecessary back and forth on the Kafka connection,
184
- // or unnecessary disk writes inside Kafka (messages in Kafka are not committed
185
- // individually, rather what this call does is update the stored stream position
186
- // for the consumer group).
176
+ // FIXME(jplatte): Should we introduce logic to only commit every N messages to reduce
177
+ // back and forth on the Kafka connection / disk writes inside Kafka?
178
+ //
179
+ // Background: messages in Kafka are not committed individually, rather what this call
180
+ // does is update the stored stream position for the consumer group.
187
181
consumer. commit_message ( & msg, CommitMode :: Async ) ?;
188
182
}
189
183
}
0 commit comments