Skip to content

Commit 643b10d

Browse files
++
1 parent 2c450dd commit 643b10d

File tree

1 file changed

+159
-53
lines changed

1 file changed

+159
-53
lines changed

zig/src/as_crr.zig

Lines changed: 159 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -223,34 +223,82 @@ fn createClockTable(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
223223
}
224224
}
225225

226-
/// Create the pks table for mapping auto-increment keys to packed PK blobs.
227-
/// The pk column is an auto-increment key that is INDEPENDENT of base table rowid.
228-
/// This allows compound/text PK tables to have separate entries for old and new PKs
229-
/// when a PK column is updated (old entry for tombstone, new entry for live row).
230-
///
231-
/// The base_rowid column stores the current base table rowid for live rows.
232-
/// For tombstoned entries, base_rowid is NULL (the row no longer exists).
233-
/// This allows efficient value lookups: pks.pk -> pks.base_rowid -> base_table.rowid
234-
///
235-
/// Schema:
236-
/// ```sql
237-
/// CREATE TABLE IF NOT EXISTS "{table}__crsql_pks" (
238-
/// "pk" INTEGER PRIMARY KEY, -- Auto-increment key (clock table references this)
239-
/// "base_rowid" INTEGER, -- Base table rowid (NULL for tombstoned entries)
240-
/// "pks" BLOB NOT NULL UNIQUE -- Packed PK blob, unique constraint for lookups
241-
/// );
242-
/// ```
226+
/// Create the pks table for mapping auto-increment keys to PK column values.
227+
/// Schema matches Rust/C implementation:
228+
/// - Key: `__crsql_key INTEGER PRIMARY KEY`
229+
/// - One column per PK column from the base table
230+
/// - Unique index `{table}__crsql_pks_pks` on the PK columns
243231
fn createPksTable(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
232+
const info = try getTableInfo(db, table_name);
233+
244234
var buf: [SQL_BUF_SIZE]u8 = undefined;
245-
const sql = std.fmt.bufPrintZ(&buf,
246-
\\CREATE TABLE IF NOT EXISTS "{s}__crsql_pks" (
247-
\\ "pk" INTEGER PRIMARY KEY,
248-
\\ "base_rowid" INTEGER,
249-
\\ "pks" BLOB NOT NULL UNIQUE
250-
\\);
251-
, .{table_name}) catch return error.BufferOverflow;
235+
var fbs = std.io.fixedBufferStream(&buf);
236+
const writer = fbs.writer();
252237

253-
const rc = api.exec(db, sql, null, null, null);
238+
writer.print(
239+
"CREATE TABLE IF NOT EXISTS \"{s}__crsql_pks\" (__crsql_key INTEGER PRIMARY KEY",
240+
.{table_name},
241+
) catch return error.BufferOverflow;
242+
243+
// Add PK columns in order
244+
var pk_order: usize = 1;
245+
var pk_written: usize = 0;
246+
while (pk_written < info.pk_count) : (pk_order += 1) {
247+
for (info.columns[0..info.count]) |col| {
248+
if (col.pk_index == @as(c_int, @intCast(pk_order))) {
249+
writer.print(", \"{s}\"", .{col.name[0..col.name_len]}) catch return error.BufferOverflow;
250+
pk_written += 1;
251+
break;
252+
}
253+
}
254+
}
255+
256+
writer.writeAll(")") catch return error.BufferOverflow;
257+
258+
var sql_len = fbs.pos;
259+
if (sql_len >= SQL_BUF_SIZE) return error.BufferOverflow;
260+
buf[sql_len] = 0;
261+
262+
const sql: [*:0]const u8 = @ptrCast(&buf);
263+
var rc = api.exec(db, sql, null, null, null);
264+
if (rc != api.SQLITE_OK) {
265+
return error.SqliteError;
266+
}
267+
268+
// Create unique index on PK columns
269+
// CREATE UNIQUE INDEX IF NOT EXISTS "{table}__crsql_pks_pks" ON "{table}__crsql_pks" (pk_cols...)
270+
var idx_buf: [SQL_BUF_SIZE]u8 = undefined;
271+
var idx_fbs = std.io.fixedBufferStream(&idx_buf);
272+
const idx_writer = idx_fbs.writer();
273+
274+
idx_writer.print(
275+
"CREATE UNIQUE INDEX IF NOT EXISTS \"{s}__crsql_pks_pks\" ON \"{s}__crsql_pks\" (",
276+
.{ table_name, table_name },
277+
) catch return error.BufferOverflow;
278+
279+
pk_order = 1;
280+
pk_written = 0;
281+
while (pk_written < info.pk_count) : (pk_order += 1) {
282+
for (info.columns[0..info.count]) |col| {
283+
if (col.pk_index == @as(c_int, @intCast(pk_order))) {
284+
if (pk_written > 0) {
285+
idx_writer.writeAll(", ") catch return error.BufferOverflow;
286+
}
287+
idx_writer.print("\"{s}\"", .{col.name[0..col.name_len]}) catch return error.BufferOverflow;
288+
pk_written += 1;
289+
break;
290+
}
291+
}
292+
}
293+
294+
idx_writer.writeAll(")") catch return error.BufferOverflow;
295+
296+
sql_len = idx_fbs.pos;
297+
if (sql_len >= SQL_BUF_SIZE) return error.BufferOverflow;
298+
idx_buf[sql_len] = 0;
299+
300+
const idx_sql: [*:0]const u8 = @ptrCast(&idx_buf);
301+
rc = api.exec(db, idx_sql, null, null, null);
254302
if (rc != api.SQLITE_OK) {
255303
return error.SqliteError;
256304
}
@@ -1025,24 +1073,30 @@ fn backfillExistingRows(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
10251073
return error.SqliteError;
10261074
}
10271075

1028-
// Build the SELECT query to find rows not yet backfilled
1029-
// Query: SELECT rowid, pk_cols... FROM table WHERE rowid NOT IN (SELECT pk FROM table__crsql_pks)
1076+
// Build the SELECT query to find PK tuples in base table not yet in __crsql_pks
1077+
// Rust/C algorithm: SELECT pk_cols FROM base EXCEPT SELECT pk_cols FROM pks
10301078
var select_buf: [SQL_BUF_SIZE]u8 = undefined;
10311079
var select_fbs = std.io.fixedBufferStream(&select_buf);
10321080
const select_writer = select_fbs.writer();
10331081

1034-
select_writer.print("SELECT rowid", .{}) catch {
1082+
// SELECT pk1, pk2, ... FROM base
1083+
select_writer.writeAll("SELECT ") catch {
10351084
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
10361085
return error.BufferOverflow;
10371086
};
10381087

1039-
// Add PK columns in order for crsql_pack_columns
10401088
var pk_order: usize = 1;
10411089
var pk_written: usize = 0;
10421090
while (pk_written < info.pk_count) : (pk_order += 1) {
10431091
for (info.columns[0..info.count]) |col| {
10441092
if (col.pk_index == @as(c_int, @intCast(pk_order))) {
1045-
select_writer.print(", \"{s}\"", .{col.name[0..col.name_len]}) catch {
1093+
if (pk_written > 0) {
1094+
select_writer.writeAll(", ") catch {
1095+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1096+
return error.BufferOverflow;
1097+
};
1098+
}
1099+
select_writer.print("\"{s}\"", .{col.name[0..col.name_len]}) catch {
10461100
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
10471101
return error.BufferOverflow;
10481102
};
@@ -1052,8 +1106,39 @@ fn backfillExistingRows(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
10521106
}
10531107
}
10541108

1055-
// Find rows not yet in pks table (use base_rowid column, not pk)
1056-
select_writer.print(" FROM \"{s}\" WHERE rowid NOT IN (SELECT base_rowid FROM \"{s}__crsql_pks\" WHERE base_rowid IS NOT NULL)", .{ table_name, table_name }) catch {
1109+
select_writer.print(" FROM \"{s}\" AS t1 ", .{table_name}) catch {
1110+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1111+
return error.BufferOverflow;
1112+
};
1113+
1114+
// EXCEPT SELECT pk1, pk2, ... FROM pks
1115+
select_writer.writeAll("EXCEPT SELECT ") catch {
1116+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1117+
return error.BufferOverflow;
1118+
};
1119+
1120+
pk_order = 1;
1121+
pk_written = 0;
1122+
while (pk_written < info.pk_count) : (pk_order += 1) {
1123+
for (info.columns[0..info.count]) |col| {
1124+
if (col.pk_index == @as(c_int, @intCast(pk_order))) {
1125+
if (pk_written > 0) {
1126+
select_writer.writeAll(", ") catch {
1127+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1128+
return error.BufferOverflow;
1129+
};
1130+
}
1131+
select_writer.print("\"{s}\"", .{col.name[0..col.name_len]}) catch {
1132+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1133+
return error.BufferOverflow;
1134+
};
1135+
pk_written += 1;
1136+
break;
1137+
}
1138+
}
1139+
}
1140+
1141+
select_writer.print(" FROM \"{s}__crsql_pks\" AS t2", .{table_name}) catch {
10571142
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
10581143
return error.BufferOverflow;
10591144
};
@@ -1076,17 +1161,44 @@ fn backfillExistingRows(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
10761161
defer _ = api.finalize(select_stmt);
10771162

10781163
// Build INSERT statement for pks table
1164+
// Rust/C schema: INSERT INTO "{table}__crsql_pks" (pk_cols...) VALUES (?, ?, ...) RETURNING __crsql_key
10791165
var pks_insert_buf: [SQL_BUF_SIZE]u8 = undefined;
10801166
var pks_fbs = std.io.fixedBufferStream(&pks_insert_buf);
10811167
const pks_writer = pks_fbs.writer();
10821168

1083-
// Insert with base_rowid instead of pk (pk auto-increments)
1084-
pks_writer.print("INSERT OR IGNORE INTO \"{s}__crsql_pks\" (base_rowid, pks) VALUES (?, crsql_pack_columns(", .{table_name}) catch {
1169+
pks_writer.print("INSERT INTO \"{s}__crsql_pks\" (", .{table_name}) catch {
10851170
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
10861171
return error.BufferOverflow;
10871172
};
10881173

1089-
// Add placeholders for each PK column value
1174+
// Column list
1175+
var pk_order_insert: usize = 1;
1176+
var pk_written_insert: usize = 0;
1177+
while (pk_written_insert < info.pk_count) : (pk_order_insert += 1) {
1178+
for (info.columns[0..info.count]) |col| {
1179+
if (col.pk_index == @as(c_int, @intCast(pk_order_insert))) {
1180+
if (pk_written_insert > 0) {
1181+
pks_writer.writeAll(", ") catch {
1182+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1183+
return error.BufferOverflow;
1184+
};
1185+
}
1186+
pks_writer.print("\"{s}\"", .{col.name[0..col.name_len]}) catch {
1187+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1188+
return error.BufferOverflow;
1189+
};
1190+
pk_written_insert += 1;
1191+
break;
1192+
}
1193+
}
1194+
}
1195+
1196+
pks_writer.writeAll(") VALUES (") catch {
1197+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1198+
return error.BufferOverflow;
1199+
};
1200+
1201+
// Placeholders
10901202
for (0..info.pk_count) |i| {
10911203
if (i > 0) {
10921204
pks_writer.writeAll(", ") catch {
@@ -1100,7 +1212,7 @@ fn backfillExistingRows(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
11001212
};
11011213
}
11021214

1103-
pks_writer.writeAll("))") catch {
1215+
pks_writer.writeAll(") RETURNING __crsql_key") catch {
11041216
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
11051217
return error.BufferOverflow;
11061218
};
@@ -1145,18 +1257,8 @@ fn backfillExistingRows(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
11451257

11461258
// Iterate over rows that need backfilling
11471259
while (api.step(select_stmt) == api.SQLITE_ROW) {
1148-
// Column 0 is rowid
1149-
const rowid = api.column_int64(select_stmt, 0);
1150-
1151-
// Insert into pks table
1152-
// Bind rowid as first param
1153-
if (api.bind_int64(pks_stmt, 1, rowid) != api.SQLITE_OK) {
1154-
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1155-
return error.SqliteError;
1156-
}
1157-
1158-
// Bind PK column values (columns 1..pk_count from select)
1159-
for (1..info.pk_count + 1) |i| {
1260+
// Bind PK column values (columns 0..pk_count from select)
1261+
for (0..info.pk_count) |i| {
11601262
const col_idx: c_int = @intCast(i);
11611263
const bind_idx: c_int = @intCast(i + 1);
11621264
const value = api.column_value(select_stmt, col_idx);
@@ -1193,19 +1295,23 @@ fn backfillExistingRows(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
11931295
}
11941296
}
11951297

1196-
_ = api.step(pks_stmt);
1298+
// Insert pks entry and capture assigned __crsql_key
1299+
const step_rc = api.step(pks_stmt);
1300+
if (step_rc != api.SQLITE_ROW) {
1301+
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
1302+
return error.SqliteError;
1303+
}
1304+
const key = api.column_int64(pks_stmt, 0);
11971305
_ = api.reset(pks_stmt);
11981306

11991307
// Insert clock entries for each non-PK column
12001308
for (info.columns[0..info.count]) |col| {
12011309
if (col.pk_index == 0) {
1202-
// Non-PK column - create clock entry
1203-
if (api.bind_int64(clock_stmt, 1, rowid) != api.SQLITE_OK) {
1310+
if (api.bind_int64(clock_stmt, 1, key) != api.SQLITE_OK) {
12041311
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
12051312
return error.SqliteError;
12061313
}
12071314

1208-
// Bind column name
12091315
const col_name_slice = col.name[0..col.name_len];
12101316
if (api.bind_text(clock_stmt, 2, @ptrCast(col_name_slice.ptr), @intCast(col.name_len), api.getTransientDestructor()) != api.SQLITE_OK) {
12111317
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
@@ -1219,7 +1325,7 @@ fn backfillExistingRows(db: ?*api.sqlite3, table_name: [*:0]const u8) !void {
12191325

12201326
// If there are no non-PK columns, insert sentinel row
12211327
if (non_pk_count == 0) {
1222-
if (api.bind_int64(clock_stmt, 1, rowid) != api.SQLITE_OK) {
1328+
if (api.bind_int64(clock_stmt, 1, key) != api.SQLITE_OK) {
12231329
_ = api.exec(db, "ROLLBACK TO backfill", null, null, null);
12241330
return error.SqliteError;
12251331
}

0 commit comments

Comments
 (0)