Skip to content

Commit 9351af1

Browse files
committed
feat: fix review comments
Signed-off-by: jyjiangkai <[email protected]>
1 parent 074e7f9 commit 9351af1

File tree

10 files changed

+62
-30
lines changed

10 files changed

+62
-30
lines changed

client/pkg/api/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Eventbus interface {
3232

3333
type BusWriter interface {
3434
AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (string, error)
35-
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (string, error)
35+
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) ([]string, error)
3636
}
3737

3838
type BusReader interface {

client/pkg/api/mock_client.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/pkg/eventbus/eventbus.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti
185185
if len(b.readableLogs) == 0 {
186186
b.refreshReadableLogs(ctx)
187187
}
188+
b.readableMu.RLock()
189+
defer b.readableMu.RUnlock()
188190
if l, ok := b.readableLogs[logID]; ok {
189191
return l, nil
190192
}
@@ -193,6 +195,8 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti
193195
if len(b.writableLogs) == 0 {
194196
b.refreshWritableLogs(ctx)
195197
}
198+
b.writableMu.RLock()
199+
defer b.writableMu.RUnlock()
196200
if l, ok := b.writableLogs[logID]; ok {
197201
return l, nil
198202
}
@@ -217,6 +221,8 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev
217221
b.refreshReadableLogs(ctx)
218222
}
219223
eventlogs := make([]api.Eventlog, 0)
224+
b.readableMu.RLock()
225+
defer b.readableMu.RUnlock()
220226
for _, el := range b.readableLogs {
221227
eventlogs = append(eventlogs, el)
222228
}
@@ -226,6 +232,8 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev
226232
b.refreshWritableLogs(ctx)
227233
}
228234
eventlogs := make([]api.Eventlog, 0)
235+
b.writableMu.RLock()
236+
defer b.writableMu.RUnlock()
229237
for _, el := range b.writableLogs {
230238
eventlogs = append(eventlogs, el)
231239
}
@@ -476,9 +484,9 @@ func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.
476484
return eid, nil
477485
}
478486

479-
func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...api.WriteOption) (string, error) {
487+
func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...api.WriteOption) ([]string, error) {
480488
// TODO(jiangkai): implement this method, by jiangkai, 2022.10.24
481-
return "", nil
489+
return nil, nil
482490
}
483491

484492
func (w *busWriter) Bus() api.Eventbus {

client/pkg/eventlog/eventlog_impl.go

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/binary"
2222
stderr "errors"
2323
"io"
24+
"sort"
2425
"sync"
2526
"time"
2627

@@ -110,11 +111,13 @@ type eventlog struct {
110111
writableSegments map[uint64]*segment
111112
writableMu sync.RWMutex
112113
logWriter *logWriter
114+
writerMu sync.RWMutex
113115

114116
readableWatcher *ReadableSegmentsWatcher
115117
readableSegments map[uint64]*segment
116118
readableMu sync.RWMutex
117119
logReader *logReader
120+
readerMu sync.RWMutex
118121
tracer *tracing.Tracer
119122
}
120123

@@ -128,6 +131,8 @@ func (l *eventlog) ID() uint64 {
128131
func (l *eventlog) Close(ctx context.Context) {
129132
l.writableWatcher.Close()
130133
l.readableWatcher.Close()
134+
l.logWriter = nil
135+
l.logReader = nil
131136

132137
for _, segment := range l.writableSegments {
133138
segment.Close(ctx)
@@ -138,6 +143,11 @@ func (l *eventlog) Close(ctx context.Context) {
138143
}
139144

140145
func (l *eventlog) Writer() LogWriter {
146+
if l.logWriter != nil {
147+
return l.logWriter
148+
}
149+
l.writerMu.Lock()
150+
defer l.writerMu.Unlock()
141151
if l.logWriter != nil {
142152
return l.logWriter
143153
}
@@ -151,6 +161,11 @@ func (l *eventlog) Reader(cfg ReaderConfig) LogReader {
151161
if l.logReader != nil {
152162
return l.logReader
153163
}
164+
l.readerMu.Lock()
165+
defer l.readerMu.Unlock()
166+
if l.logWriter != nil {
167+
return l.logReader
168+
}
154169
l.logReader = &logReader{
155170
elog: l,
156171
pos: 0,
@@ -262,6 +277,8 @@ func (l *eventlog) selectWritableSegment(ctx context.Context) (*segment, error)
262277
}
263278

264279
func (l *eventlog) nextWritableSegment(ctx context.Context, seg *segment) (*segment, error) {
280+
l.writableMu.RLock()
281+
defer l.writableMu.RUnlock()
265282
if s, ok := l.writableSegments[seg.nextSegmentId]; ok {
266283
return s, nil
267284
}
@@ -345,15 +362,14 @@ func (l *eventlog) selectReadableSegment(ctx context.Context, offset int64) (*se
345362
return nil, errors.ErrUnderflow
346363
}
347364

348-
for {
349-
if offset >= target.EndOffset() {
350-
target = l.writableSegments[target.nextSegmentId]
351-
} else {
352-
// got target segment
353-
break
354-
}
365+
segmentNum := len(l.readableSegments)
366+
n := sort.Search(segmentNum, func(i int) bool {
367+
return l.readableSegments[uint64(i)].EndOffset() > offset
368+
})
369+
if n < segmentNum {
370+
return l.readableSegments[uint64(n)], nil
355371
}
356-
return target, nil
372+
return nil, errors.ErrNotReadable
357373
}
358374

359375
func (l *eventlog) fetchReadableSegments(ctx context.Context) map[uint64]*segment {
@@ -395,9 +411,9 @@ func (w *logWriter) Append(ctx context.Context, event *ce.Event) (string, error)
395411
// TODO: async for throughput
396412
retryTimes := defaultRetryTimes
397413
for i := 1; i <= retryTimes; i++ {
398-
offset, err := w.doAppend(ctx, event)
414+
eid, err := w.doAppend(ctx, event)
399415
if err == nil {
400-
return w.generateEventID(offset), nil
416+
return eid, nil
401417
}
402418

403419
switch err {
@@ -422,28 +438,28 @@ func (w *logWriter) Append(ctx context.Context, event *ce.Event) (string, error)
422438
return "", errors.ErrUnknown
423439
}
424440

425-
func (w *logWriter) doAppend(ctx context.Context, event *ce.Event) (int64, error) {
441+
func (w *logWriter) doAppend(ctx context.Context, event *ce.Event) (string, error) {
426442
segment, err := w.selectWritableSegment(ctx)
427443
if err != nil {
428-
return -1, err
444+
return "", err
429445
}
430446
offset, err := segment.Append(ctx, event)
431447
if err != nil {
432448
switch err {
433449
case errors.ErrNotWritable, errors.ErrNotEnoughSpace, errors.ErrNoSpace:
434450
segment.SetNotWritable()
435451
}
436-
return -1, err
452+
return "", err
437453
}
438-
return offset, nil
454+
return w.generateEventID(segment, offset), nil
439455
}
440456

441-
func (w *logWriter) generateEventID(offset int64) string {
457+
func (w *logWriter) generateEventID(s *segment, offset int64) string {
442458
var buf [32]byte
443-
binary.BigEndian.PutUint64(buf[0:8], w.cur.id)
459+
binary.BigEndian.PutUint64(buf[0:8], s.id)
444460
binary.BigEndian.PutUint64(buf[8:16], uint64(offset))
445461
binary.BigEndian.PutUint64(buf[16:24], w.elog.ID())
446-
binary.BigEndian.PutUint64(buf[24:32], uint64(offset+w.cur.startOffset))
462+
binary.BigEndian.PutUint64(buf[24:32], uint64(offset+s.startOffset))
447463
return base64.StdEncoding.EncodeToString(buf[:])
448464
}
449465

internal/controller/eventbus/controller.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -411,17 +411,12 @@ func (ctrl *controller) processHeartbeat(ctx context.Context, req *ctrlpb.Segmen
411411
ID: block.SegmentID,
412412
Capacity: info.Capacity,
413413
EventLogID: block.EventlogID,
414+
State: eventlog.SegmentState(info.State),
414415
Size: info.Size,
415416
Number: info.EventNumber,
416417
FirstEventBornTime: time.UnixMilli(info.FirstEventBornTime),
417418
LastEventBornTime: time.UnixMilli(info.LastEventBornTime),
418419
}
419-
// block state transfer to segment state
420-
if info.State == "archiving" {
421-
seg.State = eventlog.StateFreezing
422-
} else if info.State == "archived" {
423-
seg.State = eventlog.StateFrozen
424-
}
425420
logArr = append(logArr, seg)
426421
segments[block.EventlogID.Key()] = logArr
427422
}

internal/store/block/raw.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ const (
6363
StateArchived = State("archived")
6464
)
6565

66+
func (s State) ToSegmentState() string {
67+
if s == StateArchiving {
68+
return "freezing"
69+
} else if s == StateArchived {
70+
return "frozen"
71+
} else {
72+
return string(s)
73+
}
74+
}
75+
6676
type Statistics struct {
6777
ID vanus.ID
6878
Capacity uint64

internal/store/segment/replica.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (r *replica) Status() *metapb.SegmentHealthInfo {
9494
Capacity: int64(stat.Capacity),
9595
Size: int64(stat.EntrySize),
9696
EventNumber: int32(stat.EntryNum),
97-
State: string(stat.State),
97+
State: stat.State.ToSegmentState(),
9898
Leader: cs.Leader.Uint64(),
9999
Term: cs.Term,
100100
FirstEventBornTime: stat.FirstEntryStime,

internal/store/segment/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,7 @@ func (s *server) onBlockArchived(stat block.Statistics) {
716716
Capacity: int64(stat.Capacity),
717717
Size: int64(stat.EntrySize),
718718
EventNumber: int32(stat.EntryNum),
719-
State: string(stat.State),
719+
State: stat.State.ToSegmentState(),
720720
FirstEventBornTime: stat.FirstEntryStime,
721721
}
722722
if stat.State == block.StateArchived {

internal/store/vsb/block_append_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func TestVSBlock_Append(t *testing.T) {
107107
So(full, ShouldBeFalse)
108108

109109
stat := b.status()
110+
So(stat.State, ShouldEqual, block.StateWorking)
110111
So(stat.EntryNum, ShouldEqual, 0)
111112
So(stat.EntrySize, ShouldEqual, 0)
112113

internal/store/vsb/block_open_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
. "github.com/smartystreets/goconvey/convey"
2626

2727
// this project.
28+
"github.com/linkall-labs/vanus/internal/store/block"
2829
idxtest "github.com/linkall-labs/vanus/internal/store/vsb/index/testing"
2930
vsbtest "github.com/linkall-labs/vanus/internal/store/vsb/testing"
3031
)
@@ -66,6 +67,7 @@ func TestVSBlock_Open(t *testing.T) {
6667
stat := b.status()
6768
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
6869
So(stat.EntryNum, ShouldEqual, 2)
70+
So(stat.State, ShouldEqual, block.StateWorking)
6971
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
7072

7173
So(b.indexes, ShouldHaveLength, 2)

0 commit comments

Comments
 (0)