Skip to content

Commit a2c8c59

Browse files
erayarslanziollek
andauthored
feat: implement feature allowing tracking progress of events consumption #124 (#125)
* Implement feature allowing tracking progress of events consuption #124 Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl> * fix typo, remove debug print --------- Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl> Co-authored-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
1 parent 44d5bd5 commit a2c8c59

File tree

3 files changed

+50
-13
lines changed

3 files changed

+50
-13
lines changed

dcp.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type dcp struct {
6666
version *couchbase.Version
6767
bucketInfo *couchbase.BucketInfo
6868
healthCheck couchbase.HealthCheck
69-
listener models.Listener
69+
consumer models.Consumer
7070
readyCh chan struct{}
7171
cancelCh chan os.Signal
7272
stopCh chan struct{}
@@ -125,7 +125,7 @@ func (s *dcp) Start() {
125125

126126
s.stream = stream.NewStream(
127127
s.client, s.metadata, s.config, s.version, s.bucketInfo, s.vBucketDiscovery,
128-
s.listener, collectionIDs, s.stopCh, s.bus, s.eventHandler,
128+
s.consumer, collectionIDs, s.stopCh, s.bus, s.eventHandler,
129129
tc,
130130
)
131131

@@ -245,7 +245,7 @@ func (s *dcp) GetVersion() *couchbase.Version {
245245
return s.version
246246
}
247247

248-
func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) {
248+
func newDcp(config *config.Dcp, consumer models.Consumer) (Dcp, error) {
249249
config.ApplyDefaults()
250250
copyOfConfig := config
251251
printConfiguration(*copyOfConfig)
@@ -292,7 +292,7 @@ func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) {
292292

293293
return &dcp{
294294
client: client,
295-
listener: listener,
295+
consumer: consumer,
296296
config: config,
297297
version: version,
298298
bucketInfo: bucketInfo,
@@ -306,29 +306,60 @@ func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) {
306306
}, nil
307307
}
308308

309+
type simplifiedConsumer struct {
310+
listener models.Listener
311+
}
312+
313+
func NewSimpleConsumer(listener models.Listener) models.Consumer {
314+
return &simplifiedConsumer{listener: listener}
315+
}
316+
317+
func (s *simplifiedConsumer) ConsumeEvent(ctx *models.ListenerContext) {
318+
s.listener(ctx)
319+
}
320+
321+
func (s *simplifiedConsumer) TrackOffset(vbID uint16, offset *models.Offset) {}
322+
323+
// NewExtendedDcp creates a new Dcp client
324+
//
325+
// config: path to a configuration file or a configuration struct
326+
// consumer must implement models.Consumer interface containing both ConsumeEvent and TrackOffset methods
327+
func NewExtendedDcp(cfg any, consumer models.Consumer) (Dcp, error) {
328+
switch v := cfg.(type) {
329+
case *config.Dcp:
330+
return newDcp(v, consumer)
331+
case config.Dcp:
332+
return newDcp(&v, consumer)
333+
case string:
334+
return newDcpWithPath(v, consumer)
335+
default:
336+
return nil, errors.New("invalid config")
337+
}
338+
}
339+
309340
// NewDcp creates a new Dcp client
310341
//
311342
// config: path to a configuration file or a configuration struct
312343
// listener is a callback function that will be called when a mutation, deletion or expiration event occurs
313344
func NewDcp(cfg any, listener models.Listener) (Dcp, error) {
314345
switch v := cfg.(type) {
315346
case *config.Dcp:
316-
return newDcp(v, listener)
347+
return newDcp(v, NewSimpleConsumer(listener))
317348
case config.Dcp:
318-
return newDcp(&v, listener)
349+
return newDcp(&v, NewSimpleConsumer(listener))
319350
case string:
320-
return newDcpWithPath(v, listener)
351+
return newDcpWithPath(v, NewSimpleConsumer(listener))
321352
default:
322353
return nil, errors.New("invalid config")
323354
}
324355
}
325356

326-
func newDcpWithPath(path string, listener models.Listener) (Dcp, error) {
357+
func newDcpWithPath(path string, consumer models.Consumer) (Dcp, error) {
327358
c, err := newDcpConfig(path)
328359
if err != nil {
329360
return nil, err
330361
}
331-
return newDcp(&c, listener)
362+
return newDcp(&c, consumer)
332363
}
333364

334365
func newDcpConfig(path string) (config.Dcp, error) {

models/listeners.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,8 @@ type (
2626
ListenerCh chan ListenerArgs
2727
ListenerEndCh chan DcpStreamEndContext
2828
)
29+
30+
type Consumer interface {
31+
ConsumeEvent(ctx *ListenerContext)
32+
TrackOffset(vbID uint16, offset *Offset)
33+
}

stream/stream.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type stream struct {
6464
vbIDRange *models.VbIDRange
6565
dirtyOffsets *wrapper.ConcurrentSwissMap[uint16, bool]
6666
stopCh chan struct{}
67-
listener models.Listener
67+
consumer models.Consumer
6868
bucketInfo *couchbase.BucketInfo
6969
finishStreamWithEndEventCh chan struct{}
7070
finishStreamWithCloseCh chan struct{}
@@ -94,6 +94,7 @@ func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
9494
return
9595
}
9696
s.offsets.Store(vbID, offset)
97+
s.consumer.TrackOffset(vbID, offset)
9798
if !dirty {
9899
return
99100
}
@@ -136,7 +137,7 @@ func (s *stream) waitAndForward(
136137

137138
start := time.Now()
138139

139-
s.listener(ctx)
140+
s.consumer.ConsumeEvent(ctx)
140141

141142
s.metric.ProcessLatency = time.Since(start).Milliseconds()
142143
}
@@ -466,7 +467,7 @@ func NewStream(client couchbase.Client,
466467
version *couchbase.Version,
467468
bucketInfo *couchbase.BucketInfo,
468469
vBucketDiscovery VBucketDiscovery,
469-
listener models.Listener,
470+
consumer models.Consumer,
470471
collectionIDs map[uint32]string,
471472
stopCh chan struct{},
472473
bus EventBus.Bus,
@@ -476,7 +477,7 @@ func NewStream(client couchbase.Client,
476477
stream := &stream{
477478
client: client,
478479
metadata: metadata,
479-
listener: listener,
480+
consumer: consumer,
480481
config: config,
481482
bucketInfo: bucketInfo,
482483
vBucketDiscovery: vBucketDiscovery,

0 commit comments

Comments
 (0)