Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ type QueryWarning struct {
Warning string `json:"warning" super:"warning"`
}

// VacateResponse is the body returned by the vacate endpoint. CommitIDs
// carries the commit IDs reported by the operation; callers in this change
// count them to report how many commits were (or would be) vacated.
type VacateResponse struct {
CommitIDs []ksuid.KSUID `super:"commit_ids"`
}

// VacuumResponse is the body returned by the vacuum endpoint. ObjectIDs
// carries the object IDs reported by the operation.
type VacuumResponse struct {
ObjectIDs []ksuid.KSUID `super:"object_ids"`
}
Expand Down
11 changes: 11 additions & 0 deletions api/client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,17 @@ func (c *Connection) delete(ctx context.Context, poolID ksuid.KSUID, branchName
return commit, err
}

// Vacate sends a POST to /pool/{pool}/revision/{revision}/vacate and decodes
// the reply into an api.VacateResponse. When dryrun is set, "?dryrun=true" is
// appended to the request path. The response value is returned alongside any
// error from the request or from decoding.
func (c *Connection) Vacate(ctx context.Context, pool, revision string, dryrun bool) (api.VacateResponse, error) {
	var res api.VacateResponse
	endpoint := urlPath("pool", pool, "revision", revision, "vacate")
	if dryrun {
		endpoint += "?dryrun=true"
	}
	err := c.doAndUnmarshal(c.NewRequest(ctx, http.MethodPost, endpoint, nil), &res)
	return res, err
}

func (c *Connection) Vacuum(ctx context.Context, pool, revision string, dryrun bool) (api.VacuumResponse, error) {
path := urlPath("pool", pool, "revision", revision, "vacuum")
if dryrun {
Expand Down
60 changes: 57 additions & 3 deletions cmd/super/db/vacate/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package vacate
import (
"errors"
"flag"
"fmt"
"strings"

"github.com/brimdata/super/cli/poolflags"
"github.com/brimdata/super/cmd/super/db"
"github.com/brimdata/super/pkg/charm"
"github.com/brimdata/super/pkg/plural"
)

var spec = &charm.Spec{
Name: "vacate",
Usage: "vacate [options] commit",
Short: "compact a pool's commit history by squashing old commit objects",
Short: "compact a pool's commit history by removing old commit objects",
Long: `
See https://superdb.org/command/db.html#super-db-vacate
`,
Expand All @@ -24,12 +28,62 @@ func init() {

// Command implements the "vacate" subcommand. It embeds the parent db command
// and adds the pool-selection flags plus the two vacate-specific switches.
type Command struct {
*db.Command
poolFlags poolflags.Flags
// dryrun is set by -dryrun; it is passed through to Vacate and suppresses
// the confirmation prompt.
dryrun bool
// force is set by -f; it skips the confirmation prompt.
force bool
}

// New builds the vacate command on top of its parent db command and registers
// the command's flags on f: the shared pool flags, -dryrun, and -f.
//
// The parent must be a *db.Command; any other type makes the type assertion
// panic.
func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
	// The earlier stub returned a bare Command before any flag was registered,
	// which left everything below unreachable; the command is now fully wired
	// before being returned.
	c := &Command{Command: parent.(*db.Command)}
	c.poolFlags.SetFlags(f)
	f.BoolVar(&c.dryrun, "dryrun", false, "view the number of commits to be deleted")
	f.BoolVar(&c.force, "f", false, "do not prompt for confirmation")
	return c, nil
}

// Run executes the vacate command against the pool/branch selected by the
// pool flags.
//
// Unless -dryrun is given, the user is asked to confirm first (see confirm);
// declining aborts with an error before anything is sent. The commit IDs
// returned by Vacate are only counted, and the count is printed unless the
// quiet flag is set.
func (c *Command) Run(args []string) error {
	// The placeholder `return errors.New("issue #2545")` that used to open this
	// function has been dropped: it made every invocation fail and left the
	// real implementation below unreachable.
	ctx, cleanup, err := c.Init()
	if err != nil {
		return err
	}
	defer cleanup()
	at, err := c.poolFlags.HEAD()
	if err != nil {
		return err
	}
	// Named root rather than db so the imported db package is not shadowed.
	root, err := c.DBFlags.Open(ctx)
	if err != nil {
		return err
	}
	verb := "would vacate"
	if !c.dryrun {
		verb = "vacated"
		// Destructive path: require confirmation before calling Vacate.
		if err := c.confirm(at.String()); err != nil {
			return err
		}
	}
	cids, err := root.Vacate(ctx, at.Pool, at.Branch, c.dryrun)
	if err != nil {
		return err
	}
	if !c.DBFlags.Quiet {
		fmt.Printf("%s %d commit%s\n", verb, len(cids), plural.Slice(cids, "s"))
	}
	return nil
}

// confirm asks on stdout whether previous commits should be vacated from name
// and reads one token from stdin. It returns nil when -f was given or when the
// answer, compared case-insensitively, is "y" or "yes". A scan failure is
// returned as-is; any other answer yields "operation canceled".
func (c *Command) confirm(name string) error {
	if !c.force {
		fmt.Printf("Are you sure you want to vacate previous commits from %q? There is no going back... [y|n]\n", name)
		var answer string
		if _, err := fmt.Scanln(&answer); err != nil {
			return err
		}
		switch strings.ToLower(answer) {
		case "y", "yes":
			// Confirmed; fall out to the nil return.
		default:
			return errors.New("operation canceled")
		}
	}
	return nil
}
1 change: 1 addition & 0 deletions db/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Interface interface {
Revert(ctx context.Context, poolID ksuid.KSUID, branch string, commitID ksuid.KSUID, commit api.CommitMessage) (ksuid.KSUID, error)
AddVectors(ctx context.Context, pool, revision string, objects []ksuid.KSUID, message api.CommitMessage) (ksuid.KSUID, error)
DeleteVectors(ctx context.Context, pool, revision string, objects []ksuid.KSUID, message api.CommitMessage) (ksuid.KSUID, error)
Vacate(ctx context.Context, pool, revision string, dryrun bool) ([]ksuid.KSUID, error)
Vacuum(ctx context.Context, pool, revision string, dryrun bool) ([]ksuid.KSUID, error)
}

Expand Down
16 changes: 16 additions & 0 deletions db/api/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,22 @@ func (l *local) DeleteVectors(ctx context.Context, pool, revision string, ids []
return branch.DeleteVectors(ctx, ids, message.Author, message.Body)
}

// Vacate looks up the ID of the named pool, opens that pool, resolves
// revision to a commit within it, and delegates to the pool's Vacate with the
// resolved commit and the dryrun flag. A failure in any of the lookup steps is
// returned immediately with a nil slice.
func (l *local) Vacate(ctx context.Context, pool, revision string, dryrun bool) ([]ksuid.KSUID, error) {
	id, err := l.PoolID(ctx, pool)
	if err != nil {
		return nil, err
	}
	opened, err := l.db.OpenPool(ctx, id)
	if err != nil {
		return nil, err
	}
	at, err := opened.ResolveRevision(ctx, revision)
	if err != nil {
		return nil, err
	}
	return opened.Vacate(ctx, at, dryrun)
}

func (l *local) Vacuum(ctx context.Context, pool, revision string, dryrun bool) ([]ksuid.KSUID, error) {
poolID, err := l.PoolID(ctx, pool)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions db/api/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ func (r *remote) DeleteVectors(ctx context.Context, pool, revision string, ids [
return res.Commit, err
}

// Vacate forwards the request over the client connection and unwraps the
// commit IDs from the response. The IDs and the connection's error are
// returned together, exactly as received.
func (r *remote) Vacate(ctx context.Context, pool, revision string, dryrun bool) ([]ksuid.KSUID, error) {
	resp, err := r.conn.Vacate(ctx, pool, revision, dryrun)
	ids := resp.CommitIDs
	return ids, err
}

func (r *remote) Vacuum(ctx context.Context, pool, revision string, dryrun bool) ([]ksuid.KSUID, error) {
res, err := r.conn.Vacuum(ctx, pool, revision, dryrun)
return res.ObjectIDs, err
Expand Down
6 changes: 6 additions & 0 deletions db/commits/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package commits

import (
"context"
"errors"
"io/fs"

"github.com/brimdata/super"
"github.com/brimdata/super/sio"
Expand Down Expand Up @@ -37,6 +39,10 @@ func (r *LogReader) Read() (*super.Value, error) {
}
_, commitObject, err := r.store.GetBytes(r.ctx, r.cursor)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
r.cursor = ksuid.Nil
err = nil
}
return nil, err
}
next := commitObject.Parent
Expand Down
114 changes: 109 additions & 5 deletions db/commits/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"io/fs"
"runtime"
"sync"

"github.com/brimdata/super"
Expand All @@ -18,6 +19,7 @@ import (
arc "github.com/hashicorp/golang-lru/arc/v2"
"github.com/segmentio/ksuid"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

var (
Expand Down Expand Up @@ -98,6 +100,35 @@ func (s *Store) Remove(ctx context.Context, o *Object) error {
return s.engine.Delete(ctx, s.pathOf(o.Commit))
}

// DeleteCommits removes the stored files for each listed commit.
//
// DANGER ZONE: only call this once a new base has been established.
//
// For every ID the commit object and its snapshot file are deleted; the base
// file is deleted for every ID except the first in the slice. Files that are
// already missing are not treated as errors. Deletions run concurrently,
// limited to GOMAXPROCS at a time, and the first real error is returned after
// all scheduled deletions finish.
func (s *Store) DeleteCommits(ctx context.Context, commits []ksuid.KSUID) error {
	group, ctx := errgroup.WithContext(ctx)
	group.SetLimit(runtime.GOMAXPROCS(0))
	// schedule queues one delete of the file that locate maps id to,
	// tolerating a file that does not exist.
	schedule := func(locate func(ksuid.KSUID) *storage.URI, id ksuid.KSUID) {
		group.Go(func() error {
			err := s.engine.Delete(ctx, locate(id))
			if errors.Is(err, fs.ErrNotExist) {
				return nil
			}
			return err
		})
	}
	for idx, id := range commits {
		schedule(s.pathOf, id)
		schedule(s.snapshotPathOf, id)
		if idx == 0 {
			// Do not delete base if on first commit.
			continue
		}
		schedule(s.basePathOf, id)
	}
	return group.Wait()
}

func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, error) {
if snap, ok := s.snapshots.Get(leaf); ok {
return snap, nil
Expand All @@ -108,6 +139,18 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
s.snapshots.Add(leaf, snap)
return snap, nil
}
snap, err := s.buildSnapshot(ctx, leaf)
if err != nil {
return nil, err
}
if err := s.putSnapshot(ctx, leaf, snap); err != nil {
s.logger.Error("Storing snapshot", zap.Error(err))
}
s.snapshots.Add(leaf, snap)
return snap, nil
}

func (s *Store) buildSnapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, error) {
var objects []*Object
var base *Snapshot
for at := leaf; at != ksuid.Nil; {
Expand Down Expand Up @@ -135,6 +178,17 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
// No snapshot found, so wait for data object.
wg.Wait()
if oErr != nil {
if errors.Is(oErr, fs.ErrNotExist) {
// If object get error is not exists then perhaps commits have
// been vacated at this point, check if previous is a base
// commit.
snap, err := s.getBase(ctx, at)
if err != nil {
return nil, fmt.Errorf("system error: error fetching base: %w", err)
}
base = snap
break
}
return nil, oErr
}
objects = append(objects, o)
Expand All @@ -153,10 +207,6 @@ func (s *Store) Snapshot(ctx context.Context, leaf ksuid.KSUID) (*Snapshot, erro
}
}
}
if err := s.putSnapshot(ctx, leaf, snap); err != nil {
s.logger.Error("Storing snapshot", zap.Error(err))
}
s.snapshots.Add(leaf, snap)
return snap, nil
}

Expand All @@ -181,6 +231,27 @@ func (s *Store) snapshotPathOf(commit ksuid.KSUID) *storage.URI {
return s.path.JoinPath(commit.String() + ".snap.bsup")
}

// getBase reads the base file stored for commit and decodes it into a
// Snapshot. An error from the storage engine (including a missing file) is
// returned unchanged.
func (s *Store) getBase(ctx context.Context, commit ksuid.KSUID) (*Snapshot, error) {
	src := s.basePathOf(commit)
	rc, err := s.engine.Get(ctx, src)
	if err != nil {
		return nil, err
	}
	defer rc.Close()
	return decodeSnapshot(rc)
}

// putBase serializes snap and writes it to the base file for commit.
func (s *Store) putBase(ctx context.Context, snap *Snapshot, commit ksuid.KSUID) error {
	encoded, err := snap.serialize()
	if err != nil {
		return err
	}
	dst := s.basePathOf(commit)
	return storage.Put(ctx, s.engine, dst, bytes.NewReader(encoded))
}

// basePathOf returns the location of the base file for commit:
// "<commit>.base.bsup" under the store's path.
func (s *Store) basePathOf(commit ksuid.KSUID) *storage.URI {
	name := commit.String() + ".base.bsup"
	return s.path.JoinPath(name)
}

// Path return the entire path from the commit object to the root
// in leaf to root order.
func (s *Store) Path(ctx context.Context, leaf ksuid.KSUID) ([]ksuid.KSUID, error) {
Expand Down Expand Up @@ -210,11 +281,16 @@ func (s *Store) PathRange(ctx context.Context, from, to ksuid.KSUID) ([]ksuid.KS
}
break
}
path = append(path, at)
o, err := s.Get(ctx, at)
if err != nil {
// If we get fs.ErrNotExist it means we have vacated and so we can
// just return the path at this point.
if errors.Is(err, fs.ErrNotExist) && to.IsNil() {
break
}
return nil, err
}
path = append(path, at)
if at == to {
break
}
Expand Down Expand Up @@ -247,6 +323,9 @@ func (s *Store) ReadAll(ctx context.Context, commit, stop ksuid.KSUID) ([]byte,
for commit != ksuid.Nil && commit != stop {
b, commitObject, err := s.GetBytes(ctx, commit)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
break
}
return nil, err
}
size += len(b)
Expand Down Expand Up @@ -341,6 +420,31 @@ func (s *Store) PatchOfPath(ctx context.Context, base *Snapshot, baseID, commit
return patch, nil
}

// SetBase establishes a new base snapshot just below the provided commit and
// resets the store's caches. The caller is responsible for deleting prior
// commits.
//
// The base is built from, and stored under the ID of, the second entry in
// commit's path (the entry that follows commit itself). If the path has no
// second entry an error is returned. After the base is written the object,
// path, and snapshot caches are purged and the new snapshot is re-added to the
// snapshot cache under that same ID.
func (s *Store) SetBase(ctx context.Context, commit ksuid.KSUID) (*Snapshot, error) {
	lineage, err := s.Path(ctx, commit)
	if err != nil {
		return nil, err
	}
	if len(lineage) < 2 {
		return nil, errors.New("cannot set base on earliest commit")
	}
	// The base captures the state as of the commit preceding the one given.
	parent := lineage[1]
	snap, err := s.buildSnapshot(ctx, parent)
	if err != nil {
		return nil, err
	}
	if err := s.putBase(ctx, snap, parent); err != nil {
		return nil, err
	}
	// Drop everything cached before the base existed, then seed the snapshot
	// cache with the one just built.
	s.cache.Purge()
	s.paths.Purge()
	s.snapshots.Purge()
	s.snapshots.Add(parent, snap)
	return snap, nil
}

// Vacuumable returns the set of data.Objects in the path of leaf that are not referenced
// by the leaf's snapshot.
func (s *Store) Vacuumable(ctx context.Context, leaf ksuid.KSUID, out chan<- *data.Object) error {
Expand Down
Loading