-
Notifications
You must be signed in to change notification settings - Fork 95
feat: optimize route switching when freeze segment #321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## main #321 +/- ##
==========================================
- Coverage 62.72% 59.08% -3.64%
==========================================
Files 133 175 +42
Lines 12128 14474 +2346
==========================================
+ Hits 7607 8552 +945
- Misses 3977 5306 +1329
- Partials 544 616 +72
Continue to review full report at Codecov.
|
b4563e9 to
8b132b7
Compare
internal/store/vsb/block.go
Outdated
| func (b *vsBlock) status() block.Statistics { | ||
| return b.stat(b.makeSnapshot()) | ||
| m, indexes := b.makeSnapshot() | ||
| return b.stat(m, indexes, block.StateWorking) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass state with m.archived.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
m.archived has been deleted, and the segment status cannot be rolled back
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
meta.archived
Signed-off-by: jyjiangkai <[email protected]>
Signed-off-by: jyjiangkai <[email protected]>
| if l.logWriter != nil { | ||
| return l.logWriter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe access l.logWriter is not safe.
| So(archived, ShouldBeTrue) | ||
|
|
||
| stat = b.status() | ||
| So(stat.Archived, ShouldBeTrue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check state
|
|
||
| stat := b.status() | ||
| So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1) | ||
| So(stat.Archived, ShouldBeTrue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check state
|
|
||
| stat := b.status() | ||
| So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1) | ||
| So(stat.Archived, ShouldBeFalse) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check state
internal/store/block/raw.go
Outdated
| StateArchived = State("archived") | ||
| ) | ||
|
|
||
| func (s State) ToSegmentState() string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't place this at here.
client/pkg/eventlog/eventlog_impl.go
Outdated
|
|
||
| segmentNum := len(l.readableSegments) | ||
| n := sort.Search(segmentNum, func(i int) bool { | ||
| return l.readableSegments[uint64(i)].EndOffset() > offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the key of readableSegments is not a index
Signed-off-by: jyjiangkai <[email protected]>
Signed-off-by: jyjiangkai <[email protected]>
internal/store/vsb/block_append.go
Outdated
| return seqs, frag, actx.size(b.dataOffset) >= b.capacity, nil | ||
| full := actx.size(b.dataOffset) >= b.capacity | ||
| if full && b.lis != nil { | ||
| m, indexes := makeSnapshot(b.actx, b.indexes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't use b.actx and b.indexes in PrepareAppend()
internal/store/segment/server.go
Outdated
|
|
||
| s.replicas.Range(func(key, value interface{}) bool { | ||
| b, _ := value.(replica) | ||
| if b.appender.Status().Leader.Equals(vanus.ID(segment.LeaderBlockId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use LeaderBlockId is unsafe, because it may be not equal to info.leader from info.term.
internal/store/vsb/block.go
Outdated
| state := block.StateWorking | ||
| m, indexes := b.makeSnapshot() | ||
| if m.archived { | ||
| state = block.StateArchiving |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StateArchiving does not match the semantics of archived.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the value of state needs to persist to disk.
client/pkg/eventbus/eventbus.go
Outdated
| b.readableMu.RLock() | ||
| defer b.readableMu.RUnlock() | ||
| if len(b.readableLogs) == 0 { | ||
| b.refreshReadableLogs(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unlock when refresh route info.
client/pkg/eventbus/eventbus.go
Outdated
| b.writableMu.RLock() | ||
| defer b.writableMu.RUnlock() | ||
| if len(b.writableLogs) == 0 { | ||
| b.refreshWritableLogs(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unlock when refresh route info
client/pkg/eventlog/eventlog_impl.go
Outdated
| l.logWriter = nil | ||
| l.logReader = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do not call Close()?
client/pkg/eventbus/eventbus.go
Outdated
| b.readableMu.RLock() | ||
| defer b.readableMu.RUnlock() | ||
| if len(b.readableLogs) == 0 { | ||
| b.refreshReadableLogs(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unlock when refresh route info.
client/pkg/eventbus/eventbus.go
Outdated
| b.writableMu.RLock() | ||
| defer b.writableMu.RUnlock() | ||
| if len(b.writableLogs) == 0 { | ||
| b.refreshWritableLogs(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unlock when refresh route info.
client/pkg/eventlog/eventlog_impl.go
Outdated
| // LastEntryStime | ||
| // time.UnixMilli | ||
| tailSeg := fetchTailSegment(ctx, segs) | ||
| if tailSeg.firstEventBornAt.Before(t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use firstEventBornAt is unreasonable.
af4e196 to
f4afed5
Compare
Signed-off-by: jyjiangkai <[email protected]>
What problem does this PR solve?
Optimize the routing logic for segment full
Issue Number: close #xxx
Problem Summary
What is changed and how does it work?
Check List
Tests