@@ -27,6 +27,18 @@ const (
2727 checksumSize = 4
2828)
2929
30+ // mvccSnapshot is used solely for gob snapshot serialization.
31+ type mvccSnapshot struct {
32+ LastCommitTS uint64
33+ Entries []mvccSnapshotEntry
34+ }
35+
36+ // mvccSnapshotEntry is used solely for gob snapshot serialization.
37+ type mvccSnapshotEntry struct {
38+ Key []byte
39+ Versions []VersionedValue
40+ }
41+
3042func byteSliceComparator (a , b interface {}) int {
3143 ab , okA := a .([]byte )
3244 bb , okB := b .([]byte )
@@ -69,14 +81,28 @@ func (s *mvccStore) LastCommitTS() uint64 {
6981 return s .lastCommitTS
7082}
7183
84+ // MVCCStoreOption configures the MVCCStore.
85+ type MVCCStoreOption func (* mvccStore )
86+
87+ // WithLogger sets a custom logger for the store.
88+ func WithLogger (l * slog.Logger ) MVCCStoreOption {
89+ return func (s * mvccStore ) {
90+ s .log = l
91+ }
92+ }
93+
7294// NewMVCCStore creates a new MVCC-enabled in-memory store.
73- func NewMVCCStore () MVCCStore {
74- return & mvccStore {
95+ func NewMVCCStore (opts ... MVCCStoreOption ) MVCCStore {
96+ s := & mvccStore {
7597 tree : treemap .NewWith (byteSliceComparator ),
7698 log : slog .New (slog .NewJSONHandler (os .Stdout , & slog.HandlerOptions {
7799 Level : slog .LevelWarn ,
78100 })),
79101 }
102+ for _ , opt := range opts {
103+ opt (s )
104+ }
105+ return s
80106}
81107
82108var _ MVCCStore = (* mvccStore )(nil )
@@ -345,7 +371,7 @@ func (s *mvccStore) Snapshot() (io.ReadWriter, error) {
345371 s .mtx .RLock ()
346372 defer s .mtx .RUnlock ()
347373
348- state := make ([]mvccSnapshotEntry , 0 , s .tree .Size ())
374+ entries := make ([]mvccSnapshotEntry , 0 , s .tree .Size ())
349375 s .tree .Each (func (key interface {}, value interface {}) {
350376 k , ok := key .([]byte )
351377 if ! ok {
@@ -355,14 +381,19 @@ func (s *mvccStore) Snapshot() (io.ReadWriter, error) {
355381 if ! ok {
356382 return
357383 }
358- state = append (state , mvccSnapshotEntry {
384+ entries = append (entries , mvccSnapshotEntry {
359385 Key : bytes .Clone (k ),
360386 Versions : append ([]VersionedValue (nil ), versions ... ),
361387 })
362388 })
363389
390+ snapshot := mvccSnapshot {
391+ LastCommitTS : s .lastCommitTS ,
392+ Entries : entries ,
393+ }
394+
364395 buf := & bytes.Buffer {}
365- if err := gob .NewEncoder (buf ).Encode (state ); err != nil {
396+ if err := gob .NewEncoder (buf ).Encode (snapshot ); err != nil {
366397 return nil , errors .WithStack (err )
367398 }
368399
@@ -388,35 +419,90 @@ func (s *mvccStore) Restore(r io.Reader) error {
388419 return errors .WithStack (ErrInvalidChecksum )
389420 }
390421
391- var state [] mvccSnapshotEntry
392- if err := gob .NewDecoder (bytes .NewReader (payload )).Decode (& state ); err != nil {
422+ var snapshot mvccSnapshot
423+ if err := gob .NewDecoder (bytes .NewReader (payload )).Decode (& snapshot ); err != nil {
393424 return errors .WithStack (err )
394425 }
395426
396427 s .mtx .Lock ()
397428 defer s .mtx .Unlock ()
398429
399430 s .tree .Clear ()
400- for _ , entry := range state {
431+ s .lastCommitTS = snapshot .LastCommitTS
432+ for _ , entry := range snapshot .Entries {
401433 versions := append ([]VersionedValue (nil ), entry .Versions ... )
402434 s .tree .Put (bytes .Clone (entry .Key ), versions )
403- if len (versions ) > 0 {
404- last := versions [len (versions )- 1 ].TS
405- if last > s .lastCommitTS {
406- s .lastCommitTS = last
435+ }
436+
437+ return nil
438+ }
439+
440+ func compactVersions (versions []VersionedValue , minTS uint64 ) ([]VersionedValue , bool ) {
441+ if len (versions ) == 0 {
442+ return versions , false
443+ }
444+
445+ // Find the latest version that is <= minTS
446+ keepIdx := - 1
447+ for i := len (versions ) - 1 ; i >= 0 ; i -- {
448+ if versions [i ].TS <= minTS {
449+ keepIdx = i
450+ break
451+ }
452+ }
453+
454+ // If all versions are newer than minTS, keep everything
455+ if keepIdx == - 1 {
456+ return versions , false
457+ }
458+
459+ // If the oldest version is the one to keep, we can't remove anything before it
460+ if keepIdx == 0 {
461+ return versions , false
462+ }
463+
464+ // We keep versions starting from keepIdx
465+ // The version at keepIdx represents the state at minTS.
466+ newVersions := make ([]VersionedValue , len (versions )- keepIdx )
467+ copy (newVersions , versions [keepIdx :])
468+ return newVersions , true
469+ }
470+
471+ func (s * mvccStore ) Compact (ctx context.Context , minTS uint64 ) error {
472+ s .mtx .Lock ()
473+ defer s .mtx .Unlock ()
474+
475+ var updates map [string ][]VersionedValue = make (map [string ][]VersionedValue )
476+
477+ it := s .tree .Iterator ()
478+ for it .Next () {
479+ versions , ok := it .Value ().([]VersionedValue )
480+ if ! ok {
481+ continue
482+ }
483+
484+ newVersions , changed := compactVersions (versions , minTS )
485+ if changed {
486+ // tree keys are []byte, need string for map key
487+ keyBytes , ok := it .Key ().([]byte )
488+ if ! ok {
489+ continue
407490 }
491+ updates [string (keyBytes )] = newVersions
408492 }
409493 }
410494
495+ for k , v := range updates {
496+ s .tree .Put ([]byte (k ), v )
497+ }
498+
499+ s .log .InfoContext (ctx , "compact" ,
500+ slog .Uint64 ("min_ts" , minTS ),
501+ slog .Int ("updated_keys" , len (updates )),
502+ )
411503 return nil
412504}
413505
414506func (s * mvccStore ) Close () error {
415507 return nil
416508}
417-
418- // mvccSnapshotEntry is used solely for gob snapshot serialization.
419- type mvccSnapshotEntry struct {
420- Key []byte
421- Versions []VersionedValue
422- }
0 commit comments