Skip to content

Commit ff7a6dc

Browse files
authored
[3.2.2 backport] CBG-4446 create a cancel context inside BlipSyncContext (#7314)
* CBG-4370 prereq deduplicate active replicator pull changes (#7211) * CBG-4370 create a cancel context inside BlipSyncContext (#7201) * CBG-4370 create a cancel context inside BlipSyncContext This cancel context allows a forcible closure of the underlying blip connection. In the case there is a continuous pull replication and there is an error on the changes feed, the only way to stop the pull replication is to shut down the connection. CBL clients do not listen to unsolicited error messages. * Avoid refactoring with the change * pass lint * test active replicator reconnection * test fixup * lint correctly * PR comments * Acquire read lock to prevent data race * switch query to use all docs index * fix lint issue
1 parent 3754c26 commit ff7a6dc

14 files changed

+243
-98
lines changed

base/leaky_bucket.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,12 @@ type LeakyBucketConfig struct {
125125

126126
// Returns a partial error the first time ViewCustom is called
127127
FirstTimeViewCustomPartialError bool
128-
PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called
128+
129+
// QueryCallback allows tests to set a callback that will be issued prior to issuing a view query
130+
QueryCallback func(ddoc, viewName string, params map[string]any) error
131+
PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called
132+
133+
N1QLQueryCallback func(ctx context.Context, statement string, params map[string]any, consistency ConsistencyMode, adhoc bool) error
129134

130135
PostN1QLQueryCallback func()
131136

base/leaky_datastore.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ func (lds *LeakyDataStore) ViewQuery(ctx context.Context, ddoc, name string, par
241241
if !ok {
242242
return nil, errors.New("bucket does not support views")
243243
}
244+
if lds.config.QueryCallback != nil {
245+
err := lds.config.QueryCallback(ddoc, name, params)
246+
if err != nil {
247+
return nil, err
248+
}
249+
}
244250
iterator, err := vs.ViewQuery(ctx, ddoc, name, params)
245251

246252
if lds.config.FirstTimeViewCustomPartialError {
@@ -324,10 +330,14 @@ func (lds *LeakyDataStore) SetFirstTimeViewCustomPartialError(val bool) {
324330
lds.config.FirstTimeViewCustomPartialError = val
325331
}
326332

327-
func (lds *LeakyDataStore) SetPostQueryCallback(callback func(ddoc, viewName string, params map[string]interface{})) {
333+
func (lds *LeakyDataStore) SetPostQueryCallback(callback func(ddoc, viewName string, params map[string]any)) {
328334
lds.config.PostQueryCallback = callback
329335
}
330336

337+
func (lds *LeakyDataStore) SetQueryCallback(fn func(ddoc, viewName string, params map[string]any) error) {
338+
lds.config.QueryCallback = fn
339+
}
340+
331341
func (lds *LeakyDataStore) SetPostN1QLQueryCallback(callback func()) {
332342
lds.config.PostN1QLQueryCallback = callback
333343
}
@@ -447,6 +457,12 @@ func (lds *LeakyDataStore) Query(ctx context.Context, statement string, params m
447457
if err != nil {
448458
return nil, err
449459
}
460+
if lds.config.N1QLQueryCallback != nil {
461+
err := lds.config.N1QLQueryCallback(ctx, statement, params, consistency, adhoc)
462+
if err != nil {
463+
return nil, err
464+
}
465+
}
450466
iterator, err := n1qlStore.Query(ctx, statement, params, consistency, adhoc)
451467

452468
if lds.config.PostN1QLQueryCallback != nil {

db/active_replicator.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,10 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen
208208
arc.replicationStats.NumConnectAttempts.Add(1)
209209

210210
var originPatterns []string // no origin headers for ISGR
211-
// NewSGBlipContext doesn't set cancellation context - active replication cancellation on db close is handled independently
212-
blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, nil)
211+
cancelCtx, cancelFunc := context.WithCancel(context.Background())
212+
blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, cancelCtx)
213213
if err != nil {
214+
cancelFunc()
214215
return nil, nil, err
215216
}
216217
blipContext.WebsocketPingInterval = arc.config.WebsocketPingInterval
@@ -221,7 +222,10 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen
221222
}
222223
}
223224

224-
bsc = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats)
225+
bsc, err = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats, cancelFunc)
226+
if err != nil {
227+
return nil, nil, err
228+
}
225229

226230
bsc.loggingCtx = base.CorrelationIDLogCtx(
227231
arc.config.ActiveDB.AddDatabaseLogContext(base.NewNonCancelCtx().Ctx),

db/active_replicator_common.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"errors"
1616
"expvar"
1717
"sync"
18+
"sync/atomic"
1819
"testing"
1920
"time"
2021

@@ -52,7 +53,7 @@ type activeReplicatorCommon struct {
5253
ctxCancel context.CancelFunc
5354
reconnectActive base.AtomicBool // Tracks whether reconnect goroutine is active
5455
replicatorConnectFn func() error // the function called inside reconnectLoop.
55-
activeSendChanges base.AtomicBool // Tracks whether sendChanges goroutine is active.
56+
activeSendChanges atomic.Int32 // Tracks whether sendChanges goroutines are active, there is one per collection.
5657
namedCollections map[base.ScopeAndCollectionName]*activeReplicatorCollection // set only if the replicator is running with collections - access with forEachCollection
5758
defaultCollection *activeReplicatorCollection // set only if the replicator is not running with collections - access with forEachCollection
5859
}
@@ -312,8 +313,9 @@ func (a *activeReplicatorCommon) getState() string {
312313
return a.state
313314
}
314315

315-
// requires a.stateErrorLock
316316
func (a *activeReplicatorCommon) _getStateWithErrorMessage() (state string, lastErrorMessage string) {
317+
a.stateErrorLock.RLock()
318+
defer a.stateErrorLock.RUnlock()
317319
if a.lastError == nil {
318320
return a.state, ""
319321
}
@@ -356,6 +358,14 @@ func (a *activeReplicatorCommon) getCheckpointHighSeq() string {
356358
return highSeqStr
357359
}
358360

361+
// publishStatus updates the replication status document in the metadata store.
362+
func (a *activeReplicatorCommon) publishStatus() {
363+
a.lock.Lock()
364+
defer a.lock.Unlock()
365+
a._publishStatus()
366+
}
367+
368+
// _publishStatus updates the replication status document in the metadata store. Requires holding a.lock before calling.
359369
func (a *activeReplicatorCommon) _publishStatus() {
360370
status := a._getStatusCallback()
361371
err := setLocalStatus(a.ctx, a.config.ActiveDB.MetadataStore, a.statusKey, status, int(a.config.ActiveDB.Options.LocalDocExpirySecs))

db/active_replicator_push.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (apr *ActivePushReplicator) Stop() error {
268268
return err
269269
}
270270
teardownStart := time.Now()
271-
for apr.activeSendChanges.IsTrue() && (time.Since(teardownStart) < time.Second*10) {
271+
for apr.activeSendChanges.Load() != 0 && (time.Since(teardownStart) < time.Second*10) {
272272
time.Sleep(10 * time.Millisecond)
273273
}
274274
return nil
@@ -298,8 +298,17 @@ func (apr *ActivePushReplicator) _startPushNonCollection() error {
298298
bh.collection = dbCollectionWithUser
299299
bh.loggingCtx = bh.collection.AddCollectionContext(bh.BlipSyncContext.loggingCtx)
300300

301+
return apr._startSendingChanges(bh, apr.defaultCollection.Checkpointer.lastCheckpointSeq)
302+
}
303+
304+
// _startSendingChanges starts a changes feed for a given collection in a goroutine and starts sending changes to the passive peer from a starting sequence value.
305+
func (apr *ActivePushReplicator) _startSendingChanges(bh *blipHandler, since SequenceID) error {
306+
collectionCtx, err := bh.collections.get(bh.collectionIdx)
307+
if err != nil {
308+
return err
309+
}
301310
var channels base.Set
302-
if filteredChannels := apr.config.getFilteredChannels(nil); len(filteredChannels) > 0 {
311+
if filteredChannels := apr.config.getFilteredChannels(bh.collectionIdx); len(filteredChannels) > 0 {
303312
channels = base.SetFromArray(filteredChannels)
304313
}
305314

@@ -320,17 +329,12 @@ func (apr *ActivePushReplicator) _startPushNonCollection() error {
320329
// No special handling for error
321330
}
322331

323-
collectionCtx, err := bh.collections.get(nil)
324-
if err != nil {
325-
return err
326-
}
327-
328-
apr.activeSendChanges.Set(true)
332+
apr.activeSendChanges.Add(1)
329333
go func(s *blip.Sender) {
330-
defer apr.activeSendChanges.Set(false)
331-
isComplete := bh.sendChanges(s, &sendChangesOptions{
334+
defer apr.activeSendChanges.Add(-1)
335+
isComplete, err := bh.sendChanges(s, &sendChangesOptions{
332336
docIDs: apr.config.DocIDs,
333-
since: apr.defaultCollection.Checkpointer.lastCheckpointSeq,
337+
since: since,
334338
continuous: apr.config.Continuous,
335339
activeOnly: apr.config.ActiveOnly,
336340
batchSize: int(apr.config.ChangesBatchSize),
@@ -340,11 +344,17 @@ func (apr *ActivePushReplicator) _startPushNonCollection() error {
340344
ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode.
341345
changesCtx: collectionCtx.changesCtx,
342346
})
343-
// On a normal completion, call complete for the replication
347+
if err != nil {
348+
base.InfofCtx(apr.ctx, base.KeyReplicate, "Terminating blip connection due to changes feed error: %v", err)
349+
bh.ctxCancelFunc()
350+
_ = apr.setError(err)
351+
apr.publishStatus()
352+
return
353+
}
344354
if isComplete {
355+
// On a normal completion, call complete for the replication
345356
apr.Complete()
346357
}
347358
}(apr.blipSender)
348-
349359
return nil
350360
}

db/active_replicator_push_collections.go

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ package db
1010

1111
import (
1212
"fmt"
13-
"strings"
1413

15-
"github.com/couchbase/go-blip"
1614
"github.com/couchbase/sync_gateway/base"
1715
)
1816

@@ -52,47 +50,6 @@ func (apr *ActivePushReplicator) _startPushWithCollections() error {
5250
bh.collectionIdx = collectionIdx
5351
bh.loggingCtx = bh.collection.AddCollectionContext(bh.BlipSyncContext.loggingCtx)
5452

55-
var channels base.Set
56-
if filteredChannels := apr.config.getFilteredChannels(collectionIdx); len(filteredChannels) > 0 {
57-
channels = base.SetFromArray(filteredChannels)
58-
}
59-
60-
apr.blipSyncContext.fatalErrorCallback = func(err error) {
61-
if strings.Contains(err.Error(), ErrUseProposeChanges.Message) {
62-
err = ErrUseProposeChanges
63-
_ = apr.setError(PreHydrogenTargetAllowConflictsError)
64-
err = apr.stopAndDisconnect()
65-
if err != nil {
66-
base.ErrorfCtx(apr.ctx, "Failed to stop and disconnect replication: %v", err)
67-
}
68-
} else if strings.Contains(err.Error(), ErrDatabaseWentAway.Message) {
69-
err = apr.reconnect()
70-
if err != nil {
71-
base.ErrorfCtx(apr.ctx, "Failed to reconnect replication: %v", err)
72-
}
73-
}
74-
// No special handling for error
75-
}
76-
apr.activeSendChanges.Set(true)
77-
go func(s *blip.Sender) {
78-
defer apr.activeSendChanges.Set(false)
79-
isComplete := bh.sendChanges(s, &sendChangesOptions{
80-
docIDs: apr.config.DocIDs,
81-
since: replicationCollection.Checkpointer.lastCheckpointSeq,
82-
continuous: apr.config.Continuous,
83-
activeOnly: apr.config.ActiveOnly,
84-
batchSize: int(apr.config.ChangesBatchSize),
85-
revocations: apr.config.PurgeOnRemoval,
86-
channels: channels,
87-
clientType: clientTypeSGR2,
88-
ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode.
89-
changesCtx: c.changesCtx,
90-
})
91-
// On a normal completion, call complete for the replication
92-
if isComplete {
93-
apr.Complete()
94-
}
95-
}(apr.blipSender)
96-
return nil
53+
return apr._startSendingChanges(bh, replicationCollection.Checkpointer.lastCheckpointSeq)
9754
})
9855
}

db/blip_handler.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
348348
}()
349349
// sendChanges runs until blip context closes, or fails due to error
350350
startTime := time.Now()
351-
_ = bh.sendChanges(rq.Sender, &sendChangesOptions{
351+
_, err = bh.sendChanges(rq.Sender, &sendChangesOptions{
352352
docIDs: subChangesParams.docIDs(),
353353
since: subChangesParams.Since(),
354354
continuous: continuous,
@@ -361,6 +361,10 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
361361
changesCtx: collectionCtx.changesCtx,
362362
requestPlusSeq: requestPlusSeq,
363363
})
364+
if err != nil {
365+
base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "Closing blip connection due to changes feed error %+v\n", err)
366+
bh.ctxCancelFunc()
367+
}
364368
base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime))
365369
}()
366370

@@ -428,8 +432,8 @@ func (flag changesDeletedFlag) HasFlag(deletedFlag changesDeletedFlag) bool {
428432
return flag&deletedFlag != 0
429433
}
430434

431-
// Sends all changes since the given sequence
432-
func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (isComplete bool) {
435+
// sendChanges will start a changes feed and send changes. Returns bool to indicate whether the changes feed finished and all changes were sent. The error value is only used to indicate a fatal error, where the blip connection should be terminated. If the blip connection is disconnected by the client, the error will be nil, but the boolean parameter will be false.
436+
func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (bool, error) {
433437
defer func() {
434438
if panicked := recover(); panicked != nil {
435439
bh.replicationStats.NumHandlersPanicked.Add(1)
@@ -472,11 +476,10 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
472476
changesDb, err := bh.copyDatabaseCollectionWithUser(bh.collectionIdx)
473477
if err != nil {
474478
base.WarnfCtx(bh.loggingCtx, "[%s] error sending changes: %v", bh.blipContext.ID, err)
475-
return false
476-
479+
return false, err
477480
}
478481

479-
forceClose := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error {
482+
forceClose, err := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error {
480483
base.DebugfCtx(bh.loggingCtx, base.KeySync, " Sending %d changes", len(changes))
481484
for _, change := range changes {
482485
if !strings.HasPrefix(change.ID, "_") {
@@ -538,8 +541,7 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
538541
}
539542
bh.db.DatabaseContext.NotifyTerminatedChanges(bh.loggingCtx, user)
540543
}
541-
542-
return !forceClose
544+
return (err == nil && !forceClose), err
543545
}
544546

545547
func (bh *blipHandler) buildChangesRow(change *ChangeEntry, revID string) []interface{} {

db/blip_sync_context.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ const (
3434

3535
var ErrClosedBLIPSender = errors.New("use of closed BLIP sender")
3636

37-
func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats) *BlipSyncContext {
37+
func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats, ctxCancelFunc context.CancelFunc) (*BlipSyncContext, error) {
38+
if ctxCancelFunc == nil {
39+
return nil, errors.New("cancelCtxFunc is required")
40+
}
3841
maxInFlightChangesBatches := DefaultMaxConcurrentChangesBatches
3942
if db.Options.MaxConcurrentChangesBatches != nil {
4043
maxInFlightChangesBatches = *db.Options.MaxConcurrentChangesBatches
@@ -55,6 +58,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con
5558
inFlightChangesThrottle: make(chan struct{}, maxInFlightChangesBatches),
5659
inFlightRevsThrottle: make(chan struct{}, maxInFlightRevs),
5760
collections: &blipCollections{},
61+
ctxCancelFunc: ctxCancelFunc,
5862
}
5963
if bsc.replicationStats == nil {
6064
bsc.replicationStats = NewBlipSyncStats()
@@ -86,7 +90,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con
8690
bsc.register(profile, handlerFn)
8791
}
8892
}
89-
return bsc
93+
return bsc, nil
9094
}
9195

9296
// BlipSyncContext represents one BLIP connection (socket) opened by a client.
@@ -133,6 +137,8 @@ type BlipSyncContext struct {
133137
collections *blipCollections // all collections handled by blipSyncContext, implicit or via GetCollections
134138

135139
stats blipSyncStats // internal structure to store stats
140+
141+
ctxCancelFunc context.CancelFunc // function to cancel a blip replication
136142
}
137143

138144
// blipSyncStats has support structures to support reporting stats at regular interval
@@ -248,6 +254,7 @@ func (bsc *BlipSyncContext) Close() {
248254
}
249255
bsc.reportStats(true)
250256
close(bsc.terminator)
257+
bsc.ctxCancelFunc()
251258
})
252259
}
253260

0 commit comments

Comments
 (0)