Skip to content

Commit 9c695ff

Browse files
authored
fix(ctrl): did not delete expired segments in time (#576)
Signed-off-by: James Yin <[email protected]>
1 parent 4ec5540 commit 9c695ff

File tree

6 files changed

+216
-115
lines changed

6 files changed

+216
-115
lines changed

client/internal/vanus/eventlog/name_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func toSegment(segment *metapb.Segment) *record.Segment {
128128
StartOffset: segment.GetStartOffsetInLog(),
129129
EndOffset: segment.GetEndOffsetInLog(),
130130
FirstEventBornAt: time.UnixMilli(segment.FirstEventBornAtByUnixMs),
131-
LastEventBornAt: time.UnixMilli(segment.LastEvnetBornAtByUnixMs),
131+
LastEventBornAt: time.UnixMilli(segment.LastEventBornAtByUnixMs),
132132
Writable: segment.State == "working", // TODO: writable
133133
Blocks: blocks,
134134
LeaderBlockID: segment.GetLeaderBlockId(),

internal/controller/eventbus/eventlog/eventlog.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -603,27 +603,17 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) {
603603
executionID := uuid.NewString()
604604
mgr.eventlogMap.Range(func(key, value interface{}) bool {
605605
elog, _ := value.(*eventlog)
606-
head := elog.head()
607-
checkCtx := context.Background()
608-
for head != nil {
606+
for head, next := elog.headAndNext(); head != nil; head, next = elog.headAndNext() {
609607
switch {
610-
case head.LastEventBornTime.Second() == 0:
611-
// TODO(wenfeng.wang) fix if set
612-
head.LastEventBornTime = time.Now().Add(mgr.segmentExpiredTime)
613-
elog.lock()
614-
if err := elog.updateSegment(checkCtx, head); err != nil {
615-
log.Warning(ctx, "update segment's metadata failed", map[string]interface{}{
616-
log.KeyError: err,
617-
"segment": head.String(),
618-
"eventlog": elog.md.ID.String(),
619-
})
620-
head.LastEventBornTime = time.Time{}
621-
}
622-
elog.unlock()
608+
case !head.isFull() || next == nil:
623609
return true
624-
case !head.isFull():
625-
return true
626-
case time.Since(head.LastEventBornTime.Add(mgr.segmentExpiredTime)) > 0:
610+
case next.StartOffsetInLog == 0:
611+
// StartOffsetInLog must be set when mark previous segment full.
612+
panic("next segment has not StartOffsetInLog") // unreachable
613+
case head.LastEventBornTime.IsZero():
614+
// LastEventBornTime must be set when mark the segment full.
615+
panic("full segment has not LastEventBornTime") // unreachable
616+
case time.Since(head.LastEventBornTime) > mgr.segmentExpiredTime:
627617
err := elog.deleteHead(ctx)
628618
if err != nil {
629619
log.Warning(ctx, "delete segment error", map[string]interface{}{
@@ -652,7 +642,6 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) {
652642
default:
653643
return true
654644
}
655-
head = elog.head()
656645
}
657646
return true
658647
})
@@ -1079,6 +1068,24 @@ func (el *eventlog) head() *Segment {
10791068
return ptr.Value.(*Segment)
10801069
}
10811070

1071+
// headAndNext returns copies of head and next segment in the eventlog.
1072+
func (el *eventlog) headAndNext() (*Segment, *Segment) {
1073+
el.mutex.RLock()
1074+
defer el.mutex.RUnlock()
1075+
1076+
switch el.size() {
1077+
case 0:
1078+
return nil, nil
1079+
case 1:
1080+
head := *el.segmentList.Front().Value.(*Segment)
1081+
return &head, nil
1082+
default:
1083+
ptr := el.segmentList.Front()
1084+
head, next := *ptr.Value.(*Segment), *ptr.Next().Value.(*Segment)
1085+
return &head, &next
1086+
}
1087+
}
1088+
10821089
func (el *eventlog) tail() *Segment {
10831090
if el.size() == 0 {
10841091
return nil
@@ -1179,18 +1186,22 @@ func (el *eventlog) listOfPrevious(seg *Segment) []*Segment { //nolint:unused //
11791186
func (el *eventlog) deleteHead(ctx context.Context) error {
11801187
el.mutex.Lock()
11811188
defer el.mutex.Unlock()
1189+
11821190
if el.segmentList.Len() == 0 {
11831191
return nil
11841192
}
1193+
11851194
headV := el.segmentList.Front()
11861195
nextV := headV.Next()
11871196
head, _ := headV.Value.(*Segment)
1197+
11881198
segments := make([]vanus.ID, 0, len(el.segments)-1)
11891199
for _, v := range el.segments {
11901200
if v.Uint64() != head.ID.Uint64() {
11911201
segments = append(segments, v)
11921202
}
11931203
}
1204+
11941205
if err := el.kvClient.Delete(ctx, metadata.GetEventlogSegmentsMetadataKey(el.md.ID, head.ID)); err != nil {
11951206
log.Warning(ctx, "delete segment failed when delete head", map[string]interface{}{
11961207
log.KeyError: err,
@@ -1213,11 +1224,14 @@ func (el *eventlog) deleteHead(ctx context.Context) error {
12131224
return err
12141225
}
12151226
}
1227+
12161228
if el.writePtr == head {
12171229
el.writePtr = nil
12181230
}
1231+
12191232
_ = el.segmentList.RemoveFront()
12201233
el.segments = segments
1234+
12211235
return nil
12221236
}
12231237

0 commit comments

Comments
 (0)