Skip to content

Commit 19e850a

Browse files
authored
bridge: Upgrade dependencies (#1482)
## Motivation After getting a rustsec warning in #1469, I attempted this upgrade last week, but it failed due to a weird version constraint in opentelemetry (`tokio ~= 1.38.0`). The rustsec ended up being updated so I could merge the other PR w/o these upgrades, but I feel like it still makes sense to get this merged now rather than later. ## Solution Upgrade most of our dependencies, including axum 0.6 > 0.7.
2 parents eda2708 + 7687fdc commit 19e850a

File tree

9 files changed

+258
-291
lines changed

9 files changed

+258
-291
lines changed

bridge/Cargo.lock

Lines changed: 212 additions & 246 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bridge/svix-bridge-plugin-queue/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@ tracing = "0.1"
1616

1717
[dependencies.omniqueue]
1818
git = "https://github.com/svix/omniqueue-rs"
19-
rev = "62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b"
19+
rev = "e953ce07621a33708a4c28d9a5cfe431ede45dee"
2020
default-features = false
2121
features = ["gcp_pubsub", "rabbitmq", "redis", "sqs"]
2222

2323
[dev-dependencies]
2424
aws-config = "1.1.5"
2525
aws-sdk-sqs = "1.13.0"
2626
fastrand = "2.0.1"
27-
google-cloud-googleapis = "0.12.0"
28-
google-cloud-pubsub = "0.24.0"
27+
google-cloud-googleapis = "0.15.0"
28+
google-cloud-pubsub = "0.29.1"
2929
lapin = "2"
30-
redis = { version = "0.25.4", features = ["tokio-comp", "streams"] }
30+
redis = { version = "0.27.2", features = ["tokio-comp", "streams"] }
3131
tracing-subscriber.workspace = true
3232
wiremock.workspace = true
3333

bridge/svix-bridge-plugin-queue/src/redis/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub async fn consumer(cfg: &RedisInputOpts) -> Result<DynConsumer> {
5454
// FIXME: expose in config?
5555
payload_key: "payload".to_string(),
5656
ack_deadline_ms: cfg.ack_deadline_ms,
57+
dlq_config: None,
58+
sentinel_config: None,
5759
})
5860
.make_dynamic()
5961
.build_consumer()
@@ -80,6 +82,8 @@ pub async fn producer(cfg: &RedisOutputOpts) -> Result<DynProducer> {
8082
consumer_group: String::new(),
8183
consumer_name: String::new(),
8284
ack_deadline_ms: cfg.ack_deadline_ms,
85+
dlq_config: None,
86+
sentinel_config: None,
8387
})
8488
.make_dynamic()
8589
.build_producer()

bridge/svix-bridge/Cargo.toml

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,38 @@ publish = false
88
anyhow = "1"
99
base64 = "0.13.1"
1010
clap = { version = "4.2.4", features = ["env", "derive"] }
11-
axum = { version = "0.6", features = ["macros"] }
11+
axum = { version = "0.7.7", features = ["macros"] }
1212
enum_dispatch = "0.3"
13-
itertools = "0.12.1"
14-
http = "0.2"
13+
itertools = "0.13.0"
1514
once_cell = "1.18.0"
16-
opentelemetry = "0.22.0"
17-
opentelemetry_sdk = { version = "0.22.1", features = ["metrics", "rt-tokio"] }
18-
opentelemetry-otlp = { version = "0.15.0", features = ["metrics", "grpc-tonic", "http-proto", "reqwest-client"] }
15+
opentelemetry = "0.26.0"
16+
opentelemetry_sdk = { version = "0.26.0", features = ["metrics", "rt-tokio"] }
17+
opentelemetry-otlp = { version = "0.26.0", features = ["metrics", "grpc-tonic", "http-proto", "reqwest-client"] }
1918
serde.workspace = true
2019
serde_json.workspace = true
2120
serde_yaml = "0.9"
22-
svix-ksuid = "0.7.0"
21+
svix-ksuid = "0.8.0"
2322
svix-bridge-plugin-queue = { path = "../svix-bridge-plugin-queue" }
2423
svix-bridge-plugin-kafka = { optional = true, path = "../svix-bridge-plugin-kafka" }
2524
svix-bridge-types.workspace = true
2625
tokio.workspace = true
2726
tracing.workspace = true
28-
tracing-opentelemetry = "0.23.0"
27+
tracing-opentelemetry = "0.27.0"
2928
tracing-subscriber = { workspace = true, features = ["fmt", "json"] }
3029
# N.b. for newer deno versions (like this) the runtimes must be retained and reused since they will leak memory if you
3130
# create/drop them.
3231
deno_core = "0.308.0"
3332
deno_ast = "0.42.1"
34-
deadpool = { version = "0.9.5", features = ["unmanaged", "rt_tokio_1"] }
33+
deadpool = { version = "0.12.1", features = ["unmanaged", "rt_tokio_1"] }
3534
shellexpand = { version = "3.1.0", default-features = false, features = ["base-0"] }
3635

3736
[target.'cfg(not(target_env = "msvc"))'.dependencies]
38-
tikv-jemallocator = { version = "0.5", optional = true }
39-
tikv-jemalloc-ctl = { version = "0.5", optional = true, features = ["use_std"] }
37+
tikv-jemallocator = { version = "0.6.0", optional = true }
38+
tikv-jemalloc-ctl = { version = "0.6.0", optional = true, features = ["use_std", "stats"] }
4039

4140
[dev-dependencies]
4241
chrono = "0.4"
43-
tower = "0.4"
42+
tower = "0.5.1"
4443

4544
[features]
4645
default = ["kafka", "jemalloc"]

bridge/svix-bridge/src/main.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
use clap::Parser;
88
use itertools::{Either, Itertools};
99
use once_cell::sync::Lazy;
10+
use opentelemetry::trace::TracerProvider as _;
1011
use opentelemetry_otlp::WithExportConfig;
1112
use opentelemetry_sdk::{
1213
metrics::{data::Temporality, reader::TemporalitySelector, InstrumentKind, SdkMeterProvider},
@@ -89,7 +90,7 @@ fn setup_tracing(cfg: &Config) {
8990
.tracing()
9091
.with_exporter(exporter)
9192
.with_trace_config(
92-
opentelemetry_sdk::trace::config()
93+
opentelemetry_sdk::trace::Config::default()
9394
.with_sampler(
9495
otel_cfg
9596
.sample_ratio
@@ -99,7 +100,8 @@ fn setup_tracing(cfg: &Config) {
99100
.with_resource(get_svc_identifiers(cfg)),
100101
)
101102
.install_batch(Tokio)
102-
.unwrap();
103+
.unwrap()
104+
.tracer("svix_bridge");
103105

104106
tracing_opentelemetry::layer().with_tracer(tracer)
105107
});

bridge/svix-bridge/src/webhook_receiver/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::{net::SocketAddr, sync::Arc, time::Duration};
22

33
use axum::{
4-
body::Body,
54
extract::{Path, State},
5+
http,
66
routing::post,
77
Router,
88
};
@@ -26,7 +26,7 @@ mod config;
2626
mod types;
2727
mod verification;
2828

29-
fn router() -> Router<InternalState, Body> {
29+
fn router() -> Router<InternalState> {
3030
Router::new()
3131
.route(
3232
"/webhook/:integration_id",
@@ -50,8 +50,8 @@ pub async fn run(
5050
let router = router().with_state(state);
5151

5252
tracing::info!("Listening on: {listen_addr}");
53-
axum::Server::bind(&listen_addr)
54-
.serve(router.into_make_service())
53+
let listener = tokio::net::TcpListener::bind(listen_addr).await.unwrap();
54+
axum::serve(listener, router)
5555
.await
5656
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
5757
}

bridge/svix-bridge/src/webhook_receiver/tests.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async fn test_forwarding_no_verification() {
6666
.uri("/webhook/a")
6767
.method("POST")
6868
.header("content-type", "application/json")
69-
.body(serde_json::to_vec(&json!({"a": true})).unwrap().into())
69+
.body(axum::body::Body::from(json!({ "a": true }).to_string()))
7070
.unwrap(),
7171
)
7272
.await
@@ -110,7 +110,7 @@ async fn test_forwarding_multiple_receivers() {
110110
.uri("/webhook/a")
111111
.method("POST")
112112
.header("content-type", "application/json")
113-
.body(serde_json::to_vec(&json!({"a": true})).unwrap().into())
113+
.body(axum::body::Body::from(json!({ "a": true }).to_string()))
114114
.unwrap();
115115

116116
let response = ServiceExt::<Request<Body>>::ready(&mut app)
@@ -128,7 +128,7 @@ async fn test_forwarding_multiple_receivers() {
128128
.uri("/webhook/b")
129129
.method("POST")
130130
.header("content-type", "application/json")
131-
.body(serde_json::to_vec(&json!({"b": true})).unwrap().into())
131+
.body(axum::body::Body::from(json!({ "b": true }).to_string()))
132132
.unwrap();
133133

134134
let response = ServiceExt::<Request<Body>>::ready(&mut app)
@@ -200,7 +200,7 @@ async fn test_transformation_json() {
200200
.uri("/webhook/transformed")
201201
.method("POST")
202202
.header("content-type", "application/json")
203-
.body(serde_json::to_vec(&json!({"a": true})).unwrap().into())
203+
.body(axum::body::Body::from(json!({ "a": true }).to_string()))
204204
.unwrap();
205205

206206
let response = ServiceExt::<Request<Body>>::ready(&mut app)
@@ -222,7 +222,7 @@ async fn test_transformation_json() {
222222
.uri("/webhook/as-is")
223223
.method("POST")
224224
.header("content-type", "application/json")
225-
.body(serde_json::to_vec(&json!({"b": true})).unwrap().into())
225+
.body(axum::body::Body::from(json!({ "b": true }).to_string()))
226226
.unwrap();
227227

228228
let response = ServiceExt::<Request<Body>>::ready(&mut app)
@@ -280,7 +280,7 @@ async fn test_transformation_string() {
280280
.uri("/webhook/transformed")
281281
.method("POST")
282282
.header("content-type", "text/plain")
283-
.body("plain text".as_bytes().into())
283+
.body(axum::body::Body::from("plain text"))
284284
.unwrap();
285285

286286
let response = ServiceExt::<Request<Body>>::ready(&mut app)
@@ -337,7 +337,7 @@ async fn test_forwarding_svix_verification_mismatch() {
337337
.header("svix-id", "msg_valid")
338338
.header("svix-signature", signature.clone())
339339
.header("svix-timestamp", &format!("{timestamp}"))
340-
.body(sent_payload_bytes.into())
340+
.body(axum::body::Body::from(sent_payload_bytes))
341341
.unwrap(),
342342
)
343343
.await
@@ -383,13 +383,13 @@ async fn test_forwarding_svix_verification_match() {
383383
.header("content-type", "application/json")
384384
.header("svix-id", "msg_valid")
385385
.header("svix-signature", signature.clone())
386-
.header("svix-timestamp", &format!("{timestamp}"))
387-
.body(payload_bytes.into())
386+
.header("svix-timestamp", timestamp.to_string())
387+
.body(axum::body::Body::from(payload_bytes))
388388
.unwrap(),
389389
)
390390
.await
391391
.unwrap();
392392
assert_eq!(response.status(), StatusCode::NO_CONTENT);
393393
let forwarded = a_rx.try_recv().unwrap();
394-
assert_eq!(json!(forwarded), json!({"a": true}));
394+
assert_eq!(json!(forwarded), json!({ "a": true }));
395395
}

bridge/svix-bridge/src/webhook_receiver/types.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@ use std::{collections::HashMap, marker::PhantomData, sync::Arc};
33
use anyhow::Result;
44
use axum::{
55
async_trait,
6-
body::{Bytes, HttpBody},
7-
extract::FromRequest,
8-
BoxError,
6+
body::Bytes,
7+
extract::{FromRequest, Request},
8+
http::{self, HeaderMap, HeaderValue},
99
};
10-
use http::{HeaderMap, HeaderValue, Request};
1110
use serde::{Deserialize, Serialize};
1211
use svix_bridge_types::{
1312
svix, ReceiverInputOpts, ReceiverOutput, TransformationConfig, TransformerTx, WebhookVerifier,
@@ -212,16 +211,13 @@ impl From<RequestFromParts> for SerializableRequest<Unvalidated> {
212211
}
213212

214213
#[async_trait]
215-
impl<S, B> FromRequest<S, B> for SerializableRequest<Unvalidated>
214+
impl<S> FromRequest<S> for SerializableRequest<Unvalidated>
216215
where
217216
S: Send + Sync,
218-
B: HttpBody + Send + Sync + 'static,
219-
B::Data: Send,
220-
B::Error: Into<BoxError>,
221217
{
222-
type Rejection = <RequestFromParts as FromRequest<S, B>>::Rejection;
218+
type Rejection = <RequestFromParts as FromRequest<S>>::Rejection;
223219

224-
async fn from_request(req: Request<B>, state: &S) -> Result<Self, Self::Rejection> {
220+
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
225221
RequestFromParts::from_request(req, state)
226222
.await
227223
.map(Into::into)

bridge/svix-bridge/src/webhook_receiver/verification.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub enum Verifier {
8585
mod tests {
8686
use std::sync::Arc;
8787

88-
use axum::extract::FromRequest;
88+
use axum::{extract::FromRequest, http};
8989
use svix_bridge_types::svix::webhooks::Webhook;
9090

9191
use super::{super::types::SerializableRequest, SvixVerifier, VerificationMethod};
@@ -107,7 +107,7 @@ mod tests {
107107
.header("svix-id", "msg_valid")
108108
.header("svix-signature", signature.clone())
109109
.header("svix-timestamp", &format!("{timestamp}"))
110-
.body(axum::body::Full::new(payload))
110+
.body(axum::body::Body::from(payload))
111111
.unwrap();
112112

113113
let sr = SerializableRequest::from_request(req, &()).await.unwrap();
@@ -119,7 +119,7 @@ mod tests {
119119
.header("svix-id", "msg_invalid")
120120
.header("svix-signature", signature)
121121
.header("svix-timestamp", &format!("{timestamp}"))
122-
.body(axum::body::Full::new(payload))
122+
.body(axum::body::Body::from(payload))
123123
.unwrap();
124124

125125
let sr = SerializableRequest::from_request(req, &()).await.unwrap();

0 commit comments

Comments
 (0)