
Commit 7604bc5

Refactor chunk size configuration

Bundles buffer_size and buffer_timeout into a single BufferConfig struct exposed by ConsumerConfig, reads it inside StreamWorker instead of threading the two values through KafkaSinkConnector::new, and raises the default buffer_timeout from 5000 ms to 10000 ms.

1 parent d26d4de

4 files changed: +38 −26 lines

src/connectors/kafka/config.rs

Lines changed: 23 additions & 1 deletion

@@ -640,6 +640,28 @@ impl ConsumerConfig {
     pub fn topics(&self) -> Vec<&str> {
         self.topics.iter().map(|t| t.as_str()).collect()
     }
+
+    pub fn buffer_config(&self) -> BufferConfig {
+        BufferConfig {
+            buffer_size: self.buffer_size,
+            buffer_timeout: self.buffer_timeout,
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct BufferConfig {
+    pub buffer_size: usize,
+    pub buffer_timeout: Duration,
+}
+
+impl Default for BufferConfig {
+    fn default() -> Self {
+        Self {
+            buffer_size: 10000,
+            buffer_timeout: Duration::from_millis(10000),
+        }
+    }
 }
 
 impl ProducerConfig {
@@ -874,7 +896,7 @@ impl Default for ConsumerConfig {
             topics: vec![],
             group_id: "parseable-connect-cg".to_string(),
             buffer_size: 10_000,
-            buffer_timeout: Duration::from_millis(5000),
+            buffer_timeout: Duration::from_millis(10000),
             group_instance_id: "parseable-cg-ii".to_string(),
             // NOTE: cooperative-sticky does not work well in rdkafka when using manual commit.
             // @see https://github.com/confluentinc/librdkafka/issues/4629
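
In effect, the hunk replaces two loose parameters with one value type that owns its defaults. A minimal, self-contained sketch of that pattern follows; BufferConfig mirrors the diff, while spawn_worker is a hypothetical stand-in for the real StreamWorker:

// Sketch: group related batching knobs in one struct so defaults live
// in a single place and call sites pass one coherent value.
use std::time::Duration;

#[derive(Clone, Debug)]
pub struct BufferConfig {
    pub buffer_size: usize,
    pub buffer_timeout: Duration,
}

impl Default for BufferConfig {
    fn default() -> Self {
        Self {
            buffer_size: 10_000,
            buffer_timeout: Duration::from_millis(10_000),
        }
    }
}

// Hypothetical consumer of the config: it takes one value instead of
// two separate parameters that could drift apart at each call site.
fn spawn_worker(cfg: &BufferConfig) {
    println!(
        "batching up to {} records or {:?} per flush",
        cfg.buffer_size, cfg.buffer_timeout
    );
}

fn main() {
    spawn_worker(&BufferConfig::default());
}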

src/connectors/kafka/processor.rs

Lines changed: 13 additions & 13 deletions

@@ -17,6 +17,7 @@
  */
 
 use crate::connectors::common::processor::Processor;
+use crate::connectors::kafka::config::BufferConfig;
 use crate::connectors::kafka::{ConsumerRecord, StreamConsumer, TopicPartition};
 use crate::event::format;
 use crate::event::format::EventFormat;
@@ -30,7 +31,6 @@ use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
 use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::Duration;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error, warn};
@@ -110,25 +110,25 @@ where
 {
     processor: Arc<P>,
     consumer: Arc<StreamConsumer>,
-    buffer_size: usize,
-    buffer_timeout: Duration,
+    buffer_config: BufferConfig,
 }
 
 impl<P> StreamWorker<P>
 where
     P: Processor<Vec<ConsumerRecord>, ()> + Send + Sync + 'static,
 {
-    pub fn new(
-        processor: Arc<P>,
-        consumer: Arc<StreamConsumer>,
-        buffer_size: usize,
-        buffer_timeout: Duration,
-    ) -> Self {
+    pub fn new(processor: Arc<P>, consumer: Arc<StreamConsumer>) -> Self {
+        let buffer_config = consumer
+            .context()
+            .config()
+            .consumer()
+            .expect("Consumer config is missing")
+            .buffer_config();
+
         Self {
             processor,
             consumer,
-            buffer_size,
-            buffer_timeout,
+            buffer_config,
         }
     }
 
@@ -139,8 +139,8 @@ where
     ) -> anyhow::Result<()> {
         let chunked_stream = tokio_stream::StreamExt::chunks_timeout(
             record_stream,
-            self.buffer_size,
-            self.buffer_timeout,
+            self.buffer_config.buffer_size,
+            self.buffer_config.buffer_timeout,
         );
 
         chunked_stream
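
The batching behavior itself comes from tokio_stream's chunks_timeout, which the worker now feeds from buffer_config: a chunk is flushed once buffer_size records accumulate or buffer_timeout elapses, whichever happens first. A self-contained sketch of those semantics, using illustrative values rather than the connector's defaults (assumes tokio with the macros/rt features and tokio-stream with the time feature):

// Sketch of chunks_timeout flush semantics: a batch is emitted as soon
// as EITHER max_size items accumulate OR the timeout elapses.
use std::time::Duration;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // Illustrative stand-ins for buffer_config.buffer_size / buffer_timeout.
    let buffer_size = 3;
    let buffer_timeout = Duration::from_millis(50);

    let batches = tokio_stream::iter(0..7).chunks_timeout(buffer_size, buffer_timeout);
    // ChunksTimeout is not Unpin, so pin it before polling with next().
    tokio::pin!(batches);

    // With an eager source, size wins: [0, 1, 2], [3, 4, 5], then [6]
    // when the stream ends before another full batch or timeout.
    while let Some(batch) = batches.next().await {
        println!("flushed batch: {batch:?}");
    }
}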

src/connectors/kafka/sink.rs

Lines changed: 1 addition & 9 deletions

@@ -23,7 +23,6 @@ use crate::connectors::kafka::ConsumerRecord;
 use anyhow::Result;
 use futures_util::StreamExt;
 use std::sync::Arc;
-use tokio::time::Duration;
 use tracing::{error, info};
 
 pub struct KafkaSinkConnector<P>
@@ -38,17 +37,10 @@ impl<P> KafkaSinkConnector<P>
 where
     P: Processor<Vec<ConsumerRecord>, ()> + Send + Sync + 'static,
 {
-    pub fn new(
-        kafka_streams: KafkaStreams,
-        processor: P,
-        buffer_size: usize,
-        buffer_timeout: Duration,
-    ) -> Self {
+    pub fn new(kafka_streams: KafkaStreams, processor: P) -> Self {
         let worker = Arc::new(StreamWorker::new(
             Arc::new(processor),
             kafka_streams.consumer(),
-            buffer_size,
-            buffer_timeout,
         ));
 
         Self {
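
The constructor can shrink because StreamWorker now recovers the buffering settings from the consumer's own context (see processor.rs above), so the sink no longer threads them through. A toy model of that fail-fast lookup, with every type hypothetical since the real ones live in the connector crate:

// Toy model of the consumer.context().config().consumer() chain:
// walk to the stored config and fail fast if the section is absent.
use std::time::Duration;

struct ConsumerConfig {
    buffer_size: usize,
    buffer_timeout: Duration,
}

struct Context {
    consumer: Option<ConsumerConfig>,
}

impl Context {
    // Stand-in for the config accessor used in StreamWorker::new.
    fn consumer(&self) -> Option<&ConsumerConfig> {
        self.consumer.as_ref()
    }
}

fn main() {
    let ctx = Context {
        consumer: Some(ConsumerConfig {
            buffer_size: 10_000,
            buffer_timeout: Duration::from_millis(10_000),
        }),
    };

    // Mirrors .expect("Consumer config is missing") in the diff: a worker
    // cannot batch sensibly without these settings, so panicking at
    // construction beats running half-configured.
    let cfg = ctx.consumer().expect("Consumer config is missing");
    println!("batching {} records / {:?}", cfg.buffer_size, cfg.buffer_timeout);
}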

src/connectors/mod.rs

Lines changed: 1 addition & 3 deletions

@@ -31,7 +31,6 @@ use crate::option::{Mode, CONFIG};
 use actix_web_prometheus::PrometheusMetrics;
 use prometheus::Registry;
 use std::sync::Arc;
-use std::time::Duration;
 use tokio::sync::RwLock;
 use tracing::{info, warn};
@@ -93,8 +92,7 @@ where
     let stats = kafka_streams.statistics();
     registry.register(Box::new(KafkaMetricsCollector::new(stats)?))?;
 
-    let kafka_parseable_sink_connector =
-        KafkaSinkConnector::new(kafka_streams, processor, 10000, Duration::from_millis(5000));
+    let kafka_parseable_sink_connector = KafkaSinkConnector::new(kafka_streams, processor);
 
     rebalance_listener.start();
     kafka_parseable_sink_connector.run().await?;
