Skip to content

Commit 3b1bf69

Browse files
authored
feat: finite stream mode added
1 parent 0edea3b commit 3b1bf69

File tree

10 files changed

+218
-33
lines changed

10 files changed

+218
-33
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ $ go get github.com/Trendyol/go-dcp
8989
| `rootCAPath` | string | no | *not set | if `secureConnection` set `true` this field is required. |
9090
| `debug` | bool | no | false | For debugging purpose. |
9191
| `dcp.bufferSize` | int | no | 16mb | DCP internal queue buffer size (x Node Count). Check this if you get OOM Killed. |
92+
| `dcp.mode` | string | no | infinite | Set DCP mode `finite` If you want to listen to DCP events until now. Set DCP mode `infinite` If you want to listen to DCP events infinitely. |
9293
| `dcp.connectionBufferSize` | uint, string | no | 20mb | DCP tcp connection buffer size (x Node Count). Check this if you get OOM Killed. |
9394
| `dcp.connectionTimeout` | time.Duration | no | 1m | DCP connection timeout. |
9495
| `dcp.maxQueueSize` | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get queue overflowed or queue full. |

config/dcp.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ const (
4343
KubernetesLeaderElectorRetryPeriodConfig = "retryPeriod"
4444
)
4545

46+
type DcpMode string
47+
48+
const (
49+
DcpModeInfinite DcpMode = "infinite"
50+
DcpModeFinite DcpMode = "finite"
51+
)
52+
4653
type DCPGroupMembership struct {
4754
Config map[string]string `yaml:"config"`
4855
Type string `yaml:"type"`
@@ -66,6 +73,7 @@ type ExternalDcpConfig struct {
6673

6774
type ExternalDcp struct {
6875
BufferSize any `yaml:"bufferSize"`
76+
Mode DcpMode `yaml:"mode"`
6977
ConnectionBufferSize any `yaml:"connectionBufferSize"`
7078
Listener DCPListener `yaml:"listener"`
7179
Group DCPGroup `yaml:"group"`
@@ -151,6 +159,10 @@ func (c *Dcp) IsCouchbaseMetadata() bool {
151159
return c.Metadata.Type == MetadataTypeCouchbase
152160
}
153161

162+
func (c *Dcp) IsDcpModeFinite() bool {
163+
return c.Dcp.Mode == DcpModeFinite
164+
}
165+
154166
func (c *Dcp) IsFileMetadata() bool {
155167
return c.Metadata.Type == MetadataTypeFile
156168
}

config/dcp_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,3 +323,61 @@ func TestApplyDefaultMetadata(t *testing.T) {
323323
t.Errorf("Metadata.Type is not set to expected value")
324324
}
325325
}
326+
327+
func TestDcpMode(t *testing.T) {
328+
t.Run("it should return true when dcp mode finite", func(t *testing.T) {
329+
// Arrange
330+
dcp := &Dcp{
331+
Dcp: ExternalDcp{
332+
Mode: DcpModeFinite,
333+
},
334+
}
335+
336+
expectedValue := true
337+
338+
// Act
339+
actualValue := dcp.IsDcpModeFinite()
340+
341+
// Assert
342+
if expectedValue != actualValue {
343+
t.Errorf("isDcpModeFinite check result. got %v want %v", actualValue, expectedValue)
344+
}
345+
})
346+
347+
t.Run("it should return false when dcp mode infinite", func(t *testing.T) {
348+
// Arrange
349+
dcp := &Dcp{
350+
Dcp: ExternalDcp{
351+
Mode: DcpModeInfinite,
352+
},
353+
}
354+
355+
expectedValue := false
356+
357+
// Act
358+
actualValue := dcp.IsDcpModeFinite()
359+
360+
// Assert
361+
if expectedValue != actualValue {
362+
t.Errorf("isDcpModeFinite check result. got %v want %v", actualValue, expectedValue)
363+
}
364+
})
365+
366+
t.Run("it should return false when dcp mode empty", func(t *testing.T) {
367+
// Arrange
368+
dcp := &Dcp{
369+
Dcp: ExternalDcp{
370+
Mode: "",
371+
},
372+
}
373+
expectedValue := false
374+
375+
// Act
376+
actualValue := dcp.IsDcpModeFinite()
377+
378+
// Assert
379+
if expectedValue != actualValue {
380+
t.Errorf("isDcpModeFinite check result. got %v want %v", actualValue, expectedValue)
381+
}
382+
})
383+
}

couchbase/client.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,7 @@ func (s *client) GetFailOverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error)
600600
func (s *client) openStreamWithRollback(vbID uint16,
601601
failedSeqNo gocbcore.SeqNo,
602602
rollbackSeqNo gocbcore.SeqNo,
603+
latestSeqNo gocbcore.SeqNo,
603604
observer Observer,
604605
openStreamOptions gocbcore.OpenStreamOptions,
605606
) error {
@@ -635,7 +636,7 @@ func (s *client) openStreamWithRollback(vbID uint16,
635636
0,
636637
targetUUID,
637638
rollbackSeqNo,
638-
0xffffffffffffffff,
639+
latestSeqNo,
639640
rollbackSeqNo,
640641
rollbackSeqNo,
641642
observer,
@@ -696,7 +697,7 @@ func (s *client) OpenStream(
696697
0x80,
697698
offset.VbUUID,
698699
gocbcore.SeqNo(offset.SeqNo),
699-
0xffffffffffffffff,
700+
gocbcore.SeqNo(offset.LatestSeqNo),
700701
gocbcore.SeqNo(offset.StartSeqNo),
701702
gocbcore.SeqNo(offset.EndSeqNo),
702703
observer,
@@ -721,10 +722,10 @@ func (s *client) OpenStream(
721722
if err != nil {
722723
if rollbackErr, ok := err.(gocbcore.DCPRollbackError); ok {
723724
logger.Log.Info("need to rollback for vbID: %d, vbUUID: %d", vbID, offset.VbUUID)
724-
return s.openStreamWithRollback(vbID, gocbcore.SeqNo(offset.SeqNo), rollbackErr.SeqNo, observer, openStreamOptions)
725+
return s.openStreamWithRollback(vbID, gocbcore.SeqNo(offset.SeqNo), rollbackErr.SeqNo,
726+
gocbcore.SeqNo(offset.LatestSeqNo), observer, openStreamOptions)
725727
}
726728
}
727-
728729
return err
729730
}
730731

helpers/constants.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ const (
99
MembershipChangedBusEventName string = "membershipChanged"
1010
PersistSeqNoChangedBusEventName string = "persistSeqNoChanged"
1111

12-
JSONFlags uint32 = 50333696
12+
JSONFlags uint32 = 50333696
13+
MaxIntValue uint64 = 0xffffffffffffffff
1314
)

models/type.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import (
88

99
type Offset struct {
1010
*SnapshotMarker
11-
VbUUID gocbcore.VbUUID
12-
SeqNo uint64
11+
VbUUID gocbcore.VbUUID
12+
SeqNo uint64
13+
LatestSeqNo uint64
1314
}
1415

1516
type VbIDRange struct {

stream/checkpoint.go

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import (
55
"sync"
66
"time"
77

8+
"github.com/Trendyol/go-dcp/stream/offset"
9+
"github.com/couchbase/gocbcore/v10"
10+
811
"github.com/Trendyol/go-dcp/wrapper"
912

1013
"github.com/Trendyol/go-dcp/config"
@@ -16,8 +19,6 @@ import (
1619
"github.com/Trendyol/go-dcp/models"
1720

1821
"github.com/Trendyol/go-dcp/logger"
19-
20-
"github.com/couchbase/gocbcore/v10"
2122
)
2223

2324
const (
@@ -40,16 +41,17 @@ type CheckpointMetric struct {
4041
}
4142

4243
type checkpoint struct {
43-
stream Stream
44-
client couchbase.Client
45-
metadata metadata.Metadata
46-
config *config.Dcp
47-
saveLock *sync.Mutex
48-
loadLock *sync.Mutex
49-
metric *CheckpointMetric
50-
bucketUUID string
51-
vbIds []uint16
52-
running bool
44+
stream Stream
45+
client couchbase.Client
46+
metadata metadata.Metadata
47+
config *config.Dcp
48+
saveLock *sync.Mutex
49+
loadLock *sync.Mutex
50+
metric *CheckpointMetric
51+
offsetLatestSeqNoInit *offset.OffsetLatestSeqNoInit
52+
bucketUUID string
53+
vbIds []uint16
54+
running bool
5355
}
5456

5557
func (s *checkpoint) Save() {
@@ -150,13 +152,16 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset]
150152
panic(err)
151153
}
152154

155+
latestOffsetSeqNo := s.offsetLatestSeqNoInit.InitializeLatestSeqNo(currentSeqNo)
156+
153157
offsets.Store(vbID, &models.Offset{
154158
SnapshotMarker: &models.SnapshotMarker{
155159
StartSeqNo: currentSeqNo,
156160
EndSeqNo: currentSeqNo,
157161
},
158-
VbUUID: failOverLogs[0].VbUUID,
159-
SeqNo: currentSeqNo,
162+
VbUUID: failOverLogs[0].VbUUID,
163+
SeqNo: currentSeqNo,
164+
LatestSeqNo: latestOffsetSeqNo,
160165
})
161166

162167
return true
@@ -176,13 +181,16 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset]
176181
panic(err)
177182
}
178183

184+
latestOffsetSeqNo := s.offsetLatestSeqNoInit.InitializeLatestSeqNo(latestSeqNo)
185+
179186
offsets.Store(vbID, &models.Offset{
180187
SnapshotMarker: &models.SnapshotMarker{
181188
StartSeqNo: doc.Checkpoint.Snapshot.StartSeqNo,
182189
EndSeqNo: doc.Checkpoint.Snapshot.EndSeqNo,
183190
},
184-
VbUUID: gocbcore.VbUUID(doc.Checkpoint.VbUUID),
185-
SeqNo: doc.Checkpoint.SeqNo,
191+
VbUUID: gocbcore.VbUUID(doc.Checkpoint.VbUUID),
192+
SeqNo: doc.Checkpoint.SeqNo,
193+
LatestSeqNo: latestOffsetSeqNo,
186194
})
187195

188196
return true
@@ -242,16 +250,18 @@ func NewCheckpoint(
242250
client couchbase.Client,
243251
metadata metadata.Metadata,
244252
config *config.Dcp,
253+
offsetLatestSeqNoInit *offset.OffsetLatestSeqNoInit,
245254
) Checkpoint {
246255
return &checkpoint{
247-
client: client,
248-
stream: stream,
249-
vbIds: vbIds,
250-
bucketUUID: getBucketUUID(client),
251-
metadata: metadata,
252-
config: config,
253-
saveLock: &sync.Mutex{},
254-
loadLock: &sync.Mutex{},
255-
metric: &CheckpointMetric{},
256+
client: client,
257+
stream: stream,
258+
vbIds: vbIds,
259+
bucketUUID: getBucketUUID(client),
260+
metadata: metadata,
261+
config: config,
262+
saveLock: &sync.Mutex{},
263+
loadLock: &sync.Mutex{},
264+
metric: &CheckpointMetric{},
265+
offsetLatestSeqNoInit: offsetLatestSeqNoInit,
256266
}
257267
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package offset
2+
3+
import (
4+
"github.com/Trendyol/go-dcp/config"
5+
"github.com/Trendyol/go-dcp/helpers"
6+
)
7+
8+
type OffsetLatestSeqNoInit struct {
9+
config *config.Dcp
10+
}
11+
12+
func NewOffsetLatestSeqNoInit(
13+
config *config.Dcp,
14+
) *OffsetLatestSeqNoInit {
15+
return &OffsetLatestSeqNoInit{
16+
config,
17+
}
18+
}
19+
20+
func (l *OffsetLatestSeqNoInit) InitializeLatestSeqNo(vBucketSeqNo uint64) uint64 {
21+
var latestSeqNo uint64
22+
if l.config.IsDcpModeFinite() {
23+
latestSeqNo = vBucketSeqNo
24+
} else {
25+
latestSeqNo = helpers.MaxIntValue
26+
}
27+
return latestSeqNo
28+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package offset
2+
3+
import (
4+
"testing"
5+
6+
"github.com/Trendyol/go-dcp/config"
7+
"github.com/Trendyol/go-dcp/helpers"
8+
)
9+
10+
func TestOffsetLatestSeqNoInit_InitializeLatestSeqNo(t *testing.T) {
11+
t.Run("it should return max integer value when dcp mode infinite", func(t *testing.T) {
12+
// Arrange
13+
c := &config.Dcp{
14+
Dcp: config.ExternalDcp{
15+
Mode: config.DcpModeInfinite,
16+
},
17+
}
18+
initializer := NewOffsetLatestSeqNoInit(c)
19+
var vBucketLatestSeqNo uint64 = 14
20+
expectedOffsetLatestSeqNo := helpers.MaxIntValue
21+
22+
// Act
23+
actualOffsetLatestSeqNo := initializer.InitializeLatestSeqNo(vBucketLatestSeqNo)
24+
25+
// Assert
26+
if expectedOffsetLatestSeqNo != actualOffsetLatestSeqNo {
27+
t.Errorf("offsetLatestSeqNo is not set to expected value. got %v want %v", actualOffsetLatestSeqNo, expectedOffsetLatestSeqNo)
28+
}
29+
})
30+
31+
t.Run("it should return max integer when dcp mode empty", func(t *testing.T) {
32+
// Arrange
33+
c := &config.Dcp{
34+
Dcp: config.ExternalDcp{
35+
Mode: "",
36+
},
37+
}
38+
initializer := NewOffsetLatestSeqNoInit(c)
39+
var vBucketLatestSeqNo uint64 = 14
40+
expectedOffsetLatestSeqNo := helpers.MaxIntValue
41+
42+
// Act
43+
actualOffsetLatestSeqNo := initializer.InitializeLatestSeqNo(vBucketLatestSeqNo)
44+
45+
// Assert
46+
if expectedOffsetLatestSeqNo != actualOffsetLatestSeqNo {
47+
t.Errorf("offsetLatestSeqNo is not set to expected value. got %v want %v", actualOffsetLatestSeqNo, expectedOffsetLatestSeqNo)
48+
}
49+
})
50+
51+
t.Run("it should return vBucket latestSeqNo when dcp mode finite", func(t *testing.T) {
52+
// Arrange
53+
c := &config.Dcp{
54+
Dcp: config.ExternalDcp{
55+
Mode: config.DcpModeFinite,
56+
},
57+
}
58+
initializer := NewOffsetLatestSeqNoInit(c)
59+
var vBucketLatestSeqNo uint64 = 14
60+
61+
// Act
62+
actualOffsetLatestSeqNo := initializer.InitializeLatestSeqNo(vBucketLatestSeqNo)
63+
64+
// Assert
65+
if vBucketLatestSeqNo != actualOffsetLatestSeqNo {
66+
t.Errorf("offsetLatestSeqNo is not set to expected value. got %v want %v", actualOffsetLatestSeqNo, vBucketLatestSeqNo)
67+
}
68+
})
69+
}

stream/stream.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"sync/atomic"
88
"time"
99

10+
"github.com/Trendyol/go-dcp/stream/offset"
11+
1012
"github.com/Trendyol/go-dcp/tracing"
1113

1214
"github.com/Trendyol/go-dcp/membership"
@@ -243,7 +245,9 @@ func (s *stream) Open() {
243245

244246
s.activeStreams.Swap(int32(len(vbIDs)))
245247

246-
s.checkpoint = NewCheckpoint(s, vbIDs, s.client, s.metadata, s.config)
248+
latestSeqNoInitializer := offset.NewOffsetLatestSeqNoInit(s.config)
249+
250+
s.checkpoint = NewCheckpoint(s, vbIDs, s.client, s.metadata, s.config, latestSeqNoInitializer)
247251
s.offsets, s.dirtyOffsets, s.anyDirtyOffset = s.checkpoint.Load()
248252

249253
s.observers = wrapper.CreateConcurrentSwissMap[uint16, couchbase.Observer](1024)

0 commit comments

Comments
 (0)