Skip to content

Commit 30da938

Browse files
committed
fix(store): handle saveLastCommitTS error and refactor ScanAt
1 parent 2bd4025 commit 30da938

File tree

1 file changed

+53
-47
lines changed

1 file changed

+53
-47
lines changed

store/lsm_store.go

Lines changed: 53 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,9 @@ func (s *pebbleStore) updateLastCommitTS(ts uint64) {
176176
if ts > s.lastCommitTS {
177177
s.lastCommitTS = ts
178178
// Best effort persist
179-
_ = s.saveLastCommitTS(ts)
179+
if err := s.saveLastCommitTS(ts); err != nil {
180+
s.log.Error("failed to persist last commit timestamp", slog.Any("error", err))
181+
}
180182
}
181183
}
182184

@@ -262,75 +264,82 @@ func (s *pebbleStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool
262264
return val != nil, nil
263265
}
264266

265-
func (s *pebbleStore) scanProcessKey(iter *pebble.Iterator, ts uint64, lastUserKey *[]byte) (*KVPair, bool) {
267+
func (s *pebbleStore) seekToVisibleVersion(iter *pebble.Iterator, userKey []byte, currentVersion, ts uint64) bool {
268+
if currentVersion <= ts {
269+
return true
270+
}
271+
seekKey := encodeKey(userKey, ts)
272+
if !iter.SeekGE(seekKey) {
273+
return false
274+
}
266275
k := iter.Key()
267-
userKey, version := decodeKey(k)
276+
currentUserKey, _ := decodeKey(k)
277+
return bytes.Equal(currentUserKey, userKey)
278+
}
268279

269-
if bytes.Equal(userKey, *lastUserKey) {
270-
iter.Next()
271-
return nil, true // continue loop
280+
func (s *pebbleStore) processFoundValue(iter *pebble.Iterator, userKey []byte, ts uint64) (*KVPair, error) {
281+
valBytes := iter.Value()
282+
sv, err := decodeValue(valBytes)
283+
if err != nil {
284+
return nil, err
272285
}
273286

274-
if version <= ts {
275-
valBytes := iter.Value()
276-
sv, err := decodeValue(valBytes)
277-
if err != nil {
278-
s.log.Error("failed to decode value", slog.Any("error", err), slog.String("key", string(k)))
279-
iter.Next()
280-
return nil, true // continue loop
281-
}
282-
283-
*lastUserKey = append([]byte(nil), userKey...)
284-
285-
if !sv.Tombstone && (sv.ExpireAt == 0 || sv.ExpireAt > ts) {
286-
iter.Next()
287-
return &KVPair{
288-
Key: userKey,
289-
Value: sv.Value,
290-
}, false
291-
}
292-
iter.Next()
293-
return nil, false // processed but filtered out
287+
if !sv.Tombstone && (sv.ExpireAt == 0 || sv.ExpireAt > ts) {
288+
return &KVPair{
289+
Key: userKey,
290+
Value: sv.Value,
291+
}, nil
294292
}
295-
296-
return nil, false // Need seek
293+
return nil, nil
297294
}
298295

299-
func (s *pebbleStore) seekNextVersion(iter *pebble.Iterator, userKey []byte, ts uint64) bool {
300-
target := encodeKey(userKey, ts)
301-
return iter.SeekGE(target)
296+
func (s *pebbleStore) skipToNextUserKey(iter *pebble.Iterator, userKey []byte) bool {
297+
if !iter.SeekGE(encodeKey(userKey, 0)) {
298+
return false
299+
}
300+
k := iter.Key()
301+
u, _ := decodeKey(k)
302+
if bytes.Equal(u, userKey) {
303+
return iter.Next()
304+
}
305+
return true
302306
}
303307

304-
func (s *pebbleStore) scanLoop(iter *pebble.Iterator, end []byte, limit int, ts uint64) []*KVPair {
308+
func (s *pebbleStore) collectScanResults(iter *pebble.Iterator, start, end []byte, limit int, ts uint64) ([]*KVPair, error) {
305309
result := make([]*KVPair, 0, limit)
306-
var lastUserKey []byte
307310

308-
for iter.Valid() {
311+
for iter.SeekGE(encodeKey(start, math.MaxUint64)); iter.Valid(); {
309312
if len(result) >= limit {
310313
break
311314
}
312315

313316
k := iter.Key()
314-
userKey, _ := decodeKey(k)
317+
userKey, version := decodeKey(k)
315318

316319
if end != nil && bytes.Compare(userKey, end) > 0 {
317320
break
318321
}
319322

320-
kv, cont := s.scanProcessKey(iter, ts, &lastUserKey)
323+
// Find the correct version for userKey
324+
if !s.seekToVisibleVersion(iter, userKey, version, ts) {
325+
continue
326+
}
327+
328+
// Now iter is at the latest visible version for userKey.
329+
kv, err := s.processFoundValue(iter, userKey, ts)
330+
if err != nil {
331+
return nil, err
332+
}
321333
if kv != nil {
322334
result = append(result, kv)
323335
}
324-
if cont {
325-
continue
326-
}
327336

328-
// Seek forward
329-
if !s.seekNextVersion(iter, userKey, ts) {
330-
break
337+
// Move to the next user key.
338+
if !s.skipToNextUserKey(iter, userKey) {
339+
break // No more keys
331340
}
332341
}
333-
return result
342+
return result, nil
334343
}
335344

336345
func (s *pebbleStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error) {
@@ -346,10 +355,7 @@ func (s *pebbleStore) ScanAt(ctx context.Context, start []byte, end []byte, limi
346355
}
347356
defer iter.Close()
348357

349-
// We want to scan keys >= start.
350-
iter.SeekGE(encodeKey(start, math.MaxUint64))
351-
352-
return s.scanLoop(iter, end, limit, ts), nil
358+
return s.collectScanResults(iter, start, end, limit, ts)
353359
}
354360

355361
func (s *pebbleStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error {

0 commit comments

Comments
 (0)