Skip to content

Commit 3711932

Browse files
author
Zohaib Sibte Hassan
committed
transpiler/cdc: wire schema-aware upsert conflicts and remove forced WAL checkpoints
This commit completes the in-flight replication/transpiler fixes and cleans up CDC apply behavior.

Transpiler and parse pipeline:
- Add ParseOptions.SchemaProvider and thread it through parser/query context.
- Refactor InsertOnDuplicateKeyRule to return conflict metadata instead of mutating serializer state.
- Make SQLite serializer stateless for conflict targets via SerializeWithOpts.
- Consolidate transpiler flow to single final serialization, qualifier stripping, and literal extraction ordering.
- Preserve schema-derived ON CONFLICT targets when literal extraction is enabled.
- Add/adjust tests for literal extraction + ON DUPLICATE KEY, serializer behavior, and ANSI-quote/backslash handling.
- Tighten auto-increment injection behavior to skip unsafe injection when INSERT has no explicit column list.

Runtime wiring for P1:
- Extend coordinator DatabaseManager contract with GetTranspilerSchema.
- Implement GetTranspilerSchema in db.DatabaseManager using cached schema.
- Wire schema provider into coordinator and read-only replica ParseStatementWithOptions paths.
- Filter out SchemaCache rowid sentinel from conflict targeting so fallback behavior remains correct when no explicit PK exists.
- Update coordinator-facing test database manager mocks and add focused DB unit tests for GetTranspilerSchema.

CDC / replication visibility and P2 rollback:
- Keep shared-cache DSN usage across write/read/reopen and batch committer DB connections.
- Ensure snapshot/read query paths use readDB.
- Remove synchronous PRAGMA wal_checkpoint(PASSIVE) calls from CDC apply commit and batch flush commit hot paths.
- Retain adaptive/background checkpoint behavior where applicable.

Functional impact:
- INSERT ... ON DUPLICATE KEY now resolves ON CONFLICT targets from real schema at runtime (not parser-local fallback only).
- Literal extraction no longer drops schema-derived conflict targets.
- Replica local reads stay local while forwarded writes can still opt into visibility guarantees via wait-for-replication semantics, without per-commit forced checkpoint overhead.
1 parent 0088b8d commit 3711932

21 files changed

+575
-198
lines changed

coordinator/handler.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ type DatabaseManager interface {
3434
// GetAutoIncrementColumn returns the auto-increment column name for a table.
3535
// Uses cached schema - does NOT query SQLite PRAGMA.
3636
GetAutoIncrementColumn(database, table string) (string, error)
37+
// GetTranspilerSchema returns schema information for transpiler conflict resolution.
38+
// Uses cached schema - does NOT query SQLite PRAGMA.
39+
GetTranspilerSchema(database, table string) (*transform.SchemaInfo, error)
3740
}
3841

3942
// ReplicatedDatabaseProvider provides access to replicated database operations
@@ -213,7 +216,7 @@ func (h *CoordinatorHandler) HandleQuery(session *protocol.ConnectionSession, sq
213216
// Build schema lookup function for auto-increment ID injection.
214217
// Uses cached schema via DatabaseManager - does NOT query SQLite PRAGMA.
215218
var schemaLookup protocol.SchemaLookupFunc
216-
if session.CurrentDatabase != "" {
219+
if h.dbManager != nil && session.CurrentDatabase != "" {
217220
dbName := session.CurrentDatabase
218221
schemaLookup = func(table string) string {
219222
col, err := h.dbManager.GetAutoIncrementColumn(dbName, table)
@@ -223,10 +226,29 @@ func (h *CoordinatorHandler) HandleQuery(session *protocol.ConnectionSession, sq
223226
return col
224227
}
225228
}
229+
schemaProvider := func(database, table string) *transform.SchemaInfo {
230+
if table == "" || h.dbManager == nil {
231+
return nil
232+
}
233+
dbName := database
234+
if dbName == "" {
235+
dbName = session.CurrentDatabase
236+
}
237+
if dbName == "" {
238+
return nil
239+
}
240+
241+
schemaInfo, err := h.dbManager.GetTranspilerSchema(dbName, table)
242+
if err != nil {
243+
return nil
244+
}
245+
return schemaInfo
246+
}
226247

227248
// Parse with options based on session state
228249
stmt := protocol.ParseStatementWithOptions(sql, protocol.ParseOptions{
229250
SchemaLookup: schemaLookup,
251+
SchemaProvider: schemaProvider,
230252
SkipTranspilation: !session.TranspilationEnabled,
231253
ExtractLiterals: true, // Enable literal extraction for parameterized execution
232254
})

db/database_manager.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/maxpert/marmot/coordinator"
1616
"github.com/maxpert/marmot/hlc"
17+
"github.com/maxpert/marmot/protocol/query/transform"
1718
"github.com/maxpert/marmot/publisher"
1819
"github.com/rs/zerolog/log"
1920
)
@@ -1291,6 +1292,33 @@ func (dm *DatabaseManager) GetAutoIncrementColumn(database, table string) (strin
12911292
return schema.GetAutoIncrementCol(), nil
12921293
}
12931294

1295+
// GetTranspilerSchema returns schema information used by SQL transpilation rules.
1296+
// Uses cached schema - does NOT query SQLite PRAGMA.
1297+
func (dm *DatabaseManager) GetTranspilerSchema(database, table string) (*transform.SchemaInfo, error) {
1298+
db, err := dm.GetDatabase(database)
1299+
if err != nil {
1300+
return nil, err
1301+
}
1302+
1303+
schema, err := db.GetCachedTableSchema(table)
1304+
if err != nil {
1305+
return nil, fmt.Errorf("schema not cached for table %s: %w", table, err)
1306+
}
1307+
1308+
info := &transform.SchemaInfo{
1309+
AutoIncrementColumn: schema.GetAutoIncrementCol(),
1310+
}
1311+
1312+
// PrimaryKeys uses "rowid" sentinel when no explicit PRIMARY KEY is defined.
1313+
// Keep it out of transpiler conflict targeting to preserve fallback behavior.
1314+
if len(schema.PrimaryKeys) > 0 &&
1315+
!(len(schema.PrimaryKeys) == 1 && strings.EqualFold(schema.PrimaryKeys[0], "rowid")) {
1316+
info.PrimaryKey = append([]string(nil), schema.PrimaryKeys...)
1317+
}
1318+
1319+
return info, nil
1320+
}
1321+
12941322
// GetCommittedTxnCount returns the count of committed transactions in a database
12951323
// This is used by anti-entropy to compare data completeness between nodes
12961324
func (dm *DatabaseManager) GetCommittedTxnCount(database string) (int64, error) {
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
//go:build sqlite_preupdate_hook
2+
// +build sqlite_preupdate_hook
3+
4+
package db
5+
6+
import "testing"
7+
8+
func TestGetTranspilerSchema_PrimaryKey(t *testing.T) {
9+
dm, _ := setupTestDatabaseManager(t)
10+
defer dm.Close()
11+
12+
if err := dm.CreateDatabase("testdb"); err != nil {
13+
t.Fatalf("CreateDatabase failed: %v", err)
14+
}
15+
16+
mdb, err := dm.GetDatabase("testdb")
17+
if err != nil {
18+
t.Fatalf("GetDatabase failed: %v", err)
19+
}
20+
21+
cache, ok := mdb.GetSchemaCache().(*SchemaCache)
22+
if !ok || cache == nil {
23+
t.Fatal("schema cache is unavailable")
24+
}
25+
26+
cache.Update("users", &TableSchema{
27+
PrimaryKeys: []string{"tenant_id", "user_id"},
28+
AutoIncrementCol: "user_id",
29+
})
30+
31+
info, err := dm.GetTranspilerSchema("testdb", "users")
32+
if err != nil {
33+
t.Fatalf("GetTranspilerSchema failed: %v", err)
34+
}
35+
36+
if len(info.PrimaryKey) != 2 || info.PrimaryKey[0] != "tenant_id" || info.PrimaryKey[1] != "user_id" {
37+
t.Fatalf("unexpected primary key: %#v", info.PrimaryKey)
38+
}
39+
if info.AutoIncrementColumn != "user_id" {
40+
t.Fatalf("unexpected auto-increment column: %q", info.AutoIncrementColumn)
41+
}
42+
}
43+
44+
func TestGetTranspilerSchema_IgnoresRowIDSentinel(t *testing.T) {
45+
dm, _ := setupTestDatabaseManager(t)
46+
defer dm.Close()
47+
48+
if err := dm.CreateDatabase("testdb"); err != nil {
49+
t.Fatalf("CreateDatabase failed: %v", err)
50+
}
51+
52+
mdb, err := dm.GetDatabase("testdb")
53+
if err != nil {
54+
t.Fatalf("GetDatabase failed: %v", err)
55+
}
56+
57+
cache, ok := mdb.GetSchemaCache().(*SchemaCache)
58+
if !ok || cache == nil {
59+
t.Fatal("schema cache is unavailable")
60+
}
61+
62+
cache.Update("logs", &TableSchema{
63+
PrimaryKeys: []string{"rowid"},
64+
})
65+
66+
info, err := dm.GetTranspilerSchema("testdb", "logs")
67+
if err != nil {
68+
t.Fatalf("GetTranspilerSchema failed: %v", err)
69+
}
70+
71+
if len(info.PrimaryKey) != 0 {
72+
t.Fatalf("expected empty primary key, got %#v", info.PrimaryKey)
73+
}
74+
}

db/db_integration.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,13 @@ func NewReplicatedDatabase(dbPath string, nodeID uint64, clock *hlc.Clock, metaS
7070
// === WRITE CONNECTION ===
7171
// Single connection for all writes (SQLite allows only one writer at a time)
7272
// Uses _txlock=immediate to acquire write lock at BEGIN, avoiding deadlocks
73+
// Uses cache=shared so all local connections share a single page cache.
7374
writeDSN := dbPath
7475
if !isMemoryDB {
7576
if strings.Contains(writeDSN, "?") {
76-
writeDSN += fmt.Sprintf("&_journal_mode=WAL&_busy_timeout=%d&_txlock=immediate", busyTimeoutMS)
77+
writeDSN += fmt.Sprintf("&_journal_mode=WAL&_busy_timeout=%d&_txlock=immediate&cache=shared", busyTimeoutMS)
7778
} else {
78-
writeDSN += fmt.Sprintf("?_journal_mode=WAL&_busy_timeout=%d&_txlock=immediate", busyTimeoutMS)
79+
writeDSN += fmt.Sprintf("?_journal_mode=WAL&_busy_timeout=%d&_txlock=immediate&cache=shared", busyTimeoutMS)
7980
}
8081
}
8182

@@ -108,12 +109,13 @@ func NewReplicatedDatabase(dbPath string, nodeID uint64, clock *hlc.Clock, metaS
108109
// Multiple connections for concurrent reads (WAL mode supports this)
109110
// No _txlock needed for reads - they don't acquire write locks
110111
// Note: Don't use mode=ro as it can interfere with WAL checkpointing
112+
// Uses cache=shared so all local connections share a single page cache.
111113
readDSN := dbPath
112114
if !isMemoryDB {
113115
if strings.Contains(readDSN, "?") {
114-
readDSN += fmt.Sprintf("&_journal_mode=WAL&_busy_timeout=%d", busyTimeoutMS)
116+
readDSN += fmt.Sprintf("&_journal_mode=WAL&_busy_timeout=%d&cache=shared", busyTimeoutMS)
115117
} else {
116-
readDSN += fmt.Sprintf("?_journal_mode=WAL&_busy_timeout=%d", busyTimeoutMS)
118+
readDSN += fmt.Sprintf("?_journal_mode=WAL&_busy_timeout=%d&cache=shared", busyTimeoutMS)
117119
}
118120
}
119121

@@ -357,7 +359,7 @@ func (mdb *ReplicatedDatabase) OpenSQLiteConnections(dbPath string) error {
357359
poolCfg := cfg.Config.ConnectionPool
358360

359361
// Open write connection
360-
writeDSN := fmt.Sprintf("%s?_journal_mode=WAL&_busy_timeout=%d&_txlock=immediate", dbPath, busyTimeoutMS)
362+
writeDSN := fmt.Sprintf("%s?_journal_mode=WAL&_busy_timeout=%d&_txlock=immediate&cache=shared", dbPath, busyTimeoutMS)
361363
writeDB, err := sql.Open(SQLiteDriverName, writeDSN)
362364
if err != nil {
363365
return fmt.Errorf("failed to open write connection: %w", err)
@@ -383,7 +385,7 @@ func (mdb *ReplicatedDatabase) OpenSQLiteConnections(dbPath string) error {
383385
hookDB.SetConnMaxLifetime(0)
384386

385387
// Open read connection pool
386-
readDSN := fmt.Sprintf("%s?_journal_mode=WAL&_busy_timeout=%d", dbPath, busyTimeoutMS)
388+
readDSN := fmt.Sprintf("%s?_journal_mode=WAL&_busy_timeout=%d&cache=shared", dbPath, busyTimeoutMS)
387389
readDB, err := sql.Open(SQLiteDriverName, readDSN)
388390
if err != nil {
389391
writeDB.Close()
@@ -717,14 +719,12 @@ func (mdb *ReplicatedDatabase) ExecuteTransaction(ctx context.Context, statement
717719
return nil
718720
}
719721

720-
// ExecuteQuery executes a read query with snapshot isolation
721-
// Uses the read connection pool for concurrent read access
722+
// ExecuteQuery executes a read query with snapshot isolation.
723+
// Uses the read connection pool for concurrent read access.
722724
func (mdb *ReplicatedDatabase) ExecuteQuery(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
723725
// Get current snapshot timestamp
724726
snapshotTS := mdb.clock.Now()
725727

726-
// Execute on read connection pool for concurrent reads
727-
// SQLite's WAL mode provides snapshot isolation at the connection level
728728
rows, err := mdb.readDB.QueryContext(ctx, query, args...)
729729
if err != nil {
730730
return nil, fmt.Errorf("query failed: %w", err)
@@ -736,11 +736,9 @@ func (mdb *ReplicatedDatabase) ExecuteQuery(ctx context.Context, query string, a
736736
return rows, nil
737737
}
738738

739-
// ExecuteSnapshotRead executes a read query with full transactional support
740-
// Uses rqlite/sql AST parser for proper SQL analysis
741-
// Uses the read connection pool for concurrent read access
739+
// ExecuteSnapshotRead executes a read query with full transactional support.
740+
// Uses the read connection pool for concurrent read access.
742741
func (mdb *ReplicatedDatabase) ExecuteSnapshotRead(ctx context.Context, query string, args ...interface{}) ([]string, []map[string]interface{}, error) {
743-
// SQLite WAL mode provides snapshot isolation at connection level
744742
rows, err := mdb.readDB.QueryContext(ctx, query, args...)
745743
if err != nil {
746744
return nil, nil, err
@@ -776,8 +774,8 @@ func (mdb *ReplicatedDatabase) ExecuteSnapshotRead(ctx context.Context, query st
776774
return columns, results, nil
777775
}
778776

779-
// ExecuteQueryRow executes a single-row query with snapshot isolation
780-
// Uses the read connection pool for concurrent read access
777+
// ExecuteQueryRow executes a single-row query with snapshot isolation.
778+
// Uses the read connection pool for concurrent read access.
781779
func (mdb *ReplicatedDatabase) ExecuteQueryRow(ctx context.Context, query string, args ...interface{}) *sql.Row {
782780
// Get current snapshot timestamp
783781
// Note: SQLite's WAL mode provides snapshot isolation at transaction level.

db/sqlite_batch_committer.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,13 @@ func (bc *SQLiteBatchCommitter) openOptimizedConnection() (*sql.DB, error) {
177177
// Build DSN with batch-optimized settings
178178
// WAL mode for compatibility with other connections
179179
// _txlock=immediate to acquire write lock at BEGIN
180+
// cache=shared ensures writes are immediately visible to other connections
180181
dsn := bc.dbPath
181182
if !strings.Contains(dsn, ":memory:") {
182183
if strings.Contains(dsn, "?") {
183-
dsn += "&_journal_mode=WAL&_txlock=immediate"
184+
dsn += "&_journal_mode=WAL&_txlock=immediate&cache=shared"
184185
} else {
185-
dsn += "?_journal_mode=WAL&_txlock=immediate"
186+
dsn += "?_journal_mode=WAL&_txlock=immediate&cache=shared"
186187
}
187188
}
188189

@@ -400,7 +401,7 @@ func (bc *SQLiteBatchCommitter) flush(batch map[uint64]*pendingCommit, trigger s
400401
return
401402
}
402403

403-
// Adaptive checkpoint
404+
// Adaptive checkpoint for larger WAL sizes
404405
if bc.checkpointEnabled {
405406
walSizeMB := bc.checkWALSize()
406407
if walSizeMB >= bc.checkpointPassiveThreshMB {

db/transaction.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ func (tm *TransactionManager) applyCDCEntries(txnID uint64, entries []*IntentEnt
391391
if err := tx.Commit(); err != nil {
392392
return fmt.Errorf("failed to commit CDC transaction: %w", err)
393393
}
394+
394395
return nil
395396
}
396397

protocol/parser.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/maxpert/marmot/id"
88
"github.com/maxpert/marmot/protocol/query"
9+
"github.com/maxpert/marmot/protocol/query/transform"
910
"github.com/rs/zerolog/log"
1011
)
1112

@@ -114,6 +115,7 @@ type SchemaLookupFunc func(table string) string
114115
// ParseOptions holds options for parsing SQL statements.
115116
type ParseOptions struct {
116117
SchemaLookup SchemaLookupFunc
118+
SchemaProvider transform.SchemaProvider // For ON CONFLICT target resolution
117119
SkipTranspilation bool
118120
ExtractLiterals bool // Enable literal extraction for parameterized execution
119121
}
@@ -129,6 +131,7 @@ func ParseStatement(sql string) Statement {
129131
func ParseStatementWithOptions(sql string, opts ParseOptions) Statement {
130132
ctx := query.NewContext(sql, nil)
131133
ctx.SchemaLookup = opts.SchemaLookup
134+
ctx.SchemaProvider = opts.SchemaProvider
132135
ctx.SkipTranspilation = opts.SkipTranspilation
133136
ctx.ExtractLiterals = opts.ExtractLiterals
134137

protocol/query/context.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,8 @@ type QueryOutput struct {
108108

109109
// TranspiledStatement represents a single transpiled SQL statement with parameters.
110110
type TranspiledStatement struct {
111-
SQL string
112-
Params []interface{}
113-
RequiresPrepare bool
111+
SQL string
112+
Params []interface{}
114113
}
115114

116115
// MySQLParseState holds MySQL-specific parsing state and metadata.
@@ -128,6 +127,9 @@ type MySQLParseState struct {
128127
TableName string
129128
// ShowFilter holds the LIKE pattern for SHOW TABLES LIKE queries
130129
ShowFilter string
130+
// ConflictColumns holds columns for ON CONFLICT clause.
131+
// Set by InsertOnDuplicateKeyRule during transpilation.
132+
ConflictColumns []string
131133
}
132134

133135
// QueryContext holds all state for processing a single query through the pipeline.

0 commit comments

Comments
 (0)