Skip to content

Commit 99bd070

Browse files
authored
Merge pull request #731 from MaterializeInc/clonable-threaded-producer
make `ThreadedProducer` clonable like `BaseProducer`
2 parents c6d9a65 + dbeca93 commit 99bd070

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

src/producer/base_producer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -639,13 +639,14 @@ where
639639
/// queued events, such as delivery notifications. The thread will be
640640
/// automatically stopped when the producer is dropped.
641641
#[must_use = "The threaded producer will stop immediately if unused"]
642+
#[derive(Clone)]
642643
pub struct ThreadedProducer<C, Part: Partitioner = NoCustomPartitioner>
643644
where
644645
C: ProducerContext<Part> + 'static,
645646
{
646647
producer: Arc<BaseProducer<C, Part>>,
647648
should_stop: Arc<AtomicBool>,
648-
handle: Option<JoinHandle<()>>,
649+
handle: Option<Arc<JoinHandle<()>>>,
649650
}
650651

651652
impl FromClientConfig for ThreadedProducer<DefaultProducerContext, NoCustomPartitioner> {
@@ -687,7 +688,7 @@ where
687688
Ok(ThreadedProducer {
688689
producer,
689690
should_stop,
690-
handle: Some(thread),
691+
handle: Some(Arc::new(thread)),
691692
})
692693
}
693694
}
@@ -778,7 +779,7 @@ where
778779
{
779780
fn drop(&mut self) {
780781
trace!("Destroy ThreadedProducer");
781-
if let Some(handle) = self.handle.take() {
782+
if let Some(handle) = self.handle.take().and_then(Arc::into_inner) {
782783
trace!("Stopping polling");
783784
self.should_stop.store(true, Ordering::Relaxed);
784785
trace!("Waiting for polling thread termination");

0 commit comments

Comments
 (0)