Skip to content

Commit 3b9a984

Browse files
refactor(merge_insert): adapt findPkFromBlob for Rust/C pks schema
PROBLEM: Sync operations failing because findPkFromBlob expected old schema with `pks BLOB` column, but new schema stores PK columns directly. OLD SCHEMA: __crsql_pks(pk INTEGER, base_rowid INTEGER, pks BLOB) Query: SELECT pk FROM pks WHERE pks = ? NEW SCHEMA (Rust/C compatible): __crsql_pks(__crsql_key INTEGER PRIMARY KEY, pk_col1, pk_col2, ...) Query: SELECT __crsql_key FROM pks WHERE col1 = ? AND col2 = ? CHANGES: 1. Import as_crr module for TableInfo access 2. Make getTableInfo() public in as_crr.zig 3. Completely rewrite findPkFromBlob(): - Load TableInfo to get PK column names and count - Unpack pk_blob using codec.unpack() - Build dynamic SQL with WHERE clause for each PK column - Bind unpacked values (Integer, Float, Text, Blob, Null) - Query for __crsql_key instead of pk - Use api.getTransientDestructor() for text/blob binding IMPACT: findPkFromBlob now correctly queries the new pks schema. Sync operations will still fail until INSERT path is refactored (next step). TESTING: - Compiles successfully - Returns NoRows for non-existent entries (expected behavior) - Ready for INSERT path refactoring PART OF: TASK-147 (Rust/C pks schema compatibility) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 255e316 commit 3b9a984

File tree

2 files changed

+79
-7
lines changed

2 files changed

+79
-7
lines changed

zig/src/as_crr.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ fn createPksTable(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
305305
}
306306

307307
/// Query PRAGMA table_info to get column information
308-
fn getTableInfo(db: ?*api.sqlite3, table_name: [*:0]const u8) !TableInfo {
308+
pub fn getTableInfo(db: ?*api.sqlite3, table_name: [*:0]const u8) !TableInfo {
309309
var info = TableInfo{
310310
.columns = undefined,
311311
.count = 0,

zig/src/merge_insert.zig

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const std = @import("std");
2121
const api = @import("ffi/api.zig");
2222
const codec = @import("codec.zig");
2323
const site_identity = @import("site_identity.zig");
24+
const as_crr = @import("as_crr.zig");
2425

2526
/// Error set for merge operations
2627
pub const MergeError = error{
@@ -354,25 +355,96 @@ pub fn setWinnerClock(
354355
}
355356

356357
/// Find the pk (pks table key) from a packed PK blob.
357-
/// Queries the __crsql_pks table to find matching row.
358-
/// Returns the pks table's auto-increment key, NOT the base table rowid.
358+
/// Queries the __crsql_pks table to find matching row using unpacked PK column values.
359+
/// Returns the pks table's auto-increment key (__crsql_key), NOT the base table rowid.
360+
///
361+
/// NEW SCHEMA: The pks table stores PK columns directly, not as a packed blob.
362+
/// This function unpacks the pk_blob and queries:
363+
/// SELECT __crsql_key FROM table__crsql_pks WHERE col1 = ? AND col2 = ?
359364
pub fn findPkFromBlob(
360365
db: ?*api.sqlite3,
361366
table_name: []const u8,
362367
pk_blob: [*]const u8,
363368
pk_blob_len: usize,
364369
) MergeError!i64 {
365-
var buf: [512]u8 = undefined;
366-
const sql = std.fmt.bufPrintZ(&buf, "SELECT pk FROM \"{s}__crsql_pks\" WHERE pks = ?", .{table_name}) catch return MergeError.BufferOverflow;
370+
// Get TableInfo to know PK column names and count
371+
// getTableInfo requires null-terminated string
372+
var table_name_buf: [256]u8 = undefined;
373+
const table_name_z = std.fmt.bufPrintZ(&table_name_buf, "{s}", .{table_name}) catch return MergeError.BufferOverflow;
374+
const info = as_crr.getTableInfo(db, table_name_z) catch return MergeError.SqliteError;
375+
376+
if (info.pk_count == 0) {
377+
return MergeError.SqliteError; // Table must have a primary key
378+
}
379+
380+
// Unpack the pk_blob into individual values
381+
// We need a temporary allocator for the unpacked values
382+
var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
383+
defer arena.deinit();
384+
const allocator = arena.allocator();
385+
386+
const pk_blob_slice: []const u8 = pk_blob[0..pk_blob_len];
387+
const values = codec.unpack(allocator, pk_blob_slice) catch return MergeError.DecodeError;
388+
389+
if (values.len != info.pk_count) {
390+
return MergeError.DecodeError; // Mismatch between unpacked values and PK column count
391+
}
392+
393+
// Build SQL: SELECT __crsql_key FROM "table__crsql_pks" WHERE "col1" = ? AND "col2" = ?
394+
var sql_buf: [1024]u8 = undefined;
395+
var fbs = std.io.fixedBufferStream(&sql_buf);
396+
var writer = fbs.writer();
397+
398+
writer.print("SELECT __crsql_key FROM \"{s}__crsql_pks\" WHERE ", .{table_name}) catch return MergeError.BufferOverflow;
399+
400+
// Build WHERE clause with PK columns in order
401+
var pk_order: usize = 1;
402+
var pk_written: usize = 0;
403+
while (pk_written < info.pk_count) : (pk_order += 1) {
404+
// Find the column with this pk_order
405+
var col_name: ?[]const u8 = null;
406+
for (0..info.count) |i| {
407+
const col = &info.columns[i];
408+
if (col.pk_index == @as(c_int, @intCast(pk_order))) {
409+
col_name = col.name[0..col.name_len];
410+
break;
411+
}
412+
}
413+
414+
const name = col_name orelse return MergeError.SqliteError;
367415

416+
if (pk_written > 0) {
417+
writer.writeAll(" AND ") catch return MergeError.BufferOverflow;
418+
}
419+
writer.print("\"{s}\" IS ?", .{name}) catch return MergeError.BufferOverflow;
420+
pk_written += 1;
421+
}
422+
423+
const sql = std.mem.sliceTo(&sql_buf, 0);
424+
425+
// Prepare statement
368426
var stmt: ?*api.sqlite3_stmt = null;
369-
if (api.prepare_v2(db, sql, -1, &stmt, null) != api.SQLITE_OK) {
427+
if (api.prepare_v2(db, sql.ptr, -1, &stmt, null) != api.SQLITE_OK) {
370428
return MergeError.SqliteError;
371429
}
372430
defer _ = api.finalize(stmt);
373431

374-
_ = api.bind_blob(stmt, 1, pk_blob, @intCast(pk_blob_len), api.getTransientDestructor());
432+
// Bind unpacked PK values in order
433+
for (values, 0..) |value, idx| {
434+
const param_idx: c_int = @intCast(idx + 1);
435+
const rc = switch (value) {
436+
.Null => api.bind_null(stmt, param_idx),
437+
.Integer => |i| api.bind_int64(stmt, param_idx, i),
438+
.Float => |f| api.bind_double(stmt, param_idx, f),
439+
.Text => |t| api.bind_text(stmt, param_idx, t.ptr, @intCast(t.len), api.getTransientDestructor()),
440+
.Blob => |b| api.bind_blob(stmt, param_idx, b.ptr, @intCast(b.len), api.getTransientDestructor()),
441+
};
442+
if (rc != api.SQLITE_OK) {
443+
return MergeError.SqliteError;
444+
}
445+
}
375446

447+
// Execute and return __crsql_key
376448
if (api.step(stmt) == api.SQLITE_ROW) {
377449
return api.column_int64(stmt, 0);
378450
}

0 commit comments

Comments
 (0)