Skip to content

Commit 43ba813

Browse files
committed
MB-60659 implement cbgt rollback hook
context: https://review.couchbase.org/c/cbgt/+/205499 The aim is to make rollback phases atomic and serialize them as a whole. This PR registers the cbft-specific rollback steps (rolling back the scorch) to cbgt as a hook. Change-Id: Id7559b4708b911414e3666bc513393f566f1e5b0 Reviewed-on: https://review.couchbase.org/c/cbft/+/205500 Tested-by: Mohd Shaad Khan <[email protected]> Well-Formed: Restriction Checker Reviewed-by: Aditi Ahuja <[email protected]> Well-Formed: Build Bot <[email protected]>
1 parent 0eb7a62 commit 43ba813

File tree

3 files changed

+95
-45
lines changed

3 files changed

+95
-45
lines changed

pindex_bleve.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -306,25 +306,33 @@ func (sr *SearchRequest) ConvertToBleveSearchRequest() (*bleve.SearchRequest, er
306306
return r, r.Validate()
307307
}
308308

309+
type RollbackInfo struct {
310+
partition string
311+
vBucketUUID uint64
312+
rollbackSeq uint64
313+
}
314+
309315
type BleveDest struct {
310316
path string
311317
sourceName string
312318

313319
bleveDocConfig BleveDocumentConfig
314320

315321
// Invoked when mgr should rollback this BleveDest
316-
rollback func()
322+
rollback func()
323+
rollbackInfo *RollbackInfo
317324

318325
stats *cbgt.PIndexStoreStats
319326
copyStats *CopyPartitionStats
320327

321-
m sync.RWMutex // Protects the fields that follow.
322-
bindex bleve.Index
323-
partitions map[string]*BleveDestPartition
324-
rev uint64 // Incremented whenever bindex changes.
325-
batchReqChs []chan *batchRequest
326-
stopCh chan struct{}
327-
removeCh chan struct{}
328+
m sync.RWMutex // Protects the fields that follow.
329+
bindex bleve.Index
330+
partitions map[string]*BleveDestPartition
331+
rev uint64 // Incremented whenever bindex changes.
332+
batchReqChs []chan *batchRequest
333+
stopCh chan struct{}
334+
removeCh chan struct{}
335+
isRollbackInProgress bool
328336
}
329337

330338
// Used to track state for a single partition.

pindex_bleve_rollback.go

Lines changed: 68 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,63 @@ import (
1515

1616
"github.com/blevesearch/bleve/v2/index/scorch"
1717

18+
"github.com/couchbase/cbgt"
1819
log "github.com/couchbase/clog"
1920
)
2021

22+
func init() {
23+
cbgt.RollbackHook = func(phase cbgt.RollbackPhase, pindex *cbgt.PIndex) (err error) {
24+
df, ok := pindex.Dest.(*cbgt.DestForwarder)
25+
if !ok || df == nil {
26+
return fmt.Errorf("rollbackhook: invalid dest for pindex:%s",
27+
pindex.Name)
28+
}
29+
bd, ok := df.DestProvider.(*BleveDest)
30+
if !ok || bd == nil {
31+
return fmt.Errorf("rollbackhook: invalid destProvider for pindex:%s",
32+
pindex.Name)
33+
}
34+
35+
bd.m.Lock()
36+
defer bd.m.Unlock()
37+
38+
switch phase {
39+
case cbgt.RollbackInit:
40+
41+
pindexName := bd.bindex.Name()
42+
wasClosed, wasPartial, err := bd.partialRollbackLOCKED()
43+
44+
log.Printf("pindex_bleve_rollback: path: %s, wasClosed: %t, "+
45+
"wasPartial: %t, err: %v", bd.path, wasClosed, wasPartial, err)
46+
47+
if !wasClosed {
48+
bd.closeLOCKED(false)
49+
}
50+
51+
if !wasPartial {
52+
atomic.AddUint64(&TotRollbackFull, 1)
53+
if ServerlessMode {
54+
// this is a full rollback, so the partition is going to be
55+
// rebuilt afresh. The reason we are refunding over here is
56+
// because this is not a end-user problem, but rather a
57+
// couchbase cluster problem. So, once the partition is built
58+
// afresh, we would essentially any loss of cost by charging
59+
// for 0 - original high seq no. and after that we will
60+
// actually start costing the user.
61+
RollbackRefund(pindexName, bd.sourceName, 0)
62+
}
63+
os.RemoveAll(bd.path) // Full rollback to zero.
64+
} else {
65+
atomic.AddUint64(&TotRollbackPartial, 1)
66+
}
67+
case cbgt.RollbackCompleted:
68+
bd.isRollbackInProgress = false
69+
}
70+
71+
return nil
72+
}
73+
}
74+
2175
func (t *BleveDest) Rollback(partition string, vBucketUUID uint64, rollbackSeq uint64) error {
2276
t.AddError("dest rollback", partition, nil, rollbackSeq, nil, nil)
2377

@@ -28,47 +82,27 @@ func (t *BleveDest) Rollback(partition string, vBucketUUID uint64, rollbackSeq u
2882

2983
// The BleveDest may be closed due to another partition(BleveDestPartition) of
3084
// the same pindex being rolled back earlier.
31-
if t.bindex != nil {
32-
pindexName := t.bindex.Name()
33-
wasClosed, wasPartial, err := t.partialRollbackLOCKED(partition,
34-
vBucketUUID, rollbackSeq)
35-
36-
log.Printf("pindex_bleve_rollback: path: %s,"+
37-
" wasClosed: %t, wasPartial: %t, err: %v",
38-
t.path, wasClosed, wasPartial, err)
39-
40-
if !wasClosed {
41-
t.closeLOCKED(false)
42-
}
85+
if t.bindex == nil || t.isRollbackInProgress {
86+
return nil
87+
}
4388

44-
if !wasPartial {
45-
atomic.AddUint64(&TotRollbackFull, 1)
46-
if ServerlessMode {
47-
// this is a full rollback, so the partition is going to be
48-
// rebuilt afresh. The reason we are refunding over here is
49-
// because this is not a end-user problem, but rather a
50-
// couchbase cluster problem. So, once the partition is built
51-
// afresh, we would essentially any loss of cost by charging
52-
// for 0 - original high seq no. and after that we will
53-
// actually start costing the user.
54-
RollbackRefund(pindexName, t.sourceName, 0)
55-
}
56-
os.RemoveAll(t.path) // Full rollback to zero.
57-
} else {
58-
atomic.AddUint64(&TotRollbackPartial, 1)
59-
}
89+
t.isRollbackInProgress = true
6090

61-
// Whether partial or full rollback, restart the BleveDest so that
62-
// feeds are restarted.
63-
t.rollback()
91+
t.rollbackInfo = &RollbackInfo{
92+
partition: partition,
93+
vBucketUUID: vBucketUUID,
94+
rollbackSeq: rollbackSeq,
6495
}
6596

97+
// Whether partial or full rollback, restart the BleveDest so that
98+
// feeds are restarted.
99+
t.rollback()
100+
66101
return nil
67102
}
68103

69104
// Attempt partial rollback.
70-
func (t *BleveDest) partialRollbackLOCKED(partition string,
71-
vBucketUUID uint64, rollbackSeq uint64) (bool, bool, error) {
105+
func (t *BleveDest) partialRollbackLOCKED() (bool, bool, error) {
72106
if t.bindex == nil {
73107
return false, false, nil
74108
}
@@ -79,8 +113,7 @@ func (t *BleveDest) partialRollbackLOCKED(partition string,
79113
}
80114

81115
if sh, ok := index.(*scorch.Scorch); ok {
82-
return t.partialScorchRollbackLOCKED(sh,
83-
partition, vBucketUUID, rollbackSeq)
116+
return t.partialScorchRollbackLOCKED(sh)
84117
}
85118

86119
return false, false, fmt.Errorf("unknown index type")

pindex_bleve_scorch_rollback.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,17 @@ import (
2525
// Determine which point is of relevance to get to at or before the
2626
// wanted rollbackSeq and vBucketUUID. If found, rollback to that
2727
// particular point.
28-
func (t *BleveDest) partialScorchRollbackLOCKED(sh *scorch.Scorch,
29-
partition string, vBucketUUID, rollbackSeq uint64) (bool, bool, error) {
28+
func (t *BleveDest) partialScorchRollbackLOCKED(sh *scorch.Scorch) (
29+
bool, bool, error) {
30+
if t == nil || t.rollbackInfo == nil {
31+
return false, false, fmt.Errorf("pindex_bleve_scorch_rollback: invalid" +
32+
" bleveDest or rollbackInfo")
33+
}
34+
35+
partition := t.rollbackInfo.partition
36+
vBucketUUID := t.rollbackInfo.vBucketUUID
37+
rollbackSeq := t.rollbackInfo.rollbackSeq
38+
3039
seqMaxKey := []byte(partition)
3140

3241
// get vBucketMap/Opaque key

0 commit comments

Comments
 (0)