77 "fmt"
88 "io"
99 "io/fs"
10+ "runtime"
1011 "sync"
1112
1213 "github.com/brimdata/super"
@@ -18,6 +19,7 @@ import (
1819 arc "github.com/hashicorp/golang-lru/arc/v2"
1920 "github.com/segmentio/ksuid"
2021 "go.uber.org/zap"
22+ "golang.org/x/sync/errgroup"
2123)
2224
2325var (
@@ -98,6 +100,35 @@ func (s *Store) Remove(ctx context.Context, o *Object) error {
98100 return s .engine .Delete (ctx , s .pathOf (o .Commit ))
99101}
100102
103+ // DANGER ZONE - commits should only be removed once a new base has been
104+ // established.
105+ func (s * Store ) DeleteCommits (ctx context.Context , commits []ksuid.KSUID ) error {
106+ deleteIfExists := func (path * storage.URI ) error {
107+ err := s .engine .Delete (ctx , path )
108+ if errors .Is (err , fs .ErrNotExist ) {
109+ err = nil
110+ }
111+ return err
112+ }
113+ group , ctx := errgroup .WithContext (ctx )
114+ group .SetLimit (runtime .GOMAXPROCS (0 ))
115+ for i , c := range commits {
116+ group .Go (func () error {
117+ return deleteIfExists (s .pathOf (c ))
118+ })
119+ group .Go (func () error {
120+ return deleteIfExists (s .snapshotPathOf (c ))
121+ })
122+ if i > 0 {
123+ // Do not delete base if on first commit.
124+ group .Go (func () error {
125+ return deleteIfExists (s .basePathOf (c ))
126+ })
127+ }
128+ }
129+ return group .Wait ()
130+ }
131+
101132func (s * Store ) Snapshot (ctx context.Context , leaf ksuid.KSUID ) (* Snapshot , error ) {
102133 if snap , ok := s .snapshots .Get (leaf ); ok {
103134 return snap , nil
@@ -108,6 +139,18 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
108139 s .snapshots .Add (leaf , snap )
109140 return snap , nil
110141 }
142+ snap , err := s .buildSnapshot (ctx , leaf )
143+ if err != nil {
144+ return nil , err
145+ }
146+ if err := s .putSnapshot (ctx , leaf , snap ); err != nil {
147+ s .logger .Error ("Storing snapshot" , zap .Error (err ))
148+ }
149+ s .snapshots .Add (leaf , snap )
150+ return snap , nil
151+ }
152+
153+ func (s * Store ) buildSnapshot (ctx context.Context , leaf ksuid.KSUID ) (* Snapshot , error ) {
111154 var objects []* Object
112155 var base * Snapshot
113156 for at := leaf ; at != ksuid .Nil ; {
@@ -135,6 +178,17 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
135178 // No snapshot found, so wait for data object.
136179 wg .Wait ()
137180 if oErr != nil {
181+ if errors .Is (oErr , fs .ErrNotExist ) {
182+ // If object get error is not exists then perhaps commits have
183+ // been vacated at this point, check if previous is a base
184+ // commit.
185+ snap , err := s .getBase (ctx , at )
186+ if err != nil {
187+ return nil , fmt .Errorf ("system error: error fetching base: %w" , err )
188+ }
189+ base = snap
190+ break
191+ }
138192 return nil , oErr
139193 }
140194 objects = append (objects , o )
@@ -153,10 +207,6 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
153207 }
154208 }
155209 }
156- if err := s .putSnapshot (ctx , leaf , snap ); err != nil {
157- s .logger .Error ("Storing snapshot" , zap .Error (err ))
158- }
159- s .snapshots .Add (leaf , snap )
160210 return snap , nil
161211}
162212
@@ -181,6 +231,27 @@ func (s *Store) snapshotPathOf(commit ksuid.KSUID) *storage.URI {
181231 return s .path .JoinPath (commit .String () + ".snap.bsup" )
182232}
183233
234+ func (s * Store ) getBase (ctx context.Context , commit ksuid.KSUID ) (* Snapshot , error ) {
235+ r , err := s .engine .Get (ctx , s .basePathOf (commit ))
236+ if err != nil {
237+ return nil , err
238+ }
239+ defer r .Close ()
240+ return decodeSnapshot (r )
241+ }
242+
243+ func (s * Store ) putBase (ctx context.Context , snap * Snapshot , commit ksuid.KSUID ) error {
244+ b , err := snap .serialize ()
245+ if err != nil {
246+ return err
247+ }
248+ return storage .Put (ctx , s .engine , s .basePathOf (commit ), bytes .NewReader (b ))
249+ }
250+
251+ func (s * Store ) basePathOf (commit ksuid.KSUID ) * storage.URI {
252+ return s .path .JoinPath (commit .String () + ".base.bsup" )
253+ }
254+
184255// Path return the entire path from the commit object to the root
185256// in leaf to root order.
186257func (s * Store ) Path (ctx context.Context , leaf ksuid.KSUID ) ([]ksuid.KSUID , error ) {
@@ -210,11 +281,16 @@ func (s *Store) PathRange(ctx context.Context, from, to ksuid.KSUID) ([]ksuid.KS
210281 }
211282 break
212283 }
213- path = append (path , at )
214284 o , err := s .Get (ctx , at )
215285 if err != nil {
286+ // If we get fs.ErrNotExist it means we have vacated and so we can
287+ // just return the path at this point.
288+ if errors .Is (err , fs .ErrNotExist ) && to .IsNil () {
289+ break
290+ }
216291 return nil , err
217292 }
293+ path = append (path , at )
218294 if at == to {
219295 break
220296 }
@@ -247,6 +323,9 @@ func (s *Store) ReadAll(ctx context.Context, commit, stop ksuid.KSUID) ([]byte,
247323 for commit != ksuid .Nil && commit != stop {
248324 b , commitObject , err := s .GetBytes (ctx , commit )
249325 if err != nil {
326+ if errors .Is (err , fs .ErrNotExist ) {
327+ break
328+ }
250329 return nil , err
251330 }
252331 size += len (b )
@@ -341,6 +420,31 @@ func (s *Store) PatchOfPath(ctx context.Context, base *Snapshot, baseID, commit
341420 return patch , nil
342421}
343422
423+ // SetBase establishes a new base (snapshot) at the provided commit and resets
424+ // the attached caches. The caller is responsible for deleting prior commits.
425+ func (s * Store ) SetBase (ctx context.Context , commit ksuid.KSUID ) (* Snapshot , error ) {
426+ path , err := s .Path (ctx , commit )
427+ if err != nil {
428+ return nil , err
429+ }
430+ if len (path ) <= 1 {
431+ return nil , errors .New ("cannot set base on earliest commit" )
432+ }
433+ // Create snapshot of previous commit.
434+ snap , err := s .buildSnapshot (ctx , path [1 ])
435+ if err != nil {
436+ return nil , err
437+ }
438+ if err := s .putBase (ctx , snap , path [1 ]); err != nil {
439+ return nil , err
440+ }
441+ s .cache .Purge ()
442+ s .paths .Purge ()
443+ s .snapshots .Purge ()
444+ s .snapshots .Add (path [1 ], snap )
445+ return snap , nil
446+ }
447+
344448// Vacuumable returns the set of data.Objects in the path of leaf that are not referenced
345449// by the leaf's snapshot.
346450func (s * Store ) Vacuumable (ctx context.Context , leaf ksuid.KSUID , out chan <- * data.Object ) error {
0 commit comments