Skip to content

Commit 2b7a4b3

Browse files
authored
bridge: Upgrade omniqueue and related dependencies (#1326)
2 parents b8a39cb + 4ed3a0f commit 2b7a4b3

File tree

4 files changed

+57
-59
lines changed

4 files changed

+57
-59
lines changed

bridge/Cargo.lock

Lines changed: 19 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bridge/svix-bridge-plugin-queue/Cargo.toml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
omniqueue = "0.2.0"
109
serde_json = "1.0"
1110
serde = { version = "1.0", features = ["derive"] }
1211
svix-bridge-types = { path = "../svix-bridge-types" }
@@ -15,13 +14,19 @@ tokio-executor-trait = "2.1"
1514
tokio-reactor-trait = "1.1"
1615
tracing = "0.1"
1716

17+
[dependencies.omniqueue]
18+
git = "https://github.com/svix/omniqueue-rs"
19+
rev = "62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b"
20+
default-features = false
21+
features = ["gcp_pubsub", "rabbitmq", "redis", "sqs"]
22+
1823
[dev-dependencies]
1924
aws-config = "1.1.5"
2025
aws-sdk-sqs = "1.13.0"
2126
fastrand = "2.0.1"
2227
google-cloud-googleapis = "0.12.0"
23-
google-cloud-pubsub = "0.23.0"
28+
google-cloud-pubsub = "0.24.0"
2429
lapin = "2"
25-
redis = { version = "0.24.0", features = ["tokio-comp", "streams"] }
30+
redis = { version = "0.25.4", features = ["tokio-comp", "streams"] }
2631
tracing-subscriber = "0.3"
2732
wiremock = "0.5.18"

bridge/svix-bridge-plugin-queue/src/redis/mod.rs

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,19 @@ pub async fn consumer(cfg: &RedisInputOpts) -> Result<DynConsumer> {
4242
.unwrap_or_else(|| format!("{}_delays", cfg.queue_key));
4343
let delayed_lock_key = format!("{delayed_queue_key}_lock");
4444

45-
backends::RedisBackend::<backends::redis::RedisMultiplexedConnectionManager>::builder(
46-
backends::RedisConfig {
47-
dsn: cfg.dsn.clone(),
48-
max_connections: cfg.max_connections,
49-
reinsert_on_nack: cfg.reinsert_on_nack,
50-
queue_key: cfg.queue_key.clone(),
51-
delayed_queue_key,
52-
delayed_lock_key,
53-
consumer_group: cfg.consumer_group.clone(),
54-
consumer_name: cfg.consumer_name.clone(),
55-
// FIXME: expose in config?
56-
payload_key: "payload".to_string(),
57-
ack_deadline_ms: cfg.ack_deadline_ms,
58-
},
59-
)
45+
backends::RedisBackend::builder(backends::RedisConfig {
46+
dsn: cfg.dsn.clone(),
47+
max_connections: cfg.max_connections,
48+
reinsert_on_nack: cfg.reinsert_on_nack,
49+
queue_key: cfg.queue_key.clone(),
50+
delayed_queue_key,
51+
delayed_lock_key,
52+
consumer_group: cfg.consumer_group.clone(),
53+
consumer_name: cfg.consumer_name.clone(),
54+
// FIXME: expose in config?
55+
payload_key: "payload".to_string(),
56+
ack_deadline_ms: cfg.ack_deadline_ms,
57+
})
6058
.make_dynamic()
6159
.build_consumer()
6260
.await
@@ -69,22 +67,20 @@ pub async fn producer(cfg: &RedisOutputOpts) -> Result<DynProducer> {
6967
.unwrap_or_else(|| format!("{}_delays", cfg.queue_key));
7068
let delayed_lock_key = format!("{delayed_queue_key}_lock");
7169

72-
backends::RedisBackend::<backends::redis::RedisMultiplexedConnectionManager>::builder(
73-
backends::RedisConfig {
74-
dsn: cfg.dsn.clone(),
75-
max_connections: cfg.max_connections,
76-
queue_key: cfg.queue_key.clone(),
77-
delayed_queue_key,
78-
delayed_lock_key,
79-
// FIXME: expose in config?
80-
payload_key: "payload".to_string(),
81-
// consumer stuff we don't care about.
82-
reinsert_on_nack: false,
83-
consumer_group: String::new(),
84-
consumer_name: String::new(),
85-
ack_deadline_ms: cfg.ack_deadline_ms,
86-
},
87-
)
70+
backends::RedisBackend::builder(backends::RedisConfig {
71+
dsn: cfg.dsn.clone(),
72+
max_connections: cfg.max_connections,
73+
queue_key: cfg.queue_key.clone(),
74+
delayed_queue_key,
75+
delayed_lock_key,
76+
// FIXME: expose in config?
77+
payload_key: "payload".to_string(),
78+
// consumer stuff we don't care about.
79+
reinsert_on_nack: false,
80+
consumer_group: String::new(),
81+
consumer_name: String::new(),
82+
ack_deadline_ms: cfg.ack_deadline_ms,
83+
})
8884
.make_dynamic()
8985
.build_producer()
9086
.await

bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ async fn create_test_stream(client: &Client) -> String {
5959
.take(8)
6060
.collect();
6161

62-
let mut conn = client.get_async_connection().await.unwrap();
62+
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
6363

6464
let _: () = conn
6565
.xgroup_create_mkstream(&name, "test_cg", 0i8)
@@ -70,12 +70,12 @@ async fn create_test_stream(client: &Client) -> String {
7070
}
7171

7272
async fn delete_test_stream(client: &Client, key: &str) {
73-
let mut conn = client.get_async_connection().await.unwrap();
73+
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
7474
let _: () = conn.del(key).await.unwrap();
7575
}
7676

7777
async fn publish(client: &Client, key: &str, payload: &str) {
78-
let mut conn = client.get_async_connection().await.unwrap();
78+
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
7979
// N.b. the redis code relies on the messages being json with a `payload` key in there.
8080
// The `payload` key can be any valid JSON value.
8181
let _: () = conn.xadd(key, "*", &[("payload", payload)]).await.unwrap();

0 commit comments

Comments
 (0)