Skip to content

Commit 2d84554

Browse files
committed
fix: make query_as macros retry
1 parent 865550c commit 2d84554

File tree

32 files changed

+177
-57
lines changed

32 files changed

+177
-57
lines changed

packages/common/chirp-workflow/core/src/db/crdb_nats/debug.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
4949
WHERE
5050
workflow_id = ANY($1)
5151
",
52-
workflow_ids,
52+
&workflow_ids,
5353
)
5454
.await?;
5555

@@ -493,7 +493,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
493493
FROM db_workflow.tagged_signals
494494
WHERE signal_id = ANY($1)
495495
",
496-
signal_ids,
496+
&signal_ids,
497497
)
498498
.await?;
499499

packages/common/pools/src/utils/sql_query_macros.rs

Lines changed: 141 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -203,18 +203,58 @@ macro_rules! __sql_query {
203203
};
204204
([$ctx:expr, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => {
205205
async {
206-
let query = sqlx::query($crate::__opt_indoc!($sql))
207-
$(
208-
.bind($bind)
209-
)*;
210-
211206
// Execute query
212207
$crate::__sql_query_metrics_acquire!(_acquire);
213208
$crate::__sql_query_metrics_start!($ctx, execute, _acquire, _start);
214-
let res = query.execute(&mut **$tx).await.map_err(Into::<GlobalError>::into);
209+
210+
let mut backoff = $crate::__rivet_util::Backoff::new(
211+
4,
212+
None,
213+
$crate::utils::sql_query_macros::QUERY_RETRY_MS,
214+
50
215+
);
216+
let mut i = 0;
217+
218+
// Retry loop
219+
let res = loop {
220+
let query = sqlx::query($crate::__opt_indoc!($sql))
221+
$(
222+
.bind($bind)
223+
)*;
224+
225+
match query.execute(&mut **$tx).await {
226+
Err(err) => {
227+
i += 1;
228+
if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES {
229+
break Err(
230+
sqlx::Error::Io(
231+
std::io::Error::new(
232+
std::io::ErrorKind::Other,
233+
$crate::utils::sql_query_macros::Error::MaxSqlRetries(err),
234+
)
235+
)
236+
);
237+
}
238+
239+
use sqlx::Error::*;
240+
match &err {
241+
// Retry other errors with a backoff
242+
Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
243+
| WorkerCrashed => {
244+
tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES);
245+
backoff.tick().await;
246+
}
247+
// Throw error
248+
_ => break Err(err),
249+
}
250+
}
251+
x => break x,
252+
}
253+
};
254+
215255
$crate::__sql_query_metrics_finish!($ctx, execute, _start);
216256

217-
res
257+
res.map_err(Into::<GlobalError>::into)
218258
}
219259
.instrument(tracing::info_span!("sql_query"))
220260
};
@@ -229,39 +269,119 @@ macro_rules! __sql_query_as {
229269
async {
230270
use sqlx::Acquire;
231271

232-
let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql))
233-
$(
234-
.bind($bind)
235-
)*;
236-
237272
// Acquire connection
238273
$crate::__sql_query_metrics_acquire!(_acquire);
239274
let driver = $driver;
240275
let mut conn = $crate::__sql_acquire!($ctx, driver);
241276

242277
// Execute query
243278
$crate::__sql_query_metrics_start!($ctx, $action, _acquire, _start);
244-
let res = query.$action(&mut *conn).await.map_err(Into::<GlobalError>::into);
279+
280+
let mut backoff = $crate::__rivet_util::Backoff::new(
281+
4,
282+
None,
283+
$crate::utils::sql_query_macros::QUERY_RETRY_MS,
284+
50
285+
);
286+
let mut i = 0;
287+
288+
// Retry loop
289+
let res = loop {
290+
let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql))
291+
$(
292+
.bind($bind)
293+
)*;
294+
295+
match query.$action(&mut *conn).await {
296+
Err(err) => {
297+
i += 1;
298+
if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES {
299+
break Err(
300+
sqlx::Error::Io(
301+
std::io::Error::new(
302+
std::io::ErrorKind::Other,
303+
$crate::utils::sql_query_macros::Error::MaxSqlRetries(err),
304+
)
305+
)
306+
);
307+
}
308+
309+
use sqlx::Error::*;
310+
match &err {
311+
// Retry other errors with a backoff
312+
Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
313+
| WorkerCrashed => {
314+
tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES);
315+
backoff.tick().await;
316+
}
317+
// Throw error
318+
_ => break Err(err),
319+
}
320+
}
321+
x => break x,
322+
}
323+
};
324+
245325
$crate::__sql_query_metrics_finish!($ctx, $action, _start);
246326

247-
res
327+
res.map_err(Into::<GlobalError>::into)
248328
}
249329
.instrument(tracing::info_span!("sql_query_as"))
250330
};
251331
([$ctx:expr, $rv:ty, $action:ident, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => {
252332
async {
253-
let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql))
254-
$(
255-
.bind($bind)
256-
)*;
257-
258333
// Execute query
259334
$crate::__sql_query_metrics_acquire!(_acquire);
260335
$crate::__sql_query_metrics_start!($ctx, $action, _acquire, _start);
261-
let res = query.$action(&mut **$tx).await.map_err(Into::<GlobalError>::into);
336+
337+
let mut backoff = $crate::__rivet_util::Backoff::new(
338+
4,
339+
None,
340+
$crate::utils::sql_query_macros::QUERY_RETRY_MS,
341+
50
342+
);
343+
let mut i = 0;
344+
345+
// Retry loop
346+
let res = loop {
347+
let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql))
348+
$(
349+
.bind($bind)
350+
)*;
351+
352+
match query.$action(&mut **$tx).await {
353+
Err(err) => {
354+
i += 1;
355+
if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES {
356+
break Err(
357+
sqlx::Error::Io(
358+
std::io::Error::new(
359+
std::io::ErrorKind::Other,
360+
$crate::utils::sql_query_macros::Error::MaxSqlRetries(err),
361+
)
362+
)
363+
);
364+
}
365+
366+
use sqlx::Error::*;
367+
match &err {
368+
// Retry other errors with a backoff
369+
Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
370+
| WorkerCrashed => {
371+
tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES);
372+
backoff.tick().await;
373+
}
374+
// Throw error
375+
_ => break Err(err),
376+
}
377+
}
378+
x => break x,
379+
}
380+
};
381+
262382
$crate::__sql_query_metrics_finish!($ctx, $action, _start);
263383

264-
res
384+
res.map_err(Into::<GlobalError>::into)
265385
}
266386
.instrument(tracing::info_span!("sql_query_as"))
267387
};

packages/core/services/build/ops/get/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ async fn handle(ctx: OperationContext<build::get::Request>) -> GlobalResult<buil
4343
WHERE
4444
build_id = ANY($1)
4545
",
46-
build_ids,
46+
&build_ids,
4747
)
4848
.await?
4949
.into_iter()

packages/core/services/build/src/ops/patch_tags.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ pub async fn patch_tags(ctx: &OperationCtx, input: &Input) -> GlobalResult<Outpu
133133
WHERE b.build_id = f2.build_id
134134
",
135135
build_id,
136-
exclusive_tags_json,
136+
&exclusive_tags_json,
137137
)
138138
.await?;
139139
}
@@ -147,7 +147,7 @@ pub async fn patch_tags(ctx: &OperationCtx, input: &Input) -> GlobalResult<Outpu
147147
WHERE build_id = $1
148148
",
149149
build_id,
150-
tags_json,
150+
&tags_json,
151151
)
152152
.await?;
153153

packages/core/services/cdn/ops/site-get/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async fn handle(
2727
FROM db_cdn.sites
2828
WHERE site_id = ANY($1)
2929
",
30-
site_ids,
30+
&site_ids,
3131
)
3232
.await?
3333
.into_iter()

packages/core/services/cf-custom-hostname/ops/get/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async fn handle(
3636
FROM db_cf_custom_hostname.custom_hostnames
3737
WHERE identifier = ANY($1)
3838
",
39-
identifiers,
39+
&identifiers,
4040
)
4141
.await?;
4242

packages/core/services/cloud/ops/game-config-get/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async fn handle(
3232
FROM db_cloud.game_configs
3333
WHERE game_id = ANY($1)
3434
",
35-
game_ids,
35+
&game_ids,
3636
)
3737
.await?
3838
.into_iter()

packages/core/services/cloud/ops/namespace-get/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async fn handle(
2424
FROM db_cloud.game_namespaces
2525
WHERE namespace_id = ANY($1)
2626
",
27-
namespace_ids,
27+
&namespace_ids,
2828
)
2929
.await?;
3030

packages/core/services/cloud/ops/version-get/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async fn handle(
2424
FROM db_cloud.game_versions
2525
WHERE version_id = ANY($1)
2626
",
27-
req_version_ids,
27+
&req_version_ids,
2828
)
2929
.await?;
3030

packages/core/services/cloud/worker/src/workers/version_name_reserve.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async fn worker(
3232
RETURNING version_display_name;
3333
"#,
3434
game_id,
35-
date_prefix,
35+
&date_prefix,
3636
ctx.ts(),
3737
)
3838
.await?;

0 commit comments

Comments
 (0)