Skip to content

Commit 7552a27

Browse files
authored
ref(kafka): Refactor kafka consumer (#66)
1 parent 9aef5a2 commit 7552a27

File tree

1 file changed

+13
-13
lines changed

1 file changed

+13
-13
lines changed

src/consumer/kafka.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub async fn start_consumer(
6161
.expect("Can't subscribe to specified topics");
6262

6363
handle_os_signals(event_sender.clone());
64-
handle_consumer_client(consumer.clone(), client_shutdown_receiver);
64+
poll_consumer_client(consumer.clone(), client_shutdown_receiver);
6565
handle_events(
6666
consumer,
6767
event_receiver,
@@ -81,8 +81,8 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
8181
});
8282
}
8383

84-
#[instrument(skip(consumer, shutdown))]
85-
pub fn handle_consumer_client(
84+
#[instrument(skip_all)]
85+
pub fn poll_consumer_client(
8686
consumer: Arc<StreamConsumer<KafkaContext>>,
8787
shutdown: oneshot::Receiver<()>,
8888
) {
@@ -117,7 +117,7 @@ impl KafkaContext {
117117
impl ClientContext for KafkaContext {}
118118

119119
impl ConsumerContext for KafkaContext {
120-
#[instrument(skip(self, rebalance))]
120+
#[instrument(skip_all)]
121121
fn pre_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
122122
let (rendezvous_sender, rendezvous_receiver) = sync_channel(0);
123123
match rebalance {
@@ -314,7 +314,7 @@ enum ConsumerState {
314314
Stopped,
315315
}
316316

317-
#[instrument(skip(consumer, events, shutdown_client, spawn_actors))]
317+
#[instrument(skip_all)]
318318
pub async fn handle_events(
319319
consumer: Arc<StreamConsumer<KafkaContext>>,
320320
events: UnboundedReceiver<(Event, SyncSender<()>)>,
@@ -412,7 +412,7 @@ impl MessageQueue for StreamPartitionQueue<KafkaContext> {
412412
}
413413
}
414414

415-
#[instrument(skip(queue, transform, ok, err, shutdown))]
415+
#[instrument(skip_all)]
416416
pub async fn map<T>(
417417
queue: impl MessageQueue,
418418
transform: impl Fn(Arc<OwnedMessage>) -> Result<T, Error>,
@@ -521,7 +521,7 @@ async fn handle_reducer_failure<T>(
521521
reducer.reset();
522522
}
523523

524-
#[instrument(skip(reducer, inflight_msgs, ok, err))]
524+
#[instrument(skip_all)]
525525
async fn flush_reducer<T, U>(
526526
reducer: &mut impl Reducer<Input = T, Output = U>,
527527
inflight_msgs: &mut Vec<OwnedMessage>,
@@ -544,7 +544,7 @@ async fn flush_reducer<T, U>(
544544
Ok(())
545545
}
546546

547-
#[instrument(skip(reducer, receiver, ok, err, shutdown))]
547+
#[instrument(skip_all)]
548548
pub async fn reduce<T, U>(
549549
mut reducer: impl Reducer<Input = T, Output = U>,
550550
mut receiver: mpsc::Receiver<(impl IntoIterator<Item = OwnedMessage>, T)>,
@@ -554,8 +554,8 @@ pub async fn reduce<T, U>(
554554
) -> Result<(), Error> {
555555
let config = reducer.get_reduce_config();
556556
let mut flush_timer = config.flush_interval.map(time::interval);
557-
let mut loop_timer = time::interval(Duration::from_secs(1));
558-
loop_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
557+
let mut repoll_timer = time::interval(Duration::from_secs(1));
558+
repoll_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
559559
let mut inflight_msgs = Vec::new();
560560

561561
loop {
@@ -629,15 +629,15 @@ pub async fn reduce<T, U>(
629629
}
630630
}
631631

632-
_ = loop_timer.tick() => { }
632+
_ = repoll_timer.tick() => { }
633633
}
634634
}
635635

636636
debug!("Shutdown complete");
637637
Ok(())
638638
}
639639

640-
#[instrument(skip(reducer, receiver, ok, shutdown))]
640+
#[instrument(skip_all)]
641641
pub async fn reduce_err(
642642
mut reducer: impl Reducer<Input = OwnedMessage, Output = ()>,
643643
mut receiver: mpsc::Receiver<OwnedMessage>,
@@ -769,7 +769,7 @@ impl From<HighwaterMark> for TopicPartitionList {
769769
}
770770
}
771771

772-
#[instrument(skip(receiver, consumer, _rendezvous_guard))]
772+
#[instrument(skip_all)]
773773
pub async fn commit(
774774
mut receiver: mpsc::Receiver<(Vec<OwnedMessage>, ())>,
775775
consumer: Arc<impl CommitClient>,

0 commit comments

Comments
 (0)