Skip to content

Commit c3e10f8

Browse files
committed
Automatically retry on transaction failure for Neo4j.
1 parent 88b6ee4 commit c3e10f8

File tree

6 files changed

+173
-13
lines changed

6 files changed

+173
-13
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,4 @@ tokio-stream = "0.1.17"
102102
async-stream = "0.3.6"
103103
neo4rs = "0.8.0"
104104
bytes = "1.10.1"
105+
rand = "0.9.0"

src/base/value.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,23 @@ impl From<KeyValue> for Value {
533533
}
534534
}
535535

536+
impl From<&KeyValue> for Value {
537+
fn from(value: &KeyValue) -> Self {
538+
match value {
539+
KeyValue::Bytes(v) => Value::Basic(BasicValue::Bytes(v.clone())),
540+
KeyValue::Str(v) => Value::Basic(BasicValue::Str(v.clone())),
541+
KeyValue::Bool(v) => Value::Basic(BasicValue::Bool(*v)),
542+
KeyValue::Int64(v) => Value::Basic(BasicValue::Int64(*v)),
543+
KeyValue::Range(v) => Value::Basic(BasicValue::Range(*v)),
544+
KeyValue::Uuid(v) => Value::Basic(BasicValue::Uuid(*v)),
545+
KeyValue::Date(v) => Value::Basic(BasicValue::Date(*v)),
546+
KeyValue::Struct(v) => Value::Struct(FieldValues {
547+
fields: v.iter().map(Value::from).collect(),
548+
}),
549+
}
550+
}
551+
}
552+
536553
impl From<FieldValues> for Value {
537554
fn from(value: FieldValues) -> Self {
538555
Value::Struct(value)

src/ops/storages/neo4j.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ impl GraphRelationship {
9292
}
9393
}
9494

95+
impl retriable::IsRetryable for neo4rs::Error {
96+
fn is_retryable(&self) -> bool {
97+
match self {
98+
neo4rs::Error::ConnectionError => true,
99+
neo4rs::Error::Neo4j(e) => e.kind() == neo4rs::Neo4jErrorKind::Transient,
100+
_ => false,
101+
}
102+
}
103+
}
104+
95105
#[derive(Default)]
96106
pub struct GraphPool {
97107
graphs: Mutex<HashMap<GraphKey, Arc<OnceCell<Arc<Graph>>>>>,
@@ -172,7 +182,7 @@ fn json_value_to_bolt_value(value: &serde_json::Value) -> Result<BoltType> {
172182
Ok(bolt_value)
173183
}
174184

175-
fn key_to_bolt(key: KeyValue, schema: &schema::ValueType) -> Result<BoltType> {
185+
fn key_to_bolt(key: &KeyValue, schema: &schema::ValueType) -> Result<BoltType> {
176186
value_to_bolt(&key.into(), schema)
177187
}
178188

@@ -362,18 +372,18 @@ FINISH
362372
tgt_fields,
363373
})
364374
}
365-
}
366375

367-
#[async_trait]
368-
impl ExportTargetExecutor for RelationshipStorageExecutor {
369-
async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()> {
376+
fn build_queries_to_apply_mutation(
377+
&self,
378+
mutation: &ExportTargetMutation,
379+
) -> Result<Vec<neo4rs::Query>> {
370380
let mut queries = vec![];
371-
for upsert in mutation.upserts {
372-
let rel_id_bolt = key_to_bolt(upsert.key, &self.key_field.value_type.typ)?;
381+
for upsert in mutation.upserts.iter() {
382+
let rel_id_bolt = key_to_bolt(&upsert.key, &self.key_field.value_type.typ)?;
373383
queries
374384
.push(neo4rs::query(&self.delete_cypher).param(REL_ID_PARAM, rel_id_bolt.clone()));
375385

376-
let value = upsert.value;
386+
let value = &upsert.value;
377387
let mut insert_cypher = neo4rs::query(&self.insert_cypher)
378388
.param(REL_ID_PARAM, rel_id_bolt)
379389
.param(
@@ -425,17 +435,31 @@ impl ExportTargetExecutor for RelationshipStorageExecutor {
425435
}
426436
queries.push(insert_cypher);
427437
}
428-
for delete_key in mutation.delete_keys {
438+
for delete_key in mutation.delete_keys.iter() {
429439
queries.push(neo4rs::query(&self.delete_cypher).param(
430440
REL_ID_PARAM,
431441
key_to_bolt(delete_key, &self.key_field.value_type.typ)?,
432442
));
433443
}
444+
Ok(queries)
445+
}
446+
}
434447

435-
let mut txn = self.graph.start_txn().await?;
436-
txn.run_queries(queries).await?;
437-
txn.commit().await?;
438-
Ok(())
448+
#[async_trait]
449+
impl ExportTargetExecutor for RelationshipStorageExecutor {
450+
async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()> {
451+
retriable::run(
452+
|| async {
453+
let queries = self.build_queries_to_apply_mutation(&mutation)?;
454+
let mut txn = self.graph.start_txn().await?;
455+
txn.run_queries(queries.clone()).await?;
456+
txn.commit().await?;
457+
retriable::Ok(())
458+
},
459+
retriable::RunOptions::default(),
460+
)
461+
.await
462+
.map_err(Into::<anyhow::Error>::into)
439463
}
440464
}
441465

src/prelude.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, L
2424
pub(crate) use crate::ops::interface;
2525
pub(crate) use crate::service::error::ApiError;
2626
pub(crate) use crate::setup::AuthRegistry;
27+
pub(crate) use crate::utils::retriable;
2728

2829
pub(crate) use crate::{api_bail, api_error};
2930

src/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod db;
22
pub mod fingerprint;
33
pub mod immutable;
4+
pub mod retriable;
45
pub mod yaml_ser;

src/utils/retriable.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use log::trace;
2+
use std::{future::Future, time::Duration};
3+
4+
pub trait IsRetryable {
5+
fn is_retryable(&self) -> bool;
6+
}
7+
8+
pub struct Error {
9+
error: anyhow::Error,
10+
is_retryable: bool,
11+
}
12+
13+
impl std::fmt::Display for Error {
14+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15+
std::fmt::Display::fmt(&self.error, f)
16+
}
17+
}
18+
19+
impl std::fmt::Debug for Error {
20+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21+
std::fmt::Debug::fmt(&self.error, f)
22+
}
23+
}
24+
25+
impl IsRetryable for Error {
26+
fn is_retryable(&self) -> bool {
27+
self.is_retryable
28+
}
29+
}
30+
31+
impl From<anyhow::Error> for Error {
32+
fn from(error: anyhow::Error) -> Self {
33+
Self {
34+
error,
35+
is_retryable: false,
36+
}
37+
}
38+
}
39+
40+
impl Into<anyhow::Error> for Error {
41+
fn into(self) -> anyhow::Error {
42+
self.error
43+
}
44+
}
45+
46+
impl<E: IsRetryable + std::error::Error + Send + Sync + 'static> From<E> for Error {
47+
fn from(error: E) -> Self {
48+
Self {
49+
is_retryable: error.is_retryable(),
50+
error: anyhow::Error::new(error),
51+
}
52+
}
53+
}
54+
55+
pub type Result<T, E = Error> = std::result::Result<T, E>;
56+
57+
#[allow(non_snake_case)]
58+
pub fn Ok<T>(value: T) -> Result<T> {
59+
Result::Ok(value)
60+
}
61+
62+
pub struct RunOptions {
63+
pub max_retries: usize,
64+
pub initial_backoff: Duration,
65+
pub max_backoff: Duration,
66+
}
67+
68+
impl Default for RunOptions {
69+
fn default() -> Self {
70+
Self {
71+
max_retries: 5,
72+
initial_backoff: Duration::from_millis(100),
73+
max_backoff: Duration::from_secs(10),
74+
}
75+
}
76+
}
77+
78+
pub async fn run<
79+
Ok,
80+
Err: std::fmt::Display + IsRetryable,
81+
Fut: Future<Output = Result<Ok, Err>>,
82+
F: Fn() -> Fut,
83+
>(
84+
f: F,
85+
options: RunOptions,
86+
) -> Result<Ok, Err> {
87+
let mut retries = 0;
88+
let mut backoff = options.initial_backoff;
89+
90+
loop {
91+
match f().await {
92+
Result::Ok(result) => return Result::Ok(result),
93+
Result::Err(err) => {
94+
if !err.is_retryable() || retries >= options.max_retries {
95+
return Result::Err(err);
96+
}
97+
retries += 1;
98+
trace!(
99+
"Will retry #{} in {}ms for error: {}",
100+
retries,
101+
backoff.as_millis(),
102+
err
103+
);
104+
tokio::time::sleep(backoff).await;
105+
if backoff < options.max_backoff {
106+
backoff = std::cmp::min(
107+
Duration::from_micros(
108+
(backoff.as_micros() * rand::random_range(1618..=2000) / 1000) as u64,
109+
),
110+
options.max_backoff,
111+
);
112+
}
113+
}
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)