Skip to content

Commit 0088b8d

Browse files
author
Zohaib Sibte Hassan
committed
replica: make forwarded writes leader-transparent and idempotent
Rework the read-only replica write-forwarding path so forwarded writes execute through a persistent leader-side ConnectionSession, matching coordinator transaction semantics instead of custom statement buffering/replay.

Key changes:
- ForwardQuery now requires request_id and executes through idempotent session orchestration (duplicate request_id waits for/returns the original result).
- Leader forward sessions now own a persistent protocol.ConnectionSession and serialize execution to preserve per-session ordering and transaction state.
- Forwarded BEGIN/COMMIT/ROLLBACK are routed directly through coordinator transaction control on that session.
- Replica now generates monotonic per-session request ids (ConnectionSession.NextForwardRequestID) for all forwarded write and txn-control requests.
- Replica forwarding retries once on retriable transport errors (deadline/unavailable/canceled/resource exhausted) using the same request_id for safe dedupe.
- wait_for_replication semantics now cover forwarded explicit COMMIT as well (waits on committed_txn_id when enabled).
- Extend ForwardQueryRequest proto with request_id and regenerate grpc bindings.
- Update forward handler/session tests to validate request-id dedupe, session execution, and transaction state behavior under the new model.

Validation:
- go test -tags sqlite_preupdate_hook ./grpc -run 'TestForwardHandler_|TestForwardSession' (run twice)
- go test -tags sqlite_preupdate_hook -race ./grpc -run 'TestForwardHandler_|TestForwardSession'
- go test -tags sqlite_preupdate_hook ./replica (run twice)
- go test -tags sqlite_preupdate_hook -race ./replica
- go test -tags sqlite_preupdate_hook ./protocol (run twice)
- go test -tags sqlite_preupdate_hook -race ./protocol
- go build -tags sqlite_preupdate_hook ./...
1 parent 93a7340 commit 0088b8d

File tree

9 files changed

+512
-695
lines changed

9 files changed

+512
-695
lines changed

grpc/forward_handler.go

Lines changed: 136 additions & 263 deletions
Large diffs are not rendered by default.

grpc/forward_handler_test.go

Lines changed: 69 additions & 271 deletions
Large diffs are not rendered by default.

grpc/forward_session.go

Lines changed: 132 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package grpc
22

33
import (
4+
"context"
45
"errors"
56
"sync"
67
"time"
78

8-
"github.com/maxpert/marmot/hlc"
9+
"github.com/maxpert/marmot/protocol"
910
"github.com/rs/zerolog/log"
1011
)
1112

@@ -15,27 +16,25 @@ type ForwardSessionKey struct {
1516
SessionID uint64
1617
}
1718

18-
// BufferedStatement pairs a SQL statement with its parameters for deferred execution
19-
type BufferedStatement struct {
20-
SQL string
21-
Params []any
22-
}
19+
const maxCachedForwardRequests = 1024
2320

24-
// ForwardTransaction represents an active transaction being built on the leader
25-
type ForwardTransaction struct {
26-
TxnID uint64
27-
StartTS hlc.Timestamp
28-
Statements []BufferedStatement
29-
Database string
21+
type forwardRequestState struct {
22+
done chan struct{}
23+
resp *ForwardQueryResponse
3024
}
3125

3226
// ForwardSession tracks a single client session on the leader for write forwarding
3327
type ForwardSession struct {
3428
Key ForwardSessionKey
3529
Database string
36-
ActiveTxn *ForwardTransaction
30+
ConnSession *protocol.ConnectionSession
3731
LastActivity time.Time
38-
mu sync.Mutex
32+
33+
execMu sync.Mutex
34+
mu sync.Mutex
35+
36+
requestStates map[uint64]*forwardRequestState
37+
requestOrder []uint64
3938
}
4039

4140
// ForwardSessionManager manages all active forwarding sessions on the leader
@@ -46,11 +45,6 @@ type ForwardSessionManager struct {
4645
stopCh chan struct{}
4746
}
4847

49-
var (
50-
errNoActiveTransaction = errors.New("no active transaction")
51-
errTransactionAlreadyActive = errors.New("transaction already active")
52-
)
53-
5448
// NewForwardSessionManager creates a new session manager and starts cleanup loop
5549
func NewForwardSessionManager(timeout time.Duration) *ForwardSessionManager {
5650
m := &ForwardSessionManager{
@@ -73,9 +67,12 @@ func (m *ForwardSessionManager) GetOrCreateSession(key ForwardSessionKey, db str
7367
}
7468

7569
session := &ForwardSession{
76-
Key: key,
77-
Database: db,
78-
LastActivity: time.Now(),
70+
Key: key,
71+
Database: db,
72+
ConnSession: newForwardConnSession(key.SessionID, db),
73+
LastActivity: time.Now(),
74+
requestStates: make(map[uint64]*forwardRequestState),
75+
requestOrder: make([]uint64, 0, 16),
7976
}
8077
m.sessions[key] = session
8178
return session
@@ -148,55 +145,138 @@ func (m *ForwardSessionManager) Stop() {
148145
close(m.stopCh)
149146
}
150147

151-
// BeginTransaction starts a new transaction in the session
152-
func (s *ForwardSession) BeginTransaction(txnID uint64, startTS hlc.Timestamp, db string) error {
148+
func newForwardConnSession(connID uint64, db string) *protocol.ConnectionSession {
149+
return &protocol.ConnectionSession{
150+
ConnID: connID,
151+
CurrentDatabase: db,
152+
TranspilationEnabled: false, // Forwarded SQL is already transpiled on replica
153+
}
154+
}
155+
156+
// BeginRequest reserves a request slot for idempotent dedupe.
157+
// Returns (state, true) for new requests, (state, false) for retries/duplicates.
158+
func (s *ForwardSession) BeginRequest(requestID uint64) (*forwardRequestState, bool) {
159+
if requestID == 0 {
160+
return nil, true
161+
}
162+
153163
s.mu.Lock()
154164
defer s.mu.Unlock()
155165

156-
if s.ActiveTxn != nil {
157-
return errTransactionAlreadyActive
166+
if state, ok := s.requestStates[requestID]; ok {
167+
return state, false
158168
}
159169

160-
s.ActiveTxn = &ForwardTransaction{
161-
TxnID: txnID,
162-
StartTS: startTS,
163-
Statements: make([]BufferedStatement, 0),
164-
Database: db,
170+
state := &forwardRequestState{
171+
done: make(chan struct{}),
165172
}
173+
s.requestStates[requestID] = state
174+
s.requestOrder = append(s.requestOrder, requestID)
166175
s.LastActivity = time.Now()
167-
return nil
176+
return state, true
168177
}
169178

170-
// AddStatement adds a statement with its params to the active transaction
171-
func (s *ForwardSession) AddStatement(sql string, params []any) error {
172-
s.mu.Lock()
173-
defer s.mu.Unlock()
174-
175-
if s.ActiveTxn == nil {
176-
return errNoActiveTransaction
179+
// WaitForRequest waits until an in-flight duplicate request finishes.
180+
func (s *ForwardSession) WaitForRequest(ctx context.Context, state *forwardRequestState) (*ForwardQueryResponse, error) {
181+
if state == nil {
182+
return nil, nil
177183
}
178184

179-
s.ActiveTxn.Statements = append(s.ActiveTxn.Statements, BufferedStatement{
180-
SQL: sql,
181-
Params: params,
182-
})
183-
s.LastActivity = time.Now()
184-
return nil
185+
select {
186+
case <-state.done:
187+
return cloneForwardResponse(state.resp), nil
188+
case <-ctx.Done():
189+
return nil, ctx.Err()
190+
}
185191
}
186192

187-
// GetTransaction returns the active transaction or nil
188-
func (s *ForwardSession) GetTransaction() *ForwardTransaction {
193+
// CompleteRequest stores the response and unblocks duplicate waiters.
194+
func (s *ForwardSession) CompleteRequest(requestID uint64, state *forwardRequestState, resp *ForwardQueryResponse) {
195+
if requestID == 0 || state == nil {
196+
return
197+
}
198+
189199
s.mu.Lock()
190200
defer s.mu.Unlock()
191-
return s.ActiveTxn
201+
202+
state.resp = cloneForwardResponse(resp)
203+
close(state.done)
204+
s.LastActivity = time.Now()
205+
s.pruneCompletedRequestsLocked()
192206
}
193207

194-
// ClearTransaction clears the active transaction
195-
func (s *ForwardSession) ClearTransaction() {
208+
// Execute serializes all operations for a forwarded session, preserving
209+
// single-connection ordering semantics and coordinator transaction state.
210+
func (s *ForwardSession) Execute(database string, fn func(connSession *protocol.ConnectionSession) (*ForwardQueryResponse, error)) (*ForwardQueryResponse, error) {
211+
s.execMu.Lock()
212+
defer s.execMu.Unlock()
213+
196214
s.mu.Lock()
197-
defer s.mu.Unlock()
198-
s.ActiveTxn = nil
215+
s.ensureDatabaseLocked(database)
216+
connSession := s.ConnSession
199217
s.LastActivity = time.Now()
218+
s.mu.Unlock()
219+
220+
if connSession == nil {
221+
return nil, errors.New("forward session connection not initialized")
222+
}
223+
224+
return fn(connSession)
225+
}
226+
227+
func (s *ForwardSession) HasActiveTransaction() bool {
228+
s.mu.Lock()
229+
connSession := s.ConnSession
230+
s.mu.Unlock()
231+
232+
if connSession == nil {
233+
return false
234+
}
235+
return connSession.InTransaction()
236+
}
237+
238+
func (s *ForwardSession) ensureDatabaseLocked(db string) {
239+
if db == "" {
240+
return
241+
}
242+
s.Database = db
243+
if s.ConnSession == nil {
244+
s.ConnSession = newForwardConnSession(s.Key.SessionID, db)
245+
return
246+
}
247+
s.ConnSession.CurrentDatabase = db
248+
}
249+
250+
func (s *ForwardSession) pruneCompletedRequestsLocked() {
251+
if len(s.requestStates) <= maxCachedForwardRequests {
252+
return
253+
}
254+
255+
filtered := make([]uint64, 0, len(s.requestOrder))
256+
for _, requestID := range s.requestOrder {
257+
state, ok := s.requestStates[requestID]
258+
if !ok {
259+
continue
260+
}
261+
if len(s.requestStates) > maxCachedForwardRequests {
262+
select {
263+
case <-state.done:
264+
delete(s.requestStates, requestID)
265+
continue
266+
default:
267+
}
268+
}
269+
filtered = append(filtered, requestID)
270+
}
271+
s.requestOrder = filtered
272+
}
273+
274+
func cloneForwardResponse(resp *ForwardQueryResponse) *ForwardQueryResponse {
275+
if resp == nil {
276+
return nil
277+
}
278+
cloned := *resp
279+
return &cloned
200280
}
201281

202282
// Touch updates the last activity timestamp

0 commit comments

Comments
 (0)