Multi-row INSERTs broken via prepared statements: param-count mismatch or silent broadcast #981

@gurjarprateek

Description

pgdog versions observed: v0.1.40 and main@d87123f


Summary

When a client using the extended query protocol prepares a multi-row INSERT and then executes it (the standard db.Prepare(sql).Exec(args) pattern in Go's lib/pq, and similar patterns in JDBC / libpq sync mode), pgdog's split_inserts = "rewrite" produces a wire-protocol mismatch and the operation fails:

pq: got 10 parameters but the statement requires 5

Falling back to the default split_inserts mode (or "ignore") silently broadcasts the multi-row INSERT to all shards, inserting each row N times for an N-shard cluster, which surfaces in the application as e.g.:

ERROR:  bind message supplies 5 parameters, but prepared statement "" requires 10
[or, from the application's RowsAffected check:]
Inserted 8 instead of 2 rows

Sharded workloads that issue multi-row INSERTs via prepared statements (Temporal's persistence layer is the example we hit) cannot function under either configuration unless the offending tables are marked omnisharded, which keeps the data correct only by writing every row to every shard (4× write amplification on our 4-shard cluster).

Root cause

Two distinct code paths combine to produce the failure:

(1) split_inserts = "rewrite" doesn't handle the two-roundtrip protocol

lib/pq (and any driver that calls db.Prepare(...).Exec(...)) sends extended-protocol messages in two separate requests:

  • Round 1: Parse + Describe + Sync — server returns ParseComplete + ParameterDescription
  • Round 2: Bind + Execute + Sync — server processes the bind
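The request boundaries can be illustrated with a small sketch. It assumes each frontend message is represented only by its one-byte wire tag and that a request ends at Sync (or at a simple-protocol Query), which matches the `request buffered` debug lines in the log excerpt further down. `split_into_requests` is a hypothetical helper, not pgdog's buffering code.

```rust
/// Hypothetical model of frontend messages by their one-byte wire tags:
/// 'P' = Parse, 'B' = Bind, 'D' = Describe, 'E' = Execute, 'S' = Sync, 'Q' = Query.
/// Splits a stream of tags into the requests a pooler would buffer.
pub fn split_into_requests(tags: &[char]) -> Vec<Vec<char>> {
    let mut requests = Vec::new();
    let mut current = Vec::new();
    for &tag in tags {
        current.push(tag);
        // Sync ends an extended-protocol request; a simple Query is a request on its own.
        if tag == 'S' || tag == 'Q' {
            requests.push(std::mem::take(&mut current));
        }
    }
    // Keep any trailing messages that have not been terminated by Sync yet.
    if !current.is_empty() {
        requests.push(current);
    }
    requests
}
```

With lib/pq's prepare-then-execute flow the stream yields two requests, `['P', 'D', 'S']` and `['B', 'E', 'S']`; a driver that pipelines Parse and Bind before a single Sync yields one request, which is the only shape the current InsertSplit path handles.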

The existing InsertSplit logic in pgdog/src/frontend/router/parser/rewrite/statement/insert.rs and pgdog/src/frontend/router/parser/rewrite/statement/plan.rs::apply() assumes that Parse + Bind + Execute + Sync arrive in one batched request, which is exactly what the existing unit tests at client/query_engine/test/rewrite_insert_split.rs exercise.

When they arrive in two separate requests:

  • Round 1 rewrites the Parse to single-row form and forwards it to the backends. Each backend's ParameterDescription reports the rewritten placeholder count (5), which pgdog forwards to the client (only the first occurrence, via MultiServerState::forward).
  • Round 2's Bind carries the original multi-row parameter count (10). The backend's prepared statement has 5 placeholders → bind message supplies 10 parameters, but prepared statement "" requires 5.
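The mismatch can be reproduced in isolation. This sketch assumes, as PostgreSQL does when a Parse message specifies no parameter types, that a statement's parameter count is the highest `$n` it references. The scanner is deliberately naive (it ignores string literals and dollar quoting), and `placeholder_count` and `check_bind` are hypothetical helpers.

```rust
/// Returns the highest `$n` placeholder referenced in `sql` (naive scan).
pub fn placeholder_count(sql: &str) -> usize {
    let bytes = sql.as_bytes();
    let mut max: usize = 0;
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] != b'$' {
            i += 1;
            continue;
        }
        // Collect the ASCII digits that follow '$'.
        let start = i + 1;
        let mut end = start;
        while end < bytes.len() && bytes[end].is_ascii_digit() {
            end += 1;
        }
        if end > start {
            // Saturate on absurdly long numbers instead of panicking.
            let n: usize = sql[start..end].parse().unwrap_or(usize::MAX);
            max = max.max(n);
        }
        i = end;
    }
    max
}

/// Simulates the Round 2 Bind against the statement the backend actually prepared.
pub fn check_bind(prepared_sql: &str, bind_params: usize) -> Result<(), String> {
    let expected = placeholder_count(prepared_sql);
    if bind_params == expected {
        Ok(())
    } else {
        Err(format!(
            "bind message supplies {} parameters, but prepared statement \"\" requires {}",
            bind_params, expected
        ))
    }
}
```

Feeding the original 10-parameter Bind to the rewritten single-row statement produces the same PostgreSQL error text quoted above, while the unrewritten statement accepts it.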

Confirmed with RUST_LOG=pgdog=debug:

request buffered [0.0025ms] ['P', 'D', 'S']
INSERT INTO <table> (a, b, c, d, e) VALUES ($1, $2, $3, $4, $5),($6, $7, $8, $9, $10)
INSERT INTO <table> (a, b, c, d, e) VALUES ($1, $2, $3, $4, $5)   ← pgdog rewritten to single-row
[backend prepared statement now has 5 placeholders, ParameterDescription forwarded with 5 params]
request buffered [0.0010ms] ['B', 'E', 'S']
[backend then errors with mismatched param count]

(2) Multi-row INSERTs short-circuit to broadcast in the router

pgdog/src/frontend/router/parser/statement.rs::search_insert_stmt (current main):

// Multi-row VALUES broadcasts to all shards
if select_stmt.values_lists.len() > 1 {
    return Ok(SearchResult::Match(Shard::All));
}

Even with split_inserts = "ignore" and a uniformly-keyed multi-row INSERT (every row shares the same sharding-column value, the dominant pattern in our workload), pgdog routes to Shard::All. With cross_shard_disabled = false (the default), every backend receives the INSERT and inserts every row, so the application sees N × (number of shards) rows inserted instead of N.
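The effect of the short-circuit can be modeled in a few lines. In this sketch `route_insert` and `rows_inserted` are hypothetical, each row is reduced to its sharding-key value, and modulo stands in for pgdog's real sharding hash; only the shape of the decision is taken from the search_insert_stmt snippet above.

```rust
/// Hypothetical, simplified routing outcome.
#[derive(Debug, PartialEq)]
pub enum Route {
    Shard(usize),
    All,
}

/// Mirrors the current short-circuit: more than one VALUES row means broadcast,
/// even when every row carries the same sharding key.
pub fn route_insert(row_keys: &[i64], shards: usize) -> Route {
    match row_keys {
        // Single row: route by its key (modulo is a placeholder for pgdog's real hash).
        [key] => Route::Shard(key.rem_euclid(shards as i64) as usize),
        // Multi-row VALUES: broadcast to every shard.
        _ => Route::All,
    }
}

/// Rows the application sees as inserted when cross-shard writes are allowed.
pub fn rows_inserted(row_keys: &[i64], shards: usize) -> usize {
    match route_insert(row_keys, shards) {
        Route::All => row_keys.len() * shards,
        Route::Shard(_) => row_keys.len(),
    }
}
```

Two rows with shard_id = 5 on a 4-shard cluster give 2 × 4 = 8 inserted rows, which is the `Inserted 8 instead of 2 rows` symptom from the summary.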

Proposed fix

Two changes, ~97 lines total. Tested empirically: 1000 sharded workflow runs completed cleanly, and multi-row INSERTs were routed to the backend that owns the shard key instead of being broadcast.

Change 1: gate RewriteResult::InsertSplit on Parse+Bind co-location

pgdog/src/frontend/router/parser/rewrite/statement/plan.rs:

@@ -140,8 +140,40 @@ impl RewritePlan {
         }
 
         if !self.insert_split.is_empty() {
-            let requests = build_split_requests(&self.insert_split, request);
-            return Ok(RewriteResult::InsertSplit(requests));
+            // The split logic is only correct when the request carries BOTH Parse and Bind
+            // (or simple-protocol Query). Drivers that prepare-then-execute (lib/pq, libpq's
+            // synchronous mode, JDBC, etc.) send Parse+Describe+Sync as one request, then
+            // Bind+Execute+Sync as a separate one. Splitting at Parse-time rewrites the
+            // backend's prepared statement to single-row form, then the later Bind carries
+            // the original multi-row parameter count -> "got N but requires M" mismatch.
+            // Splitting at Bind-time forwards a 5-param Bind to a backend whose prepared
+            // statement is still 10-placeholder -> the mirror "supplies 5 but requires 10"
+            // mismatch.
+            //
+            // For now, only execute the split when both shapes are present so the in-
+            // batch extended-protocol test case continues to work. The deferred case
+            // (separate Parse and Bind requests) is left unsharded — the original multi-
+            // row INSERT passes through to one round-robined backend instead of erroring.
+            // Reads against the data still need to route by shard_id, so users who care
+            // about correct sharding for prepared-multi-row-INSERT workloads should mark
+            // those tables omnisharded for now and wait for the proper deferred-split fix.
+            let has_bind_or_query = request.messages.iter().any(|m| {
+                matches!(m, ProtocolMessage::Bind(_) | ProtocolMessage::Query(_))
+            });
+            let has_parse = request
+                .messages
+                .iter()
+                .any(|m| matches!(m, ProtocolMessage::Parse(_)));
+            // Simple-protocol Query has SQL inline so split works in one shot.
+            // Batched extended-protocol Parse+Bind also has all pieces in one go.
+            let is_query_only = request
+                .messages
+                .iter()
+                .any(|m| matches!(m, ProtocolMessage::Query(_)));
+            if is_query_only || (has_parse && has_bind_or_query) {
+                let requests = build_split_requests(&self.insert_split, request);
+                return Ok(RewriteResult::InsertSplit(requests));
+            }
         }
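The gate can be checked against the request shapes from the root-cause section. This is a sketch with a hypothetical `Msg` enum standing in for pgdog's `ProtocolMessage`. Note that the patch's `is_query_only` actually tests for the presence of any Query, so any request containing a Query already passes, and the whole condition reduces to `has_query || (has_parse && has_bind)`.

```rust
/// Hypothetical stand-in for pgdog's ProtocolMessage, reduced to the relevant variants.
#[derive(Debug)]
pub enum Msg {
    Parse,
    Bind,
    Describe,
    Execute,
    Sync,
    Query,
}

/// Same decision as the patched gate: split only for a simple-protocol Query,
/// or for an extended-protocol request that carries both Parse and Bind.
pub fn can_split_now(request: &[Msg]) -> bool {
    let has_query = request.iter().any(|m| matches!(m, Msg::Query));
    let has_parse = request.iter().any(|m| matches!(m, Msg::Parse));
    let has_bind = request.iter().any(|m| matches!(m, Msg::Bind));
    // `has_parse && has_query` is implied by `has_query`, so it is not tested separately.
    has_query || (has_parse && has_bind)
}
```

Both halves of the lib/pq flow (`Parse, Describe, Sync` and `Bind, Execute, Sync`) fail the gate and skip the split, while a batched `Parse, Bind, Describe, Execute, Sync` request and a plain `Query` still take the InsertSplit path.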

Change 2: per-row shard extraction for multi-row VALUES

pgdog/src/frontend/router/parser/statement.rs::search_insert_stmt:

@@ -1464,8 +1464,70 @@ impl<'a, 'b, 'c> StatementParser<'a, 'b, 'c> {
         // Handle different INSERT forms
         if let Some(ref select_node) = stmt.select_stmt {
             if let Some(NodeEnum::SelectStmt(ref select_stmt)) = select_node.node {
-                // Multi-row VALUES broadcasts to all shards
+                // Multi-row VALUES: extract a shard per row from the sharding-key
+                // column. If every row converges on the same shard, route there;
+                // if rows would target different shards or we can't determine
+                // them, fall back to broadcast.
                 if select_stmt.values_lists.len() > 1 {
+                    let mut row_shards: Vec<Shard> = Vec::with_capacity(
+                        select_stmt.values_lists.len(),
+                    );
+                    let mut all_resolved = true;
+                    'rows: for values_list in &select_stmt.values_lists {
+                        let Some(NodeEnum::List(ref list)) = values_list.node else {
+                            all_resolved = false;
+                            break 'rows;
+                        };
+                        let mut row_shard: Option<Shard> = None;
+                        for (pos, value_node) in list.items.iter().enumerate() {
+                            let Some(column_name) = columns.get(pos) else {
+                                continue;
+                            };
+                            let table_name = ctx.table.map(|t| t.name);
+                            let table_schema = ctx.table.and_then(|t| t.schema);
+                            let sharded_table = self.get_sharded_table_by_name(
+                                column_name.as_str(),
+                                table_name,
+                                table_schema,
+                            );
+                            if sharded_table.is_none() {
+                                continue;
+                            }
+                            let Ok(value) = Value::try_from(value_node) else {
+                                continue;
+                            };
+                            if let Some(shard) =
+                                self.compute_shard_for_table(sharded_table, value)?
+                            {
+                                row_shard = Some(shard);
+                                break;
+                            }
+                        }
+                        match row_shard {
+                            Some(s) => row_shards.push(s),
+                            None => {
+                                all_resolved = false;
+                                break 'rows;
+                            }
+                        }
+                    }
+                    if all_resolved && !row_shards.is_empty() {
+                        // Converge: if every row hashes to the same shard, route there.
+                        // Otherwise return Matches so the caller's converge() can decide
+                        // (Multi/All).
+                        let unique: HashSet<Shard> = row_shards.iter().cloned().collect();
+                        if unique.len() == 1 {
+                            return Ok(SearchResult::Match(unique.into_iter().next().unwrap()));
+                        }
+                        return Ok(SearchResult::Matches(row_shards));
+                    }
                     return Ok(SearchResult::Match(Shard::All));
                 }
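The convergence rule at the end of the new block can be expressed over already-computed per-row shards. A minimal sketch, assuming each row has been reduced to `Some(shard)` when its sharding-key value was a resolvable literal and `None` otherwise; `RowRoute` and `converge` are hypothetical names loosely mirroring `SearchResult::Match`, `SearchResult::Matches`, and the `Shard::All` fallback.

```rust
use std::collections::HashSet;

/// Hypothetical routing outcome, loosely mirroring SearchResult.
#[derive(Debug, PartialEq)]
pub enum RowRoute {
    One(usize),       // every row hashed to the same shard
    Many(Vec<usize>), // rows disagree; the caller decides Multi/All
    All,              // some row could not be resolved: broadcast
}

/// `row_shards[i]` is the shard computed from row i's sharding-key value, if any.
pub fn converge(row_shards: &[Option<usize>]) -> RowRoute {
    // Collecting into Option<Vec<_>> yields None as soon as any row is unresolved.
    let resolved: Option<Vec<usize>> = row_shards.iter().copied().collect();
    match resolved {
        Some(shards) if !shards.is_empty() => {
            let unique: HashSet<usize> = shards.iter().copied().collect();
            if unique.len() == 1 {
                RowRoute::One(shards[0])
            } else {
                RowRoute::Many(shards)
            }
        }
        // Any unresolved row (or no rows at all) falls back to broadcast.
        _ => RowRoute::All,
    }
}
```

The uniformly-keyed case from the reproducer (every row with shard_id = 5) lands in `One`, which is what removes the broadcast.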

What this fix doesn't cover

The combined patch does not correctly shard the rare case where a multi-row INSERT genuinely targets different shards under the two-roundtrip protocol. That requires the deferred-split architectural change — caching the rewrite plan against the prepared-statement name and applying it when the Bind arrives in the later round trip, plus a synthetic ParameterDescription so the client's view of the prepared statement stays consistent. We avoided that path because it requires cross-request state in the prepared-statement cache and synthetic protocol-message generation, and the same-shard case (rows all sharing the sharding key value) covers our workload.
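The deferred-split idea can be sketched as a per-client cache keyed by prepared-statement name. Everything here is hypothetical (`SplitCache`, `DeferredSplit`, and the method names), and it models only the bookkeeping: at Parse time it records the per-row parameter count and returns the original multi-row count for a synthetic ParameterDescription, and at Bind time it splits the parameters into one single-row Bind per VALUES row. Real code would also have to drop entries on Close and when the unnamed statement is replaced by a later Parse.

```rust
use std::collections::HashMap;

/// Hypothetical plan captured at Parse time for a multi-row INSERT.
#[derive(Debug, Clone)]
pub struct DeferredSplit {
    pub params_per_row: usize,
    pub rows: usize,
}

/// Hypothetical per-client cache keyed by prepared-statement name ("" = unnamed).
#[derive(Default)]
pub struct SplitCache {
    plans: HashMap<String, DeferredSplit>,
}

impl SplitCache {
    /// Round 1 (Parse): remember the plan and return the parameter count the client
    /// should see in a synthetic ParameterDescription (the original multi-row count).
    pub fn on_parse(&mut self, name: &str, plan: DeferredSplit) -> usize {
        let client_visible = plan.params_per_row * plan.rows;
        self.plans.insert(name.to_string(), plan);
        client_visible
    }

    /// Round 2 (Bind): look up the plan and split the parameters per VALUES row.
    pub fn on_bind<T: Clone>(&self, name: &str, params: &[T]) -> Result<Vec<Vec<T>>, String> {
        let plan = self
            .plans
            .get(name)
            .ok_or_else(|| format!("no deferred split plan for statement {:?}", name))?;
        let expected = plan.params_per_row * plan.rows;
        if plan.params_per_row == 0 || params.len() != expected {
            return Err(format!(
                "bind supplies {} parameters, plan expects {}",
                params.len(),
                expected
            ));
        }
        // Row i gets parameters $(i*k + 1) ..= $((i + 1)*k), renumbered as $1 ..= $k.
        Ok(params.chunks(plan.params_per_row).map(|row| row.to_vec()).collect())
    }
}
```

In the lib/pq flow, the client would see 10 parameters after Round 1, and the Round 2 Bind would become two 5-parameter Binds, each of which can then be routed by its own shard_id.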

Minimal reproducer

Self-contained 4-shard docker-compose setup. The pgdog config:

# pgdog/pgdog.toml
[general]
host = "0.0.0.0"
port = 6432

[rewrite]
enabled = true
split_inserts = "rewrite"

[[databases]]
name = "app"
host = "postgres-shard-0"
port = 5432
shard = 0
role = "primary"
user = "app"
password = "app"
# ... repeat for postgres-shard-{1,2,3}

[[sharded_tables]]
database = "app"
column = "shard_id"
data_type = "bigint"

# pgdog/users.toml
[[users]]
name = "app"
password = "app"
database = "app"

Schema:

CREATE TABLE timer_tasks (
  shard_id              INTEGER NOT NULL,
  visibility_timestamp  TIMESTAMP NOT NULL,
  task_id               BIGINT NOT NULL,
  data                  BYTEA NOT NULL,
  data_encoding         VARCHAR(16) NOT NULL,
  PRIMARY KEY (shard_id, visibility_timestamp, task_id)
);

Trigger from a Go process using database/sql + lib/pq:

package main

import (
	"database/sql"
	"log"
	"time"

	_ "github.com/lib/pq" // registers the "postgres" driver
)

func main() {
	db, _ := sql.Open("postgres", "postgresql://app:app@127.0.0.1:6432/app?sslmode=disable")
	stmt, err := db.Prepare(`INSERT INTO timer_tasks (shard_id, visibility_timestamp, task_id, data, data_encoding)
     VALUES ($1, $2, $3, $4, $5), ($6, $7, $8, $9, $10)`)
	if err != nil {
		log.Fatal(err)
	}
	// Both rows share shard_id=5, so pgdog should route the INSERT to one backend.
	_, err = stmt.Exec(5, time.Now(), int64(1001), []byte{0x00}, "json",
		5, time.Now(), int64(1002), []byte{0x00}, "json")
	log.Println(err) // observed: pq: got 10 parameters but the statement requires 5
}

Empirical evidence (numbers)

Same workload, three pgdog configurations plus a single-Postgres baseline, 1000 runs each:

| Config | Workflows complete? | p50 latency | Total disk (post-VACUUM) |
|---|---|---|---|
| split_inserts = "rewrite" (default-ish) | ❌ All fail | n/a | n/a |
| Omnisharded task tables + split_inserts = "ignore" | | 443 ms | 38.96 MB (task tables 4× replicated) |
| With patch (both changes above) | | 443 ms | 38.96 MB (task tables correctly sharded) |
| Single-postgres baseline (no sharding) | | 217 ms | 7.55 MB |

The latency floor is dominated by pgdog's per-query proxy hop across many small queries. The architectural win of the patch is correct routing for multi-row INSERTs: task-table rows go from being replicated 4× to being distributed across the 4 backends, which is what matters when scaling out to reduce WAL write waits at production volume.

License

Happy to send the patch as a PR. License is AGPL-3.0 either way, no contribution-license blockers from our side.

Labels: bug
