Skip to content

Commit 7da367d

Browse files
feat: support lookup offset by timestamp (#309)
* feat: support lookup offset by time * update * update * Update internal/store/segment/server.go Co-authored-by: James Yin <[email protected]> * update due to review * add todo Co-authored-by: James Yin <[email protected]>
1 parent 5fdb2bb commit 7da367d

File tree

18 files changed

+506
-316
lines changed

18 files changed

+506
-316
lines changed

client/internal/vanus/eventlog/name_service.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
// standard libraries.
1919
"context"
2020
"math"
21+
"time"
2122

2223
// third-party libraries.
2324
"go.opentelemetry.io/otel/trace"
@@ -26,7 +27,7 @@ import (
2627
// first-party libraries.
2728
"github.com/linkall-labs/vanus/observability/tracing"
2829
"github.com/linkall-labs/vanus/pkg/controller"
29-
ctlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
30+
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
3031
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
3132

3233
// this project.
@@ -42,15 +43,15 @@ func NewNameService(endpoints []string) *NameService {
4243
}
4344

4445
type NameService struct {
45-
client ctlpb.EventLogControllerClient
46+
client ctrlpb.EventLogControllerClient
4647
tracer *tracing.Tracer
4748
}
4849

4950
func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64) (*record.Segment, error) {
5051
ctx, span := ns.tracer.Start(ctx, "LookupWritableSegment")
5152
defer span.End()
5253

53-
req := &ctlpb.GetAppendableSegmentRequest{
54+
req := &ctrlpb.GetAppendableSegmentRequest{
5455
EventLogId: logID,
5556
Limited: 1,
5657
}
@@ -71,7 +72,7 @@ func (ns *NameService) LookupReadableSegments(ctx context.Context, logID uint64)
7172
ctx, span := ns.tracer.Start(ctx, "LookupReadableSegments")
7273
defer span.End()
7374

74-
req := &ctlpb.ListSegmentRequest{
75+
req := &ctrlpb.ListSegmentRequest{
7576
EventLogId: logID,
7677
StartOffset: 0,
7778
EndOffset: math.MaxInt64,
@@ -87,13 +88,13 @@ func (ns *NameService) LookupReadableSegments(ctx context.Context, logID uint64)
8788
return segments, nil
8889
}
8990

90-
func toSegments(segmentpbs []*metapb.Segment) []*record.Segment {
91-
if len(segmentpbs) == 0 {
91+
func toSegments(pbs []*metapb.Segment) []*record.Segment {
92+
if len(pbs) == 0 {
9293
return make([]*record.Segment, 0)
9394
}
94-
segments := make([]*record.Segment, 0, len(segmentpbs))
95-
for _, segmentpb := range segmentpbs {
96-
segment := toSegment(segmentpb)
95+
segments := make([]*record.Segment, 0, len(pbs))
96+
for _, pb := range pbs {
97+
segment := toSegment(pb)
9798
segments = append(segments, segment)
9899
// only return first working segment
99100
if segment.Writable {
@@ -103,23 +104,22 @@ func toSegments(segmentpbs []*metapb.Segment) []*record.Segment {
103104
return segments
104105
}
105106

106-
func toSegment(segmentpb *metapb.Segment) *record.Segment {
107-
blocks := make(map[uint64]*record.Block, len(segmentpb.Replicas))
108-
for blockID, blockpb := range segmentpb.Replicas {
107+
func toSegment(segment *metapb.Segment) *record.Segment {
108+
blocks := make(map[uint64]*record.Block, len(segment.Replicas))
109+
for blockID, block := range segment.Replicas {
109110
blocks[blockID] = &record.Block{
110-
ID: blockpb.Id,
111-
Endpoint: blockpb.Endpoint,
111+
ID: block.Id,
112+
Endpoint: block.Endpoint,
112113
}
113114
}
114-
segment := &record.Segment{
115-
ID: segmentpb.GetId(),
116-
StartOffset: segmentpb.GetStartOffsetInLog(),
117-
// TODO align to server side
118-
EndOffset: segmentpb.GetEndOffsetInLog() + 1,
119-
// TODO: writable
120-
Writable: segmentpb.State == "working",
121-
Blocks: blocks,
122-
LeaderBlockID: segmentpb.GetLeaderBlockId(),
115+
return &record.Segment{
116+
ID: segment.GetId(),
117+
StartOffset: segment.GetStartOffsetInLog(),
118+
EndOffset: segment.GetEndOffsetInLog(),
119+
FirstEventBornAt: time.UnixMilli(segment.FirstEventBornAtByUnixMs),
120+
LastEventBornAt: time.UnixMilli(segment.LastEvnetBornAtByUnixMs),
121+
Writable: segment.State == "working", // TODO: writable
122+
Blocks: blocks,
123+
LeaderBlockID: segment.GetLeaderBlockId(),
123124
}
124-
return segment
125125
}

client/internal/vanus/store/block_store.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
// standard libraries
1919
"context"
2020
"strings"
21+
"time"
2122

2223
"github.com/linkall-labs/vanus/observability/tracing"
2324
"go.opentelemetry.io/otel/trace"
@@ -155,3 +156,24 @@ func (s *BlockStore) Read(
155156

156157
return []*ce.Event{}, err
157158
}
159+
160+
func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Time) (int64, error) {
161+
ctx, span := s.tracer.Start(ctx, "LookupOffset")
162+
defer span.End()
163+
164+
req := &segpb.LookupOffsetInBlockRequest{
165+
BlockId: blockID,
166+
Stime: t.UnixMilli(),
167+
}
168+
169+
client, err := s.client.Get(ctx)
170+
if err != nil {
171+
return -1, err
172+
}
173+
174+
res, err := client.(segpb.SegmentServerClient).LookupOffsetInBlock(ctx, req)
175+
if err != nil {
176+
return -1, err
177+
}
178+
return res.Offset, nil
179+
}

client/pkg/eventlog/eventlog_impl.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,40 @@ func (l *eventlog) Length(ctx context.Context) (int64, error) {
174174
// TODO(kai.jiangkai)
175175
return 0, nil
176176
}
177+
177178
func (l *eventlog) QueryOffsetByTime(ctx context.Context, timestamp int64) (int64, error) {
178-
// TODO(james.yin): lookup offset by timestamp
179-
return 0, nil
179+
t := time.UnixMilli(timestamp)
180+
// get all segments
181+
var target *segment
182+
segs := l.fetchReadableSegments(ctx)
183+
184+
if len(segs) == 0 {
185+
return -1, nil
186+
}
187+
188+
if segs[0].firstEventBornAt.After(t) {
189+
return segs[0].startOffset, nil
190+
}
191+
192+
if segs[len(segs)-1].lastEventBornAt.Before(t) {
193+
// the target offset maybe in newer segment, refresh immediately
194+
l.refreshReadableSegments(ctx)
195+
segs = l.fetchReadableSegments(ctx)
196+
}
197+
198+
for idx := range segs {
199+
s := segs[idx]
200+
if !t.Before(s.firstEventBornAt) && !t.After(s.lastEventBornAt) {
201+
target = s
202+
break
203+
}
204+
}
205+
206+
if target == nil {
207+
target = segs[len(segs)-1]
208+
}
209+
210+
return target.LookupOffset(ctx, t)
180211
}
181212

182213
func (l *eventlog) updateWritableSegment(ctx context.Context, r *record.Segment) {
@@ -286,12 +317,10 @@ func (l *eventlog) fetchReadableSegments(ctx context.Context) []*segment {
286317
defer l.readableMu.RUnlock()
287318

288319
if len(l.readableSegments) == 0 {
320+
l.readableMu.RUnlock()
289321
// refresh
290-
func() {
291-
l.readableMu.RUnlock()
292-
defer l.readableMu.RLock()
293-
l.refreshReadableSegments(ctx)
294-
}()
322+
l.refreshReadableSegments(ctx)
323+
l.readableMu.RLock()
295324
}
296325

297326
return l.readableSegments

client/pkg/eventlog/log_segment.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"encoding/binary"
2121
"math"
2222
"sync"
23+
"time"
2324

2425
"github.com/linkall-labs/vanus/observability/tracing"
2526
"go.opentelemetry.io/otel/trace"
@@ -44,12 +45,14 @@ func newSegment(ctx context.Context, r *record.Segment, towrite bool) (*segment,
4445
}
4546

4647
segment := &segment{
47-
id: r.ID,
48-
startOffset: r.StartOffset,
49-
endOffset: atomic.Int64{},
50-
writable: atomic.Bool{},
51-
prefer: prefer,
52-
tracer: tracing.NewTracer("internal.eventlog.segment", trace.SpanKindClient),
48+
id: r.ID,
49+
startOffset: r.StartOffset,
50+
endOffset: atomic.Int64{},
51+
writable: atomic.Bool{},
52+
firstEventBornAt: r.FirstEventBornAt,
53+
lastEventBornAt: r.LastEventBornAt,
54+
prefer: prefer,
55+
tracer: tracing.NewTracer("internal.eventlog.segment", trace.SpanKindClient),
5356
}
5457

5558
if !r.Writable {
@@ -82,10 +85,12 @@ func newBlockExt(ctx context.Context, r *record.Segment, leaderOnly bool) (*bloc
8285
}
8386

8487
type segment struct {
85-
id uint64
86-
startOffset int64
87-
endOffset atomic.Int64
88-
writable atomic.Bool
88+
id uint64
89+
startOffset int64
90+
endOffset atomic.Int64
91+
writable atomic.Bool
92+
firstEventBornAt time.Time
93+
lastEventBornAt time.Time
8994

9095
prefer *block
9196
mu sync.RWMutex
@@ -117,7 +122,9 @@ func (s *segment) Close(ctx context.Context) {
117122
}
118123

119124
func (s *segment) Update(ctx context.Context, r *record.Segment, towrite bool) error {
120-
// When a segment become read-only, the end offset needs to be set to the readlly value.
125+
// When a segment become read-only, the end offset needs to be set to the real value.
126+
// TODO(wenfeng) data race?
127+
s.lastEventBornAt = r.LastEventBornAt
121128
if s.Writable() && !r.Writable && s.writable.CAS(true, false) {
122129
s.endOffset.Store(r.EndOffset)
123130
return nil
@@ -219,3 +226,7 @@ func (s *segment) setPreferSegmentBlock(prefer *block) {
219226
defer s.mu.Unlock()
220227
s.prefer = prefer
221228
}
229+
230+
func (s *segment) LookupOffset(ctx context.Context, t time.Time) (int64, error) {
231+
return s.preferSegmentBlock().LookupOffset(ctx, t)
232+
}

client/pkg/eventlog/segment_block.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package eventlog
1717
import (
1818
// third-party libraries
1919
"context"
20+
"time"
2021

2122
ce "github.com/cloudevents/sdk-go/v2"
2223

@@ -47,6 +48,10 @@ func (s *block) Close(ctx context.Context) {
4748
store.Put(ctx, s.store)
4849
}
4950

51+
func (s *block) LookupOffset(ctx context.Context, t time.Time) (int64, error) {
52+
return s.store.LookupOffset(ctx, s.id, t)
53+
}
54+
5055
func (s *block) Append(ctx context.Context, event *ce.Event) (int64, error) {
5156
return s.store.Append(ctx, s.id, event)
5257
}

client/pkg/primitive/watcher.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -79,26 +79,21 @@ func (w *Watcher) Run() {
7979
func (w *Watcher) Refresh(ctx context.Context) error {
8080
// batch multi-refresh into a group
8181

82-
wg := func() *sync.WaitGroup {
83-
w.mu.RLock()
84-
defer w.mu.RUnlock()
85-
return w.wg
86-
}()
82+
w.mu.RLock()
83+
wg := w.wg
84+
w.mu.RUnlock()
8785

8886
isLeader := false
8987
if wg == nil {
90-
wg = func() *sync.WaitGroup {
91-
w.mu.Lock()
92-
defer w.mu.Unlock()
93-
94-
if w.wg == nil { // double check
95-
w.wg = &sync.WaitGroup{}
96-
w.wg.Add(1)
97-
isLeader = true
98-
}
99-
100-
return w.wg
101-
}()
88+
w.mu.Lock()
89+
// double check
90+
if w.wg == nil {
91+
w.wg = &sync.WaitGroup{}
92+
w.wg.Add(1)
93+
isLeader = true
94+
}
95+
wg = w.wg
96+
w.mu.Unlock()
10297
}
10398

10499
if isLeader {
@@ -108,8 +103,8 @@ func (w *Watcher) Refresh(ctx context.Context) error {
108103

109104
ch := make(chan struct{})
110105
go func() {
111-
defer close(ch)
112106
wg.Wait()
107+
close(ch)
113108
}()
114109

115110
select {

client/pkg/record/record.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package record
1616

17+
import "time"
18+
1719
type Eventbus struct {
1820
Name string
1921
Logs []*Eventlog
@@ -81,10 +83,12 @@ func allReadable(ls []*Eventlog) bool {
8183
}
8284

8385
type Segment struct {
84-
ID uint64
85-
StartOffset int64
86-
EndOffset int64
87-
Writable bool
86+
ID uint64
87+
StartOffset int64
88+
EndOffset int64
89+
Writable bool
90+
FirstEventBornAt time.Time
91+
LastEventBornAt time.Time
8892

8993
Blocks map[uint64]*Block
9094
LeaderBlockID uint64

internal/controller/eventbus/eventlog/eventlog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (mgr *eventlogManager) UpdateSegment(ctx context.Context, m map[string][]Se
319319
})
320320
continue
321321
}
322-
// TODO(wenfeng.wang) Don't update state in isNeedUpdate
322+
// TODO(wenfeng.wang) Don't update state in isNeedUpdate, rename?
323323
if seg.isNeedUpdate(newSeg) {
324324
err := el.updateSegment(ctx, seg)
325325
if err != nil {

internal/controller/eventbus/eventlog/segment.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,19 @@ func Convert2ProtoSegment(ctx context.Context, ins ...*Segment) []*metapb.Segmen
149149
}
150150
}
151151
segs[idx] = &metapb.Segment{
152-
Id: seg.ID.Uint64(),
153-
PreviousSegmentId: seg.PreviousSegmentID.Uint64(),
154-
NextSegmentId: seg.NextSegmentID.Uint64(),
155-
EventLogId: seg.EventLogID.Uint64(),
156-
StartOffsetInLog: seg.StartOffsetInLog,
157-
EndOffsetInLog: seg.StartOffsetInLog + int64(seg.Number) - 1,
158-
Size: seg.Size,
159-
Capacity: seg.Capacity,
160-
NumberEventStored: seg.Number,
161-
Replicas: blocks,
162-
State: string(seg.State),
152+
Id: seg.ID.Uint64(),
153+
PreviousSegmentId: seg.PreviousSegmentID.Uint64(),
154+
NextSegmentId: seg.NextSegmentID.Uint64(),
155+
EventLogId: seg.EventLogID.Uint64(),
156+
StartOffsetInLog: seg.StartOffsetInLog,
157+
EndOffsetInLog: seg.StartOffsetInLog + int64(seg.Number),
158+
Size: seg.Size,
159+
Capacity: seg.Capacity,
160+
NumberEventStored: seg.Number,
161+
Replicas: blocks,
162+
State: string(seg.State),
163+
FirstEventBornAtByUnixMs: seg.FirstEventBornTime.UnixMilli(),
164+
LastEvnetBornAtByUnixMs: seg.LastEventBornTime.UnixMilli(),
163165
}
164166
if seg.GetLeaderBlock() != nil {
165167
segs[idx].LeaderBlockId = seg.GetLeaderBlock().ID.Uint64()

0 commit comments

Comments
 (0)