Skip to content

Commit 24c7160

Browse files
authored
[pubsub] Remove cancellation token / Avoid silent stop (#394)
1 parent adbe294 commit 24c7160

File tree

7 files changed

+741
-1124
lines changed

7 files changed

+741
-1124
lines changed

pubsub/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,21 @@ tokio = "1.32"
2020
async-channel = "1.9"
2121
async-stream = "0.3"
2222
thiserror = "1.0"
23-
tokio-util = "0.7"
2423

2524
token-source = "1.0"
2625
google-cloud-gax = { package = "gcloud-gax", version = "1.3.0", path = "../foundation/gax" }
2726
google-cloud-googleapis = { package = "gcloud-googleapis", version = "1.3.0", path = "../googleapis", features = ["pubsub"]}
28-
2927
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.0", path="../foundation/auth", default-features=false }
3028

3129
[dev-dependencies]
3230
tokio = { version="1.32", features=["rt-multi-thread"] }
3331
rand = "0.8.5"
34-
tracing-subscriber = "0.3"
32+
tracing-subscriber = { version="0.3", features=["env-filter"] }
3533
serial_test = "3.1"
3634
uuid = { version="1.4", features=["v4"] }
3735
ctor = "0.1.26"
3836
futures-util = "0.3"
37+
tokio-util = "0.7"
3938

4039
[features]
4140
default = ["auth", "default-tls"]

pubsub/README.md

Lines changed: 15 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -153,87 +153,30 @@ async fn run(config: ClientConfig) -> Result<(), Status> {
153153

154154
// Token for cancel.
155155
let cancel = CancellationToken::new();
156-
let cancel2 = cancel.clone();
156+
let cancel_for_task = cancel.clone();
157157
tokio::spawn(async move {
158158
// Cancel after 10 seconds.
159159
tokio::time::sleep(Duration::from_secs(10)).await;
160-
cancel2.cancel();
160+
cancel_for_task.cancel();
161161
});
162162

163-
// Receive blocks until the ctx is cancelled or an error occurs.
164-
// Or simply use the `subscription.subscribe` method.
165-
subscription.receive(|mut message, cancel| async move {
166-
// Handle data.
167-
println!("Got Message: {:?}", message.message.data);
168-
169-
// Ack or Nack message.
163+
// Start receiving messages from the subscription.
164+
let mut iter = subscription.subscribe(None).await?;
165+
// Get buffered messages.
166+
// To close safely, use a CancellationToken or to signal shutdown.
167+
while let Some(message) = tokio::select!{
168+
v = iter.next() => v,
169+
_ = ctx.cancelled() => None,
170+
}.await {
170171
let _ = message.ack().await;
171-
}, cancel.clone(), None).await?;
172-
172+
}
173+
// Wait for all the unprocessed messages to be Nack.
174+
// If you don't call dispose, the unprocessed messages will be Nack when the iterator is dropped.
175+
iter.dispose().await;
176+
173177
// Delete subscription if needed.
174178
subscription.delete(None).await?;
175179

176180
Ok(())
177181
}
178-
```
179-
180-
### Subscribe Message (Alternative Way)
181-
182-
After canceling, wait until all pulled messages are processed.
183-
```rust
184-
use std::time::Duration;
185-
use futures_util::StreamExt;
186-
use google_cloud_pubsub::client::{Client, ClientConfig};
187-
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
188-
use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
189-
use google_cloud_gax::grpc::Status;
190-
191-
async fn run(config: ClientConfig) -> Result<(), Status> {
192-
// Creating Client, Topic and Subscription...
193-
let client = Client::new(config).await.unwrap();
194-
let subscription = client.subscription("test-subscription");
195-
196-
// Read the messages as a stream
197-
let mut stream = subscription.subscribe(None).await.unwrap();
198-
let cancellable = stream.cancellable();
199-
let task = tokio::spawn(async move {
200-
// None if the stream is cancelled
201-
while let Some(message) = stream.next().await {
202-
message.ack().await.unwrap();
203-
}
204-
});
205-
tokio::time::sleep(Duration::from_secs(60)).await;
206-
cancellable.cancel();
207-
let _ = task.await;
208-
Ok(())
209-
}
210-
```
211-
212-
Unprocessed messages are nack after cancellation.
213-
```rust
214-
use std::time::Duration;
215-
use google_cloud_pubsub::client::{Client, ClientConfig};
216-
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
217-
use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
218-
use google_cloud_gax::grpc::Status;
219-
220-
async fn run(config: ClientConfig) -> Result<(), Status> {
221-
// Creating Client, Topic and Subscription...
222-
let client = Client::new(config).await.unwrap();
223-
let subscription = client.subscription("test-subscription");
224-
225-
// Read the messages as a stream
226-
let mut stream = subscription.subscribe(None).await.unwrap();
227-
let cancellable = stream.cancellable();
228-
let task = tokio::spawn(async move {
229-
// None if the stream is cancelled
230-
while let Some(message) = stream.read().await {
231-
message.ack().await.unwrap();
232-
}
233-
});
234-
tokio::time::sleep(Duration::from_secs(60)).await;
235-
cancellable.cancel();
236-
let _ = task.await;
237-
Ok(())
238-
}
239182
```

0 commit comments

Comments
 (0)