Skip to content

Commit eaacfc4

Browse files
authored
fix: shard key updater didn't disconnect servers, causing routing confusion (#722)
If client attempted to do a sharding key update without a transaction, PgDog would return an error and forget to disconnect backends. Subsequent query would be routed to previous route incorrectly, causing confusion.
1 parent 8431642 commit eaacfc4

File tree

2 files changed

+73
-1
lines changed
  • integration/rust/tests/integration
  • pgdog/src/frontend/client/query_engine/multi_step

2 files changed

+73
-1
lines changed

integration/rust/tests/integration/rewrite.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,77 @@ async fn update_expects_transactions() {
280280
cleanup_table(&pool).await;
281281
}
282282

283+
#[tokio::test]
284+
async fn test_error_disconnects_and_update_works() -> Result<(), Box<dyn std::error::Error>> {
285+
let conn = connections_sqlx().await.pop().unwrap();
286+
let admin = admin_sqlx().await;
287+
288+
admin.execute("SET rewrite_enabled TO true").await?;
289+
admin
290+
.execute("SET rewrite_shard_key_updates TO 'rewrite'")
291+
.await?;
292+
admin
293+
.execute("SET rewrite_split_inserts TO 'rewrite'")
294+
.await?;
295+
admin.execute("SET two_phase_commit TO true").await?;
296+
admin.execute("SET two_phase_commit_auto TO true").await?;
297+
298+
conn.execute("TRUNCATE TABLE sharded").await?;
299+
300+
for _ in 0..250 {
301+
conn.execute("INSERT INTO sharded (id, value) VALUES (pgdog.unique_id(), 'test')")
302+
.await?;
303+
}
304+
305+
let ids: Vec<(i64,)> = sqlx::query_as("SELECT id FROM sharded")
306+
.fetch_all(&conn)
307+
.await?;
308+
309+
let mut errors = 0;
310+
for id in ids.iter() {
311+
let num: i64 = rand::random();
312+
let err = sqlx::query("UPDATE sharded SET id = $2 WHERE id = $1")
313+
.bind(id.0)
314+
.bind(num as i64)
315+
.execute(&conn)
316+
.await
317+
.err();
318+
319+
if let Some(err) = err {
320+
errors += 1;
321+
assert!(
322+
err.to_string()
323+
.contains("sharding key update must be executed inside a transaction"),
324+
"{}",
325+
err.to_string(),
326+
);
327+
}
328+
329+
let mut transaction = conn.begin().await?;
330+
sqlx::query("UPDATE sharded SET id = $2 WHERE id = $1")
331+
.bind(id.0)
332+
.bind(num as i64)
333+
.execute(&mut *transaction)
334+
.await?;
335+
let _ = sqlx::query("SELECT * FROM sharded WHERE id = $1")
336+
.bind(num as i64)
337+
.fetch_one(&mut *transaction)
338+
.await?;
339+
transaction.commit().await?;
340+
341+
let _ = sqlx::query("SELECT * FROM sharded WHERE id = $1")
342+
.bind(num as i64)
343+
.fetch_one(&conn)
344+
.await?;
345+
}
346+
347+
assert!(errors > 0);
348+
349+
admin.execute("RELOAD").await?;
350+
351+
Ok(())
352+
}
353+
283354
async fn prepare_table(pool: &Pool<Postgres>) {
284355
for shard in [0, 1] {
285356
let drop = format!("/* pgdog_shard: {shard} */ DROP TABLE IF EXISTS {TEST_TABLE}");

pgdog/src/frontend/client/query_engine/multi_step/update.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,12 @@ impl<'a> UpdateMulti<'a> {
126126
return Ok(());
127127
}
128128

129-
if !context.in_transaction() && !self.engine.backend.is_multishard()
129+
if !context.in_transaction() || !self.engine.backend.is_multishard()
130130
// Do this check at the last possible moment.
131131
// Just in case we change how transactions are
132132
// routed in the future.
133133
{
134+
self.engine.cleanup_backend(context);
134135
return Err(UpdateError::TransactionRequired.into());
135136
}
136137

0 commit comments

Comments
 (0)