Skip to content

Commit e694c4d

Browse files
Zohaib Sibte Hassanclaude
andcommitted
Fix: Skip transpilation for forwarded queries from replica
SQL forwarded from replica is already transpiled (MySQL -> SQLite). The primary was re-transpiling it, causing syntax errors like "ON CONFLICT" appearing in wrong position. - Set TranspilationEnabled=false for forwarded query sessions - Execute SQL directly without re-parsing via ParseStatementsWithSchema - Simplify handleStatement to single SQL execution Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 366ba02 commit e694c4d

File tree

1 file changed

+19
-56
lines changed

1 file changed

+19
-56
lines changed

grpc/forward_handler.go

Lines changed: 19 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -225,48 +225,25 @@ func (h *ForwardHandler) handleStatement(ctx context.Context, session *ForwardSe
225225
}, nil
226226
}
227227

228-
// Use multi-statement parser to handle DDL that generates multiple statements
229-
// (e.g., CREATE TABLE with KEY definitions generates CREATE TABLE + CREATE INDEX)
230-
stmts := protocol.ParseStatementsWithSchema(req.Sql, nil)
231-
if len(stmts) == 0 {
232-
return &ForwardQueryResponse{
233-
Success: false,
234-
ErrorMessage: "failed to parse SQL",
235-
}, nil
236-
}
237-
238-
// Check for parse errors in first statement
239-
if stmts[0].Error != "" {
240-
return &ForwardQueryResponse{
241-
Success: false,
242-
ErrorMessage: stmts[0].Error,
243-
}, nil
244-
}
228+
// SQL from replica is already transpiled - execute directly without re-parsing
229+
// The replica handles transpilation and multi-statement splitting
245230

246231
txn := session.GetTransaction()
247232
if txn != nil {
248-
// For transactions, buffer all statements
249-
for i, stmt := range stmts {
250-
// Params only apply to first statement
251-
var stmtParams []interface{}
252-
if i == 0 {
253-
stmtParams = params
254-
}
255-
if err := session.AddStatement(stmt.SQL, stmtParams); err != nil {
256-
return &ForwardQueryResponse{
257-
Success: false,
258-
ErrorMessage: err.Error(),
259-
}, nil
260-
}
233+
// For transactions, buffer the statement
234+
if err := session.AddStatement(req.Sql, params); err != nil {
235+
return &ForwardQueryResponse{
236+
Success: false,
237+
ErrorMessage: err.Error(),
238+
}, nil
261239
}
262240

263241
log.Debug().
264242
Uint64("replica_node_id", req.ReplicaNodeId).
265243
Uint64("session_id", req.SessionId).
266244
Uint64("txn_id", txn.TxnID).
267-
Int("stmt_count", len(stmts)).
268245
Str("sql", req.Sql).
269-
Msg("Buffered statement(s) in transaction")
246+
Msg("Buffered statement in transaction")
270247

271248
return &ForwardQueryResponse{
272249
Success: true,
@@ -283,10 +260,11 @@ func (h *ForwardHandler) handleStatement(ctx context.Context, session *ForwardSe
283260
execCtx, cancel := context.WithTimeout(ctx, timeout)
284261
defer cancel()
285262

263+
// TranspilationEnabled=false because SQL from replica is already transpiled
286264
connSession := &protocol.ConnectionSession{
287265
ConnID: req.SessionId,
288266
CurrentDatabase: req.Database,
289-
TranspilationEnabled: true, // Required for proper statement type detection (CREATE DATABASE, etc.)
267+
TranspilationEnabled: false,
290268
}
291269

292270
done := make(chan struct{})
@@ -296,27 +274,13 @@ func (h *ForwardHandler) handleStatement(ctx context.Context, session *ForwardSe
296274
var execErr error
297275

298276
go func() {
299-
// Execute all statements in order
300-
for i, stmt := range stmts {
301-
// Params only apply to first statement
302-
var stmtParams []interface{}
303-
if i == 0 {
304-
stmtParams = params
305-
}
306-
rs, err := h.coordHandler.HandleQuery(connSession, stmt.SQL, stmtParams)
307-
if err != nil {
308-
execErr = err
309-
break
310-
}
311-
if rs != nil {
312-
totalRowsAffected += rs.RowsAffected
313-
if rs.LastInsertId > 0 {
314-
lastInsertId = rs.LastInsertId
315-
}
316-
if rs.CommittedTxnId > 0 {
317-
committedTxnId = rs.CommittedTxnId
318-
}
319-
}
277+
rs, err := h.coordHandler.HandleQuery(connSession, req.Sql, params)
278+
if err != nil {
279+
execErr = err
280+
} else if rs != nil {
281+
totalRowsAffected = rs.RowsAffected
282+
lastInsertId = rs.LastInsertId
283+
committedTxnId = rs.CommittedTxnId
320284
}
321285
close(done)
322286
}()
@@ -346,9 +310,8 @@ func (h *ForwardHandler) handleStatement(ctx context.Context, session *ForwardSe
346310
log.Debug().
347311
Uint64("replica_node_id", req.ReplicaNodeId).
348312
Uint64("session_id", req.SessionId).
349-
Int("stmt_count", len(stmts)).
350313
Int64("rows_affected", resp.RowsAffected).
351-
Msg("Statement(s) executed successfully")
314+
Msg("Statement executed successfully")
352315

353316
return resp, nil
354317

0 commit comments

Comments
 (0)