77 "fmt"
88 "io"
99 "io/fs"
10+ "runtime"
1011 "sync"
1112
1213 "github.com/brimdata/super"
@@ -18,6 +19,7 @@ import (
1819 arc "github.com/hashicorp/golang-lru/arc/v2"
1920 "github.com/segmentio/ksuid"
2021 "go.uber.org/zap"
22+ "golang.org/x/sync/errgroup"
2123)
2224
2325var (
@@ -98,6 +100,27 @@ func (s *Store) Remove(ctx context.Context, o *Object) error {
98100 return s .engine .Delete (ctx , s .pathOf (o .Commit ))
99101}
100102
103+ // DANGER ZONE - commits should only be removed once a new base has been
104+ // established.
105+ func (s * Store ) DropCommits (ctx context.Context , commits []ksuid.KSUID ) error {
106+ group , ctx := errgroup .WithContext (ctx )
107+ group .SetLimit (runtime .GOMAXPROCS (0 ))
108+ for _ , c := range commits {
109+ group .Go (func () error {
110+ return s .engine .Delete (ctx , s .pathOf (c ))
111+ })
112+ // delete snapshot (if it exists)
113+ group .Go (func () error {
114+ err := s .engine .Delete (ctx , s .snapshotPathOf (c ))
115+ if errors .Is (err , fs .ErrNotExist ) {
116+ err = nil
117+ }
118+ return err
119+ })
120+ }
121+ return group .Wait ()
122+ }
123+
101124func (s * Store ) Snapshot (ctx context.Context , leaf ksuid.KSUID ) (* Snapshot , error ) {
102125 if snap , ok := s .snapshots .Get (leaf ); ok {
103126 return snap , nil
@@ -108,6 +131,18 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
108131 s .snapshots .Add (leaf , snap )
109132 return snap , nil
110133 }
134+ snap , err := s .buildSnapshot (ctx , leaf )
135+ if err != nil {
136+ return nil , err
137+ }
138+ if err := s .putSnapshot (ctx , leaf , snap ); err != nil {
139+ s .logger .Error ("Storing snapshot" , zap .Error (err ))
140+ }
141+ s .snapshots .Add (leaf , snap )
142+ return snap , nil
143+ }
144+
145+ func (s * Store ) buildSnapshot (ctx context.Context , leaf ksuid.KSUID ) (* Snapshot , error ) {
111146 var objects []* Object
112147 var base * Snapshot
113148 for at := leaf ; at != ksuid .Nil ; {
@@ -135,6 +170,18 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
135170 // No snapshot found, so wait for data object.
136171 wg .Wait ()
137172 if oErr != nil {
173+ if errors .Is (oErr , fs .ErrNotExist ) {
174+ // If object get error is not exists then perhaps commits have
175+ // been vacated at this point, check if previous is a base
176+ // commit.
177+ snap , err := s .getBase (ctx )
178+ if err != nil {
179+ return nil , fmt .Errorf ("system error: error fetching base: %w" , err )
180+ }
181+ s .snapshots .Add (ksuid .Nil , snap )
182+ base = snap
183+ break
184+ }
138185 return nil , oErr
139186 }
140187 objects = append (objects , o )
@@ -153,10 +200,6 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
153200 }
154201 }
155202 }
156- if err := s .putSnapshot (ctx , leaf , snap ); err != nil {
157- s .logger .Error ("Storing snapshot" , zap .Error (err ))
158- }
159- s .snapshots .Add (leaf , snap )
160203 return snap , nil
161204}
162205
@@ -181,6 +224,27 @@ func (s *Store) snapshotPathOf(commit ksuid.KSUID) *storage.URI {
181224 return s .path .JoinPath (commit .String () + ".snap.bsup" )
182225}
183226
227+ func (s * Store ) getBase (ctx context.Context ) (* Snapshot , error ) {
228+ r , err := s .engine .Get (ctx , s .basePath ())
229+ if err != nil {
230+ return nil , err
231+ }
232+ defer r .Close ()
233+ return decodeSnapshot (r )
234+ }
235+
236+ func (s * Store ) putBase (ctx context.Context , snap * Snapshot ) error {
237+ b , err := snap .serialize ()
238+ if err != nil {
239+ return err
240+ }
241+ return storage .Put (ctx , s .engine , s .basePath (), bytes .NewReader (b ))
242+ }
243+
244+ func (s * Store ) basePath () * storage.URI {
245+ return s .path .JoinPath ("base.bsup" )
246+ }
247+
184248// Path return the entire path from the commit object to the root
185249// in leaf to root order.
186250func (s * Store ) Path (ctx context.Context , leaf ksuid.KSUID ) ([]ksuid.KSUID , error ) {
@@ -210,11 +274,16 @@ func (s *Store) PathRange(ctx context.Context, from, to ksuid.KSUID) ([]ksuid.KS
210274 }
211275 break
212276 }
213- path = append (path , at )
214277 o , err := s .Get (ctx , at )
215278 if err != nil {
279+ // If we get fs.ErrNotExist it means we have vacated and so we can
280+ // just return the path at this point.
281+ if errors .Is (err , fs .ErrNotExist ) && to .IsNil () {
282+ break
283+ }
216284 return nil , err
217285 }
286+ path = append (path , at )
218287 if at == to {
219288 break
220289 }
@@ -247,6 +316,9 @@ func (s *Store) ReadAll(ctx context.Context, commit, stop ksuid.KSUID) ([]byte,
247316 for commit != ksuid .Nil && commit != stop {
248317 b , commitObject , err := s .GetBytes (ctx , commit )
249318 if err != nil {
319+ if errors .Is (err , fs .ErrNotExist ) {
320+ break
321+ }
250322 return nil , err
251323 }
252324 size += len (b )
@@ -341,6 +413,30 @@ func (s *Store) PatchOfPath(ctx context.Context, base *Snapshot, baseID, commit
341413 return patch , nil
342414}
343415
416+ // SetBase establishes a new base (snapshot) at the provided commit and resets
417+ // the attached caches. The caller is responsible for deleting prior commits.
418+ func (s * Store ) SetBase (ctx context.Context , commit ksuid.KSUID ) (* Snapshot , error ) {
419+ path , err := s .Path (ctx , commit )
420+ if err != nil {
421+ return nil , err
422+ }
423+ if len (path ) <= 1 {
424+ return nil , errors .New ("cannot set base on earliest commit" )
425+ }
426+ // Create snapshot of previous commit.
427+ snap , err := s .buildSnapshot (ctx , path [1 ])
428+ if err != nil {
429+ return nil , err
430+ }
431+ if err := s .putBase (ctx , snap ); err != nil {
432+ return nil , err
433+ }
434+ s .cache .Purge ()
435+ s .paths .Purge ()
436+ s .snapshots .Purge ()
437+ return snap , nil
438+ }
439+
344440// Vacuumable returns the set of data.Objects in the path of leaf that are not referenced
345441// by the leaf's snapshot.
346442func (s * Store ) Vacuumable (ctx context.Context , leaf ksuid.KSUID , out chan <- * data.Object ) error {
0 commit comments