@@ -17,8 +17,6 @@ package eventbus
1717import (
1818 // standard libraries.
1919 "context"
20- "encoding/base64"
21- "encoding/binary"
2220 stderrors "errors"
2321 "io"
2422 "strings"
@@ -37,7 +35,7 @@ import (
3735 "github.com/linkall-labs/vanus/client/pkg/errors"
3836 "github.com/linkall-labs/vanus/client/pkg/eventlog"
3937 "github.com/linkall-labs/vanus/client/pkg/policy"
40- vlog "github.com/linkall-labs/vanus/observability/log"
38+ "github.com/linkall-labs/vanus/observability/log"
4139
4240 eb "github.com/linkall-labs/vanus/client/internal/vanus/eventbus"
4341 el "github.com/linkall-labs/vanus/client/internal/vanus/eventlog"
@@ -66,7 +64,7 @@ func NewEventbus(cfg *eb.Config) *eventbus {
6664 for {
6765 re , ok := <- ch
6866 if ! ok {
69- vlog .Debug (context .Background (), "eventbus quits writable watcher" , map [string ]interface {}{
67+ log .Debug (context .Background (), "eventbus quits writable watcher" , map [string ]interface {}{
7068 "eventbus" : bus .cfg .Name ,
7169 })
7270 break
@@ -88,7 +86,7 @@ func NewEventbus(cfg *eb.Config) *eventbus {
8886 for {
8987 re , ok := <- ch
9088 if ! ok {
91- vlog .Debug (context .Background (), "eventbus quits readable watcher" , map [string ]interface {}{
89+ log .Debug (context .Background (), "eventbus quits readable watcher" , map [string ]interface {}{
9290 "eventbus" : bus .cfg .Name ,
9391 })
9492 break
@@ -187,16 +185,16 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti
187185 if len (b .readableLogs ) == 0 {
188186 b .refreshReadableLogs (ctx )
189187 }
190- if log , ok := b .readableLogs [logID ]; ok {
191- return log , nil
188+ if l , ok := b .readableLogs [logID ]; ok {
189+ return l , nil
192190 }
193191 return nil , errors .ErrNotFound
194192 } else if op .Policy .AccessMode () == api .ReadWrite {
195193 if len (b .writableLogs ) == 0 {
196194 b .refreshWritableLogs (ctx )
197195 }
198- if log , ok := b .writableLogs [logID ]; ok {
199- return log , nil
196+ if l , ok := b .writableLogs [logID ]; ok {
197+ return l , nil
200198 }
201199 return nil , errors .ErrNotFound
202200 } else {
@@ -312,8 +310,8 @@ func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResul
312310 Endpoints : b .cfg .Endpoints ,
313311 ID : logID ,
314312 }
315- log := eventlog .NewEventLog (cfg )
316- lws [logID ] = log
313+ l := eventlog .NewEventLog (cfg )
314+ lws [logID ] = l
317315 return true
318316 })
319317 b .setWritableLogs (s , lws )
@@ -407,8 +405,8 @@ func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResul
407405 Endpoints : b .cfg .Endpoints ,
408406 ID : logID ,
409407 }
410- log := eventlog .NewEventLog (cfg )
411- lws [logID ] = log
408+ l := eventlog .NewEventLog (cfg )
409+ lws [logID ] = l
412410 return true
413411 })
414412 b .setReadableLogs (s , lws )
@@ -451,7 +449,7 @@ type busWriter struct {
451449
452450var _ api.BusWriter = (* busWriter )(nil )
453451
454- func (w * busWriter ) AppendOne (ctx context.Context , event * ce.Event , opts ... api.WriteOption ) (eid string , err error ) {
452+ func (w * busWriter ) AppendOne (ctx context.Context , event * ce.Event , opts ... api.WriteOption ) (string , error ) {
455453 _ctx , span := w .tracer .Start (ctx , "AppendOne" )
456454 defer span .End ()
457455
@@ -470,21 +468,15 @@ func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.
470468 }
471469
472470 // 2. append the event to the eventlog
473- off , err := lw .Append (_ctx , event )
471+ eid , err := lw .Append (_ctx , event )
474472 if err != nil {
475473 return "" , err
476474 }
477475
478- // 3. generate event ID
479- var buf [16 ]byte
480- binary .BigEndian .PutUint64 (buf [0 :8 ], lw .Log ().ID ())
481- binary .BigEndian .PutUint64 (buf [8 :16 ], uint64 (off ))
482- encoded := base64 .StdEncoding .EncodeToString (buf [:])
483-
484- return encoded , nil
476+ return eid , nil
485477}
486478
487- func (w * busWriter ) AppendMany (ctx context.Context , events []* ce.Event , opts ... api.WriteOption ) (eid string , err error ) {
479+ func (w * busWriter ) AppendMany (ctx context.Context , events []* ce.Event , opts ... api.WriteOption ) (string , error ) {
488480 // TODO(jiangkai): implement this method, by jiangkai, 2022.10.24
489481 return "" , nil
490482}
@@ -497,17 +489,17 @@ func (w *busWriter) pickWritableLog(ctx context.Context, opts *api.WriteOptions)
497489 _ctx , span := w .tracer .Start (ctx , "pickWritableLog" )
498490 defer span .End ()
499491
500- log , err := opts .Policy .NextLog (ctx )
492+ l , err := opts .Policy .NextLog (ctx )
501493 if err != nil {
502494 return nil , err
503495 }
504496
505- l := w .ebus .getWritableLog (_ctx , log .ID ())
506- if l == nil {
497+ lw := w .ebus .getWritableLog (_ctx , l .ID ())
498+ if lw == nil {
507499 return nil , stderrors .New ("can not pick writable log" )
508500 }
509501
510- return l .Writer (), nil
502+ return lw .Writer (), nil
511503}
512504
513505type busReader struct {
@@ -558,11 +550,11 @@ func (r *busReader) pickReadableLog(ctx context.Context, opts *api.ReadOptions)
558550 _ctx , span := r .tracer .Start (ctx , "pickReadableLog" )
559551 defer span .End ()
560552
561- log , err := opts .Policy .NextLog (ctx )
553+ l , err := opts .Policy .NextLog (ctx )
562554 if err != nil {
563555 return nil , err
564556 }
565- lr := r .ebus .getReadableLog (_ctx , log .ID ())
557+ lr := r .ebus .getReadableLog (_ctx , l .ID ())
566558 if lr == nil {
567559 return nil , stderrors .New ("can not pick readable log" )
568560 }
0 commit comments