Skip to content

Commit ef978d3

Browse files
authored
Bump dependencies / Refactor pubsub (#398)
1 parent 61b4ccf commit ef978d3

File tree

14 files changed

+175
-247
lines changed

14 files changed

+175
-247
lines changed

artifact-registry/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gcloud-artifact-registry"
3-
version = "1.3.0"
3+
version = "1.3.1"
44
edition = "2021"
55
authors = ["yoshidan <naohiro.y@gmail.com>"]
66
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/artifact-registry"
@@ -15,9 +15,9 @@ doctest = false
1515

1616
[dependencies]
1717
token-source = "1.0"
18-
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.0", path="../foundation/auth", default-features=false }
18+
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.2", path="../foundation/auth", default-features=false }
1919
google-cloud-googleapis = { package = "gcloud-googleapis", version="1.3.0", path = "../googleapis", features=["artifact-registry"]}
20-
google-cloud-gax = { package = "gcloud-gax", version = "1.3.0", path = "../foundation/gax"}
20+
google-cloud-gax = { package = "gcloud-gax", version = "1.3.1", path = "../foundation/gax"}
2121
google-cloud-longrunning = { package = "gcloud-longrunning", version = "1.3.0", path = "../foundation/longrunning" }
2222
tracing = "0.1"
2323
prost-types = "0.14"
@@ -26,7 +26,7 @@ prost-types = "0.14"
2626
tokio = { version="1.32", features=["rt-multi-thread"] }
2727
serial_test = "3.1"
2828
tracing-subscriber = { version="0.3.17", features=["env-filter"]}
29-
ctor = "0.1"
29+
ctor = "0.5"
3030
google-cloud-auth = { package = "gcloud-auth", path = "../foundation/auth", default-features=false }
3131

3232
[features]

bigquery/Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gcloud-bigquery"
3-
version = "1.4.0"
3+
version = "1.4.1"
44
edition = "2021"
55
authors = ["yoshidan <naohiro.y@gmail.com>"]
66
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/bigquery"
@@ -17,15 +17,15 @@ doctest = false
1717
async-trait = "0.1"
1818
token-source = "1.0"
1919
google-cloud-googleapis = { package = "gcloud-googleapis", version="1.3.0", path = "../googleapis", features=["bigquery"]}
20-
google-cloud-gax = { package = "gcloud-gax", version = "1.3.0", path = "../foundation/gax"}
21-
thiserror = "1.0"
20+
google-cloud-gax = { package = "gcloud-gax", version = "1.3.1", path = "../foundation/gax"}
21+
thiserror = "2.0"
2222
tracing = "0.1"
2323
reqwest = { version = "0.12.4", features = ["json", "stream", "multipart", "charset"], default-features = false }
2424
serde = { version = "1.0", features = ["derive"] }
2525
serde_json = "1.0"
2626
tokio = { version="1.32", features=["macros"] }
2727
time = { version = "0.3", features = ["std", "macros", "formatting", "parsing", "serde"] }
28-
arrow = { version = "54.2.1", default-features = false, features = ["ipc"] }
28+
arrow = { version = "56.1.0", default-features = false, features = ["ipc"] }
2929
base64 = "0.22"
3030
bigdecimal = { version="0.4", features=["serde"] }
3131
num-bigint = "0.4"
@@ -35,13 +35,13 @@ anyhow = "1.0"
3535
async-stream = "0.3"
3636
prost-types = "0.14"
3737

38-
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.0", path="../foundation/auth", default-features=false }
38+
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.2", path="../foundation/auth", default-features=false }
3939

4040
[dev-dependencies]
4141
tokio = { version="1.32", features=["rt-multi-thread"] }
4242
serial_test = "3.1"
4343
tracing-subscriber = { version="0.3.17", features=["env-filter"] }
44-
ctor = "0.1.26"
44+
ctor = "0.5"
4545
google-cloud-auth = { package = "gcloud-auth", path = "../foundation/auth", default-features=false }
4646
base64-serde = "0.8"
4747
prost = "0.14"

foundation/auth/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gcloud-auth"
3-
version = "1.1.1"
3+
version = "1.1.2"
44
authors = ["yoshidan <naohiro.y@gmail.com>"]
55
edition = "2021"
66
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/foundation/auth"
@@ -18,12 +18,12 @@ reqwest = { version = "0.12.4", features = ["json", "charset"], default-features
1818
serde = { version = "1.0", features = ["derive"] }
1919
serde_json = { version = "1.0" }
2020
jsonwebtoken = { version = "9.2.0" }
21-
thiserror = "1.0"
21+
thiserror = "2.0"
2222
async-trait = "0.1"
2323
home = "0.5"
2424
urlencoding = "2.1"
2525
tokio = { version = "1.32", features = ["fs"] }
26-
google-cloud-metadata = { package = "gcloud-metadata", version = "1.0.0", path = "../metadata" }
26+
google-cloud-metadata = { package = "gcloud-metadata", version = "1.0.1", path = "../metadata" }
2727
token-source = "1.0"
2828
base64 = "0.22"
2929
time = "0.3"
@@ -38,7 +38,7 @@ hex = { version = "0.4", optional = true }
3838
[dev-dependencies]
3939
tokio = { version = "1.32", features = ["test-util", "rt-multi-thread", "macros"] }
4040
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
41-
ctor = "0.1"
41+
ctor = "0.5"
4242
tempfile = "3.8.0"
4343
temp-env = { version = "0.3.6", features = ["async_closure"] }
4444

foundation/gax/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gcloud-gax"
3-
version = "1.3.0"
3+
version = "1.3.1"
44
authors = ["yoshidan <naohiro.y@gmail.com>"]
55
edition = "2018"
66
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/foundation/gax"
@@ -16,8 +16,9 @@ doctest = false
1616
tracing = "0.1"
1717
tokio = { version = "1.32", features = ["macros"] }
1818
tonic = { version = "0.14", default-features = false, features = ["tls-webpki-roots"] }
19-
thiserror = "1.0"
20-
tower = { version = "0.4", features = ["filter", "util"] }
19+
thiserror = "2.0"
20+
tower = { version = "0.5", features = ["filter", "util"] }
2121
http = "1.1"
2222
token-source = "1.0"
23-
tokio-retry2 = "0.5.3"
23+
tokio-retry2 = "0.6"
24+

foundation/gax/src/conn.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,26 @@ use http::{HeaderValue, Request};
1010
use tonic::body::Body;
1111
use tonic::transport::{Channel as TonicChannel, ClientTlsConfig, Endpoint};
1212
use tonic::{Code, Status};
13-
use tower::filter::{AsyncFilter, AsyncFilterLayer, AsyncPredicate};
14-
use tower::util::Either;
13+
use tower::filter::{AsyncFilter, AsyncPredicate};
1514
use tower::{BoxError, ServiceBuilder};
1615

1716
use token_source::{TokenSource, TokenSourceProvider};
1817

19-
pub type Channel = Either<AsyncFilter<TonicChannel, AsyncAuthInterceptor>, TonicChannel>;
18+
pub type Channel = AsyncFilter<TonicChannel, AsyncAuthInterceptor>;
2019

2120
#[derive(Clone, Debug)]
2221
pub struct AsyncAuthInterceptor {
23-
token_source: Arc<dyn TokenSource>,
22+
token_source: Option<Arc<dyn TokenSource>>,
2423
}
2524

2625
impl AsyncAuthInterceptor {
2726
fn new(token_source: Arc<dyn TokenSource>) -> Self {
28-
Self { token_source }
27+
Self {
28+
token_source: Some(token_source),
29+
}
30+
}
31+
fn empty() -> Self {
32+
Self { token_source: None }
2933
}
3034
}
3135

@@ -34,7 +38,10 @@ impl AsyncPredicate<Request<Body>> for AsyncAuthInterceptor {
3438
type Request = Request<Body>;
3539

3640
fn check(&mut self, request: Request<Body>) -> Self::Future {
37-
let ts = self.token_source.clone();
41+
let ts = match &self.token_source {
42+
Some(ts) => ts.clone(),
43+
None => return Box::pin(async move { Ok(request) }),
44+
};
3845
Box::pin(async move {
3946
let token = ts
4047
.token()
@@ -154,8 +161,8 @@ impl<'a> ConnectionManager {
154161

155162
let con = Self::connect(endpoint).await?;
156163
// use GCP token per call
157-
let auth_layer = Some(AsyncFilterLayer::new(AsyncAuthInterceptor::new(Arc::clone(&ts))));
158-
let auth_con = ServiceBuilder::new().option_layer(auth_layer).service(con);
164+
let auth_filter = AsyncAuthInterceptor::new(Arc::clone(&ts));
165+
let auth_con = ServiceBuilder::new().filter_async(auth_filter).service(con);
159166
conns.push(auth_con);
160167
}
161168
Ok(conns)
@@ -171,11 +178,9 @@ impl<'a> ConnectionManager {
171178
let endpoint = conn_options.apply(endpoint);
172179

173180
let con = Self::connect(endpoint).await?;
174-
conns.push(
175-
ServiceBuilder::new()
176-
.option_layer::<AsyncFilterLayer<AsyncAuthInterceptor>>(None)
177-
.service(con),
178-
);
181+
let auth_filter = AsyncAuthInterceptor::empty();
182+
let auth_con = ServiceBuilder::new().filter_async(auth_filter).service(con);
183+
conns.push(auth_con);
179184
Ok(conns)
180185
}
181186

foundation/metadata/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gcloud-metadata"
3-
version = "1.0.0"
3+
version = "1.0.1"
44
authors = ["yoshidan <naohiro.y@gmail.com>"]
55
edition = "2021"
66
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/foundation/metadata"
@@ -16,7 +16,7 @@ doctest = false
1616
tokio = { version = "1.32", features = ["sync", "net", "parking_lot"] }
1717
# this crate uses http only
1818
reqwest = { version = "0.12.4" , default-features = false }
19-
thiserror = "1.0"
19+
thiserror = "2.0"
2020

2121
[dev-dependencies]
2222
tokio = { version = "1.32", features = ["test-util", "rt-multi-thread", "macros"]}

kms/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gcloud-kms"
3-
version = "1.3.0"
3+
version = "1.3.1"
44
edition = "2021"
55
authors = ["yoshidan <naohiro.y@gmail.com>"]
66
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/kms"
@@ -15,16 +15,16 @@ doctest = false
1515

1616
[dependencies]
1717
token-source = "1.0"
18-
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.0", path="../foundation/auth", default-features=false }
18+
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.2", path="../foundation/auth", default-features=false }
1919
google-cloud-googleapis = { package = "gcloud-googleapis", version="1.3.0", path = "../googleapis", features=["kms"]}
20-
google-cloud-gax = { package = "gcloud-gax", version = "1.3.0", path = "../foundation/gax"}
20+
google-cloud-gax = { package = "gcloud-gax", version = "1.3.1", path = "../foundation/gax"}
2121
tracing = "0.1"
2222

2323
[dev-dependencies]
2424
tokio = { version="1.32", features=["rt-multi-thread"] }
2525
serial_test = "3.1"
2626
tracing-subscriber = { version="0.3.17", features=["env-filter"]}
27-
ctor = "0.1"
27+
ctor = "0.5"
2828
google-cloud-auth = { package = "gcloud-auth", path = "../foundation/auth", default-features=false }
2929

3030
[features]

pubsub/Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gcloud-pubsub"
3-
version = "1.4.1"
3+
version = "1.5.0"
44
authors = ["yoshidan <naohiro.y@gmail.com>"]
55
edition = "2021"
66
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/pubsub"
@@ -17,22 +17,22 @@ doctest = false
1717
tracing = "0.1"
1818
prost-types = "0.14"
1919
tokio = "1.32"
20-
async-channel = "1.9"
20+
async-channel = "2.5"
2121
async-stream = "0.3"
22-
thiserror = "1.0"
22+
thiserror = "2.0"
2323

2424
token-source = "1.0"
25-
google-cloud-gax = { package = "gcloud-gax", version = "1.3.0", path = "../foundation/gax" }
25+
google-cloud-gax = { package = "gcloud-gax", version = "1.3.1", path = "../foundation/gax" }
2626
google-cloud-googleapis = { package = "gcloud-googleapis", version = "1.3.0", path = "../googleapis", features = ["pubsub"]}
27-
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.0", path="../foundation/auth", default-features=false }
27+
google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.2", path="../foundation/auth", default-features=false }
2828

2929
[dev-dependencies]
3030
tokio = { version="1.32", features=["rt-multi-thread"] }
31-
rand = "0.8.5"
31+
rand = "0.9.2"
3232
tracing-subscriber = { version="0.3", features=["env-filter"] }
3333
serial_test = "3.1"
3434
uuid = { version="1.4", features=["v4"] }
35-
ctor = "0.1.26"
35+
ctor = "0.5"
3636
futures-util = "0.3"
3737
tokio-util = "0.7"
3838

pubsub/src/apiv1/subscriber_client.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::sync::Arc;
2-
31
use google_cloud_gax::conn::Channel;
42
use google_cloud_gax::create_request;
53
use google_cloud_gax::grpc::Status;
@@ -13,6 +11,8 @@ use google_cloud_googleapis::pubsub::v1::{
1311
SeekRequest, SeekResponse, Snapshot, StreamingPullRequest, StreamingPullResponse, Subscription,
1412
UpdateSnapshotRequest, UpdateSubscriptionRequest,
1513
};
14+
use std::sync::Arc;
15+
use std::time::Duration;
1616

1717
use crate::apiv1::conn_pool::ConnectionManager;
1818
use crate::apiv1::PUBSUB_MESSAGE_LIMIT;
@@ -242,20 +242,17 @@ impl SubscriberClient {
242242
pub async fn streaming_pull(
243243
&self,
244244
req: StreamingPullRequest,
245-
ping_receiver: async_channel::Receiver<bool>,
245+
interval: Duration,
246246
retry: Option<RetrySetting>,
247247
) -> Result<Response<Streaming<StreamingPullResponse>>, Status> {
248248
let action = || async {
249249
let mut client = self.client_for_streaming_pull();
250250
let base_req = req.clone();
251-
let rx = ping_receiver.clone();
252251
let request = Box::pin(async_stream::stream! {
253252
yield base_req.clone();
254-
255-
// ping message.
256-
// must be empty request
257-
while let Ok(_r) = rx.recv().await {
258-
yield create_empty_streaming_pull_request();
253+
loop {
254+
tokio::time::sleep(interval).await;
255+
yield create_empty_streaming_pull_request();
259256
}
260257
});
261258
let mut v = request.into_streaming_request();

pubsub/src/publisher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ impl Tasks {
244244
//TODO enable manage task by ordering_key
245245
let mut bundle = MessageBundle::new();
246246
while !receiver.is_closed() {
247-
let result = match timeout(flush_interval, &mut receiver.recv()).await {
247+
let result = match timeout(flush_interval, receiver.recv()).await {
248248
Ok(result) => result,
249249
//timed out
250250
Err(_e) => {

0 commit comments

Comments
 (0)