Skip to content

Commit 9a483fe

Browse files
committed
add reproduction of rollback after commit on serialization error
1 parent 0e703ba commit 9a483fe

File tree

1 file changed

+122
-0
lines changed

1 file changed

+122
-0
lines changed

tests/transactions.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,125 @@ async fn concurrent_serializable_transactions_behave_correctly() {
104104
res.unwrap_err()
105105
);
106106
}
107+
108+
#[cfg(feature = "postgres")]
109+
#[tokio::test]
110+
async fn commit_with_serialization_failure_already_ends_transaction() {
111+
use diesel::prelude::*;
112+
use diesel_async::{AsyncConnection, RunQueryDsl};
113+
use std::sync::Arc;
114+
use tokio::sync::Barrier;
115+
116+
table! {
117+
users3 {
118+
id -> Integer,
119+
}
120+
}
121+
122+
// create an async connection
123+
let mut conn = super::connection_without_transaction().await;
124+
125+
struct A(Vec<&'static str>);
126+
impl diesel::connection::Instrumentation for A {
127+
fn on_connection_event(&mut self, event: diesel::connection::InstrumentationEvent<'_>) {
128+
if let diesel::connection::InstrumentationEvent::StartQuery { query, .. } = event {
129+
let q = query.to_string();
130+
let q = q.split_once(' ').map(|(a, _)| a).unwrap_or(&q);
131+
132+
if matches!(q, "BEGIN" | "COMMIT" | "ROLLBACK") {
133+
assert_eq!(q, self.0.pop().unwrap());
134+
}
135+
}
136+
}
137+
}
138+
conn.set_instrumentation(A(vec!["COMMIT", "BEGIN", "COMMIT", "BEGIN"]));
139+
140+
let mut conn1 = super::connection_without_transaction().await;
141+
142+
diesel::sql_query("CREATE TABLE IF NOT EXISTS users3 (id int);")
143+
.execute(&mut conn)
144+
.await
145+
.unwrap();
146+
147+
let barrier_1 = Arc::new(Barrier::new(2));
148+
let barrier_2 = Arc::new(Barrier::new(2));
149+
let barrier_1_for_tx1 = barrier_1.clone();
150+
let barrier_1_for_tx2 = barrier_1.clone();
151+
let barrier_2_for_tx1 = barrier_2.clone();
152+
let barrier_2_for_tx2 = barrier_2.clone();
153+
154+
let mut tx = conn.build_transaction().serializable().read_write();
155+
156+
let res = tx.run(|conn| {
157+
Box::pin(async {
158+
users3::table.select(users3::id).load::<i32>(conn).await?;
159+
160+
barrier_1_for_tx1.wait().await;
161+
diesel::insert_into(users3::table)
162+
.values(users3::id.eq(1))
163+
.execute(conn)
164+
.await?;
165+
barrier_2_for_tx1.wait().await;
166+
167+
Ok::<_, diesel::result::Error>(())
168+
})
169+
});
170+
171+
let mut tx1 = conn1.build_transaction().serializable().read_write();
172+
173+
let res1 = async {
174+
let res = tx1
175+
.run(|conn| {
176+
Box::pin(async {
177+
users3::table.select(users3::id).load::<i32>(conn).await?;
178+
179+
barrier_1_for_tx2.wait().await;
180+
diesel::insert_into(users3::table)
181+
.values(users3::id.eq(1))
182+
.execute(conn)
183+
.await?;
184+
185+
Ok::<_, diesel::result::Error>(())
186+
})
187+
})
188+
.await;
189+
barrier_2_for_tx2.wait().await;
190+
res
191+
};
192+
193+
let (res, res1) = tokio::join!(res, res1);
194+
let _ = diesel::sql_query("DROP TABLE users3")
195+
.execute(&mut conn1)
196+
.await;
197+
198+
assert!(
199+
res1.is_ok(),
200+
"Expected the second transaction to be succussfull, but got an error: {:?}",
201+
res1.unwrap_err()
202+
);
203+
204+
assert!(res.is_err(), "Expected the first transaction to fail");
205+
let err = res.unwrap_err();
206+
assert!(
207+
matches!(
208+
&err,
209+
diesel::result::Error::DatabaseError(
210+
diesel::result::DatabaseErrorKind::SerializationFailure,
211+
_
212+
)
213+
),
214+
"Expected an serialization failure but got another error: {err:?}"
215+
);
216+
217+
let mut tx = conn.build_transaction();
218+
219+
let res = tx
220+
.run(|_| Box::pin(async { Ok::<_, diesel::result::Error>(()) }))
221+
.await;
222+
223+
assert!(
224+
res.is_ok(),
225+
"Expect transaction to run fine but got an error: {:?}",
226+
res.unwrap_err()
227+
);
228+
}

0 commit comments

Comments
 (0)