Skip to content

Commit e996765

Browse files
authored
Make multi subscriber optional (#185)
* make optional * fmt
1 parent e2d8530 commit e996765

File tree

1 file changed

+45
-6
lines changed

1 file changed

+45
-6
lines changed

pubsub/src/subscription.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,23 @@ pub struct SubscriptionConfigToUpdate {
7979
pub retry_policy: Option<RetryPolicy>,
8080
}
8181

82+
#[derive(Debug, Clone, Default)]
83+
pub struct SubscribeConfig {
84+
enable_multiple_subscriber: bool,
85+
subscriber_config: SubscriberConfig,
86+
}
87+
88+
impl SubscribeConfig {
89+
pub fn with_enable_multiple_subscriber(mut self, v: bool) -> Self {
90+
self.enable_multiple_subscriber = v;
91+
self
92+
}
93+
pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self {
94+
self.subscriber_config = v;
95+
self
96+
}
97+
}
98+
8299
#[derive(Debug, Clone)]
83100
pub struct ReceiveConfig {
84101
pub worker_count: usize,
@@ -353,13 +370,25 @@ impl Subscription {
353370
/// Ok(())
354371
/// }
355372
/// ```
356-
pub async fn subscribe(&self, opt: Option<SubscriberConfig>) -> Result<MessageStream, Status> {
373+
pub async fn subscribe(&self, opt: Option<SubscribeConfig>) -> Result<MessageStream, Status> {
357374
let (tx, rx) = async_channel::unbounded::<ReceivedMessage>();
358375
let cancel = CancellationToken::new();
359376

360377
// spawn a separate subscriber task for each connection in the pool
361-
for _ in 0..self.pool_size() {
362-
Subscriber::start(cancel.clone(), self.fqsn.clone(), self.subc.clone(), tx.clone(), opt.clone());
378+
let opt = opt.unwrap_or_default();
379+
let subscribers = if opt.enable_multiple_subscriber {
380+
self.pool_size()
381+
} else {
382+
1
383+
};
384+
for _ in 0..subscribers {
385+
Subscriber::start(
386+
cancel.clone(),
387+
self.fqsn.clone(),
388+
self.subc.clone(),
389+
tx.clone(),
390+
Some(opt.subscriber_config.clone()),
391+
);
363392
}
364393

365394
Ok(MessageStream { queue: rx, cancel })
@@ -584,7 +613,7 @@ mod tests {
584613
use crate::apiv1::publisher_client::PublisherClient;
585614
use crate::apiv1::subscriber_client::SubscriberClient;
586615
use crate::subscriber::ReceivedMessage;
587-
use crate::subscription::{SeekTo, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate};
616+
use crate::subscription::{SeekTo, SubscribeConfig, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate};
588617

589618
const PROJECT_NAME: &str = "local-project";
590619
const EMULATOR: &str = "localhost:8681";
@@ -966,12 +995,22 @@ mod tests {
966995

967996
#[tokio::test]
968997
#[serial]
969-
async fn test_subscribe() {
998+
async fn test_subscribe_single_subscriber() {
999+
test_subscribe(None).await
1000+
}
1001+
1002+
#[tokio::test]
1003+
#[serial]
1004+
async fn test_subscribe_multiple_subscriber() {
1005+
test_subscribe(Some(SubscribeConfig::default().with_enable_multiple_subscriber(true))).await
1006+
}
1007+
1008+
async fn test_subscribe(opt: Option<SubscribeConfig>) {
9701009
let subscription = create_subscription(false).await;
9711010
let received = Arc::new(Mutex::new(false));
9721011
let checking = received.clone();
9731012
let _handler = tokio::spawn(async move {
974-
let mut iter = subscription.subscribe(None).await.unwrap();
1013+
let mut iter = subscription.subscribe(opt).await.unwrap();
9751014
while let Some(message) = iter.next().await {
9761015
*received.lock().unwrap() = true;
9771016
let _ = message.ack().await;

0 commit comments

Comments
 (0)