Skip to content

Commit 76d9e35

Browse files
jaymellsvix-james
authored andcommitted
Refactor queue module to use omniqueue
This matches downstream changes and should make the integration of DLQ support seamless.
1 parent 19e850a commit 76d9e35

File tree

12 files changed

+176
-89
lines changed

12 files changed

+176
-89
lines changed

server/svix-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ urlencoding = "2.1.2"
6868
form_urlencoded = "1.1.0"
6969
lapin = "2.1.1"
7070
sentry = { version = "0.32.2", features = ["tracing"] }
71-
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad338ac3702b2e911bacf8967ac58d8", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel"] }
71+
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad338ac3702b2e911bacf8967ac58d8", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel", "beta"] }
7272
# Not a well-known author, and no longer gets updates => pinned.
7373
# Switch to hyper-http-proxy when upgrading hyper to 1.0.
7474
hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }

server/svix-server/config.default.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,6 @@ worker_max_tasks = 500
120120
# Whether or not to disable TLS certificate validation on Webhook dispatch. This is a dangerous flag
121121
# to set true. This value will default to false.
122122
# dangerous_disable_tls_verification = false
123+
124+
# Maximum seconds of queue long-poll
125+
queue_max_poll_secs = 20

server/svix-server/src/cfg.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ pub struct ConfigurationInner {
184184
/// Maximum number of concurrent worker tasks to spawn (0 is unlimited)
185185
pub worker_max_tasks: u16,
186186

187+
/// Maximum seconds of a queue long-poll
188+
pub queue_max_poll_secs: u16,
189+
187190
/// The address of the rabbitmq exchange
188191
pub rabbit_dsn: Option<Arc<String>>,
189192
pub rabbit_consumer_prefetch_size: Option<u16>,

server/svix-server/src/queue/mod.rs

Lines changed: 110 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
use std::{sync::Arc, time::Duration};
1+
use std::{marker::PhantomData, num::NonZeroUsize, sync::Arc, time::Duration};
22

33
use omniqueue::{
4-
backends::InMemoryBackend, Delivery, DynConsumer, DynScheduledProducer, QueueConsumer,
5-
ScheduledQueueProducer,
4+
backends::InMemoryBackend, Delivery, DynConsumer, QueueConsumer, ScheduledQueueProducer,
65
};
7-
use serde::{Deserialize, Serialize};
6+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
87

98
use crate::{
109
cfg::{Configuration, QueueBackend},
@@ -24,6 +23,8 @@ const RETRY_SCHEDULE: &[Duration] = &[
2423
Duration::from_millis(40),
2524
];
2625

26+
pub type TaskQueueDelivery = SvixOmniDelivery<QueueTask>;
27+
2728
fn should_retry(err: &Error) -> bool {
2829
matches!(err.typ, ErrorType::Queue(_))
2930
}
@@ -139,19 +140,34 @@ impl QueueTask {
139140
}
140141
}
141142

142-
#[derive(Clone)]
143-
pub struct TaskQueueProducer {
144-
inner: Arc<DynScheduledProducer>,
143+
pub type TaskQueueProducer = SvixOmniProducer<QueueTask>;
144+
pub type TaskQueueConsumer = SvixOmniConsumer<QueueTask>;
145+
146+
pub struct SvixOmniProducer<T: OmniMessage> {
147+
inner: Arc<omniqueue::DynScheduledProducer>,
148+
_phantom: PhantomData<T>,
149+
}
150+
151+
// Manual impl to avoid adding 'Clone' bound on T
152+
impl<T: OmniMessage> Clone for SvixOmniProducer<T> {
153+
fn clone(&self) -> Self {
154+
Self {
155+
inner: self.inner.clone(),
156+
_phantom: PhantomData,
157+
}
158+
}
145159
}
146160

147-
impl TaskQueueProducer {
148-
pub fn new(inner: impl ScheduledQueueProducer + 'static) -> Self {
161+
impl<T: OmniMessage> SvixOmniProducer<T> {
162+
pub(super) fn new(inner: impl ScheduledQueueProducer + 'static) -> Self {
149163
Self {
150164
inner: Arc::new(inner.into_dyn_scheduled()),
165+
_phantom: PhantomData,
151166
}
152167
}
153168

154-
pub async fn send(&self, task: QueueTask, delay: Option<Duration>) -> Result<()> {
169+
#[tracing::instrument(skip_all, name = "queue_send")]
170+
pub async fn send(&self, task: &T, delay: Option<Duration>) -> Result<()> {
155171
let task = Arc::new(task);
156172
run_with_retries(
157173
|| async {
@@ -169,57 +185,99 @@ impl TaskQueueProducer {
169185
)
170186
.await
171187
}
188+
189+
#[tracing::instrument(skip_all, name = "redrive_dlq")]
190+
pub async fn redrive_dlq(&self) -> Result<()> {
191+
self.inner.redrive_dlq().await.map_err(Into::into)
192+
}
172193
}
173194

174-
pub struct TaskQueueConsumer {
195+
pub struct SvixOmniConsumer<T: OmniMessage> {
175196
inner: DynConsumer,
197+
_phantom: PhantomData<T>,
198+
}
199+
200+
pub trait OmniMessage: Serialize + DeserializeOwned + Send + Sync {
201+
fn task_id(&self) -> Option<&str>;
176202
}
177203

178-
impl TaskQueueConsumer {
179-
pub fn new(inner: impl QueueConsumer + 'static) -> Self {
204+
impl OmniMessage for QueueTask {
205+
fn task_id(&self) -> Option<&str> {
206+
self.msg_id()
207+
}
208+
}
209+
210+
impl<T: OmniMessage> SvixOmniConsumer<T> {
211+
pub(super) fn new(inner: impl QueueConsumer + 'static) -> Self {
180212
Self {
181213
inner: inner.into_dyn(),
214+
_phantom: PhantomData,
182215
}
183216
}
184217

185-
pub async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
186-
const MAX_MESSAGES: usize = 128;
187-
// FIXME(onelson): need to figure out what deadline/duration to use here
218+
#[tracing::instrument(skip_all, name = "queue_receive_all")]
219+
pub async fn receive_all(&mut self, deadline: Duration) -> Result<Vec<SvixOmniDelivery<T>>> {
220+
pub const MAX_MESSAGES: usize = 128;
188221
self.inner
189-
.receive_all(MAX_MESSAGES, Duration::from_secs(30))
222+
.receive_all(MAX_MESSAGES, deadline)
190223
.await
191-
.map_err(Into::into)
224+
.map_err(Error::from)
192225
.trace()?
193226
.into_iter()
194-
.map(TryInto::try_into)
227+
.map(|acker| {
228+
Ok(SvixOmniDelivery {
229+
task: Arc::new(
230+
acker
231+
.payload_serde_json()
232+
.map_err(|e| {
233+
Error::queue(format!("Failed to decode queue task: {e:?}"))
234+
})?
235+
.ok_or_else(|| Error::queue("Unexpected empty delivery"))?,
236+
),
237+
238+
acker,
239+
})
240+
})
195241
.collect()
196242
}
243+
244+
pub fn max_messages(&self) -> Option<NonZeroUsize> {
245+
self.inner.max_messages()
246+
}
197247
}
198248

199249
#[derive(Debug)]
200-
pub struct TaskQueueDelivery {
201-
pub task: Arc<QueueTask>,
202-
acker: Delivery,
250+
pub struct SvixOmniDelivery<T> {
251+
pub task: Arc<T>,
252+
pub(super) acker: Delivery,
203253
}
204254

205-
impl TaskQueueDelivery {
255+
impl<T: OmniMessage> SvixOmniDelivery<T> {
256+
pub async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> {
257+
Ok(self.acker.set_ack_deadline(duration).await?)
258+
}
206259
pub async fn ack(self) -> Result<()> {
207-
tracing::trace!(msg_id = self.task.msg_id(), "ack");
260+
tracing::trace!(
261+
task_id = self.task.task_id().map(tracing::field::display),
262+
"ack"
263+
);
208264

209265
let mut retry = Retry::new(should_retry, RETRY_SCHEDULE);
210266
let mut acker = Some(self.acker);
211267
loop {
212268
if let Some(result) = retry
213269
.run(|| async {
214-
let delivery = acker
215-
.take()
216-
.expect("acker is always Some when trying to ack");
217-
delivery.ack().await.map_err(|(e, delivery)| {
218-
// Put the delivery back in acker before retrying, to
219-
// satisfy the expect above.
220-
acker = Some(delivery);
221-
e.into()
222-
})
270+
match acker.take() {
271+
Some(delivery) => {
272+
delivery.ack().await.map_err(|(e, delivery)| {
273+
// Put the delivery back in acker before retrying, to
274+
// satisfy the expect above.
275+
acker = Some(delivery);
276+
e.into()
277+
})
278+
}
279+
None => unreachable!(),
280+
}
223281
})
224282
.await
225283
{
@@ -229,27 +287,31 @@ impl TaskQueueDelivery {
229287
}
230288

231289
pub async fn nack(self) -> Result<()> {
232-
tracing::trace!(msg_id = self.task.msg_id(), "nack");
290+
tracing::trace!(
291+
task_id = self.task.task_id().map(tracing::field::display),
292+
"nack"
293+
);
233294

234295
let mut retry = Retry::new(should_retry, RETRY_SCHEDULE);
235296
let mut acker = Some(self.acker);
236297
loop {
237298
if let Some(result) = retry
238299
.run(|| async {
239-
let delivery = acker
240-
.take()
241-
.expect("acker is always Some when trying to ack");
242-
243-
delivery
244-
.nack()
245-
.await
246-
.map_err(|(e, delivery)| {
247-
// Put the delivery back in acker before retrying, to
248-
// satisfy the expect above.
249-
acker = Some(delivery);
250-
e.into()
251-
})
252-
.trace()
300+
match acker.take() {
301+
Some(delivery) => {
302+
delivery
303+
.nack()
304+
.await
305+
.map_err(|(e, delivery)| {
306+
// Put the delivery back in acker before retrying, to
307+
// satisfy the expect above.
308+
acker = Some(delivery);
309+
Error::from(e)
310+
})
311+
.trace()
312+
}
313+
_ => unreachable!(),
314+
}
253315
})
254316
.await
255317
{
@@ -258,18 +320,3 @@ impl TaskQueueDelivery {
258320
}
259321
}
260322
}
261-
262-
impl TryFrom<Delivery> for TaskQueueDelivery {
263-
type Error = Error;
264-
fn try_from(value: Delivery) -> Result<Self> {
265-
Ok(TaskQueueDelivery {
266-
task: Arc::new(
267-
value
268-
.payload_serde_json()
269-
.map_err(|_| Error::queue("Failed to decode queue task"))?
270-
.ok_or_else(|| Error::queue("Unexpected empty delivery"))?,
271-
),
272-
acker: value,
273-
})
274-
}
275-
}

server/svix-server/src/queue/rabbitmq.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ mod tests {
186186
.await
187187
.unwrap();
188188

189-
producer.send(QueueTask::HealthCheck, None).await.unwrap();
189+
producer.send(&QueueTask::HealthCheck, None).await.unwrap();
190190
}
191191

192192
// Receive with lapin consumer

0 commit comments

Comments
 (0)