Skip to content

Commit 15478b3

Browse files
authored
chore: Add profilecli command to delete v1 blocks (#4320)
* chore: Add profilecli command to delete v1 blocks As part of the v1 to v2 migration it is useful to clear out the bucket of old v1 blocks. This command will help to dry run this step and then also delete them. * Address PR feedback
1 parent 43b9032 commit 15478b3

File tree

5 files changed

+200
-7
lines changed

5 files changed

+200
-7
lines changed

cmd/profilecli/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ func main() {
102102
raftInfoCmd := raftCmd.Command("info", "Print info about a Raft node.")
103103
raftInfoParams := addRaftInfoParams(raftInfoCmd)
104104

105+
v2MigrationCmd := adminCmd.Command("v2-migration", "Operation to aid the v1 to v2 storage migration.")
106+
v2MigrationBucketCleanupCmd := v2MigrationCmd.Command("bucket-cleanup", "Clean up v1 artificats from data bucket.")
107+
v2MigrationBucketCleanupParams := addV2MigrationBackupCleanupParam(v2MigrationBucketCleanupCmd)
108+
105109
// parse command line arguments
106110
parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
107111

@@ -181,6 +185,10 @@ func main() {
181185
if err := raftInfo(ctx, raftInfoParams); err != nil {
182186
os.Exit(checkError(err))
183187
}
188+
case v2MigrationBucketCleanupCmd.FullCommand():
189+
if err := v2MigrationBucketCleanup(ctx, v2MigrationBucketCleanupParams); err != nil {
190+
os.Exit(checkError(err))
191+
}
184192
default:
185193
level.Error(logger).Log("msg", "unknown command", "cmd", parsedCmd)
186194
}

cmd/profilecli/v2-migration.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package main
2+
3+
import (
4+
"container/list"
5+
"context"
6+
"flag"
7+
"fmt"
8+
"strings"
9+
"time"
10+
11+
"github.com/grafana/dskit/flagext"
12+
"golang.org/x/sync/errgroup"
13+
14+
pyroscopecfg "github.com/grafana/pyroscope/pkg/cfg"
15+
pyroscopeobj "github.com/grafana/pyroscope/pkg/objstore"
16+
objstoreclient "github.com/grafana/pyroscope/pkg/objstore/client"
17+
)
18+
19+
type v2MigrationBucketCleanupParams struct {
20+
configFile string
21+
configExpandEnv bool
22+
dryRun string
23+
}
24+
25+
func (p *v2MigrationBucketCleanupParams) isDryRun() bool {
26+
return p.dryRun != "false"
27+
}
28+
29+
type minimalConfig struct {
30+
Bucket objstoreclient.Config `yaml:"storage"`
31+
}
32+
33+
// Note: These are not the flags used, but we need to register them to get the defaults.
34+
func (c *minimalConfig) RegisterFlags(f *flag.FlagSet) {
35+
c.Bucket.RegisterFlags(f)
36+
}
37+
38+
func (c *minimalConfig) ApplyDynamicConfig() pyroscopecfg.Source {
39+
return func(dst pyroscopecfg.Cloneable) error {
40+
return nil
41+
}
42+
}
43+
44+
func (c *minimalConfig) Clone() flagext.Registerer {
45+
return func(c minimalConfig) *minimalConfig {
46+
return &c
47+
}(*c)
48+
}
49+
50+
func clientFromParams(ctx context.Context, params *v2MigrationBucketCleanupParams) (pyroscopeobj.Bucket, error) {
51+
if params.configFile == "" {
52+
return nil, fmt.Errorf("config file is required")
53+
}
54+
cfg := &minimalConfig{}
55+
fs := flag.NewFlagSet("config-file-loader", flag.ContinueOnError)
56+
if err := pyroscopecfg.Unmarshal(cfg,
57+
pyroscopecfg.Defaults(fs),
58+
pyroscopecfg.YAMLIgnoreUnknownFields(params.configFile, params.configExpandEnv),
59+
); err != nil {
60+
return nil, fmt.Errorf("failed parsing config: %w", err)
61+
}
62+
63+
return objstoreclient.NewBucket(ctx, cfg.Bucket, "profilecli")
64+
}
65+
66+
func addV2MigrationBackupCleanupParam(c commander) *v2MigrationBucketCleanupParams {
67+
var (
68+
params = &v2MigrationBucketCleanupParams{}
69+
)
70+
c.Flag("config.file", "The path to the pyroscope config").Default("/etc/pyroscope/config.yaml").StringVar(&params.configFile)
71+
c.Flag("config.expand-env", "").Default("false").BoolVar(&params.configExpandEnv)
72+
c.Flag("dry-run", "Dry run the operation.").Default("true").StringVar(&params.dryRun)
73+
return params
74+
}
75+
76+
func v2MigrationBucketCleanup(ctx context.Context, params *v2MigrationBucketCleanupParams) error {
77+
client, err := clientFromParams(ctx, params)
78+
if err != nil {
79+
return fmt.Errorf("failed to create client: %w", err)
80+
}
81+
82+
var pathsToDelete []string
83+
// find prefix called "phlaredb/" on the second level
84+
if err := client.Iter(ctx, "", func(name string) error {
85+
if !strings.HasSuffix(name, "/") {
86+
return nil
87+
}
88+
err := client.Iter(ctx, name, func(name string) error {
89+
if strings.HasSuffix(name, "phlaredb/") {
90+
pathsToDelete = append(pathsToDelete, name)
91+
}
92+
return nil
93+
})
94+
if err != nil {
95+
return err
96+
}
97+
98+
return nil
99+
}); err != nil {
100+
return fmt.Errorf("failed to list tenants: %w", err)
101+
}
102+
103+
if len(pathsToDelete) == 0 {
104+
fmt.Println("No paths to delete")
105+
return nil
106+
}
107+
108+
if params.isDryRun() {
109+
fmt.Println("DRY-RUN: If ran with --dry-run=false, this will delete everything under:")
110+
} else {
111+
fmt.Println("This will delete everything under:")
112+
}
113+
for _, path := range pathsToDelete {
114+
fmt.Println(" - ", path)
115+
}
116+
117+
if params.isDryRun() {
118+
fmt.Println("DRY-RUN: If ran with --dry-run=false, this will delete those object store keys:")
119+
return recurse(ctx, client, func(key string) error {
120+
fmt.Println(" - ", key)
121+
return nil
122+
}, pathsToDelete)
123+
}
124+
125+
// We do actually delete here
126+
fmt.Println("Last chance to cancel, waiting 3 seconds...")
127+
<-time.After(3 * time.Second)
128+
129+
fmt.Println("Deleted object store keys:")
130+
return recurse(ctx, client, func(key string) error {
131+
if err := client.Delete(ctx, key); err != nil {
132+
return fmt.Errorf("failed to delete %s: %w", key, err)
133+
}
134+
fmt.Println(" - ", key)
135+
return nil
136+
}, pathsToDelete)
137+
}
138+
139+
const maxConcurrentActions = 16
140+
141+
func recurse(ctx context.Context, b pyroscopeobj.Bucket, action func(key string) error, paths []string) error {
142+
g, gctx := errgroup.WithContext(ctx)
143+
g.SetLimit(maxConcurrentActions)
144+
145+
g.Go(func() error {
146+
iters := list.New()
147+
for _, path := range paths {
148+
iters.PushBack(path)
149+
}
150+
151+
for iters.Len() > 0 {
152+
e := iters.Front()
153+
path := e.Value.(string)
154+
155+
if err := b.Iter(gctx, path, func(path string) error {
156+
if strings.HasSuffix(path, "/") {
157+
iters.PushBack(path)
158+
return nil
159+
}
160+
161+
g.Go(func() error {
162+
return action(path)
163+
})
164+
165+
return nil
166+
}); err != nil {
167+
return fmt.Errorf("failed to iterate over %s: %w", path, err)
168+
}
169+
iters.Remove(e)
170+
}
171+
172+
return nil
173+
})
174+
175+
return g.Wait()
176+
}

pkg/cfg/cfg_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ server:
1818
timeout: 60h
1919
tls:
2020
key: YAML
21-
`))
21+
`), true)
2222

2323
fs := flag.NewFlagSet(t.Name(), flag.PanicOnError)
2424
flagSource := dFlags(fs, []string{"-verbose", "-server.port=21"})
@@ -51,7 +51,7 @@ servers:
5151
timeoutz: 60h
5252
tls:
5353
keey: YAML
54-
`))
54+
`), true)
5555

5656
fs := flag.NewFlagSet(t.Name(), flag.PanicOnError)
5757
flagSource := dFlags(fs, []string{"-verbose", "-server.port=21"})

pkg/cfg/files.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ func dJSON(y []byte) Source {
4141
// When expandEnvVars is true, variables in the supplied '.yaml\ file are expanded
4242
// using https://pkg.go.dev/github.com/drone/envsubst?tab=overview
4343
func YAML(f string, expandEnvVars bool) Source {
44+
return yamlWithKnowFields(f, expandEnvVars, true)
45+
}
46+
47+
// Like YAML but ignores fields that are not known
48+
func YAMLIgnoreUnknownFields(f string, expandEnvVars bool) Source {
49+
return yamlWithKnowFields(f, expandEnvVars, false)
50+
}
51+
52+
func yamlWithKnowFields(f string, expandEnvVars bool, knownFields bool) Source {
4453
return func(dst Cloneable) error {
4554
y, err := os.ReadFile(f)
4655
if err != nil {
@@ -53,19 +62,19 @@ func YAML(f string, expandEnvVars bool) Source {
5362
}
5463
y = []byte(s)
5564
}
56-
err = dYAML(y)(dst)
65+
err = dYAML(y, knownFields)(dst)
5766
return errors.Wrap(err, f)
5867
}
5968
}
6069

6170
// dYAML returns a YAML source and allows dependency injection
62-
func dYAML(y []byte) Source {
71+
func dYAML(y []byte, knownFields bool) Source {
6372
return func(dst Cloneable) error {
6473
if len(y) == 0 {
6574
return nil
6675
}
6776
dec := yaml.NewDecoder(bytes.NewReader(y))
68-
dec.KnownFields(true)
77+
dec.KnownFields(knownFields)
6978
if err := dec.Decode(dst); err != nil {
7079
return err
7180
}

pkg/cfg/precedence_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestYAMLOverDefaults(t *testing.T) {
2828
fs := flag.NewFlagSet(t.Name(), flag.PanicOnError)
2929
err := Unmarshal(&data,
3030
Defaults(fs),
31-
dYAML([]byte(y)),
31+
dYAML([]byte(y), true),
3232
)
3333

3434
require.NoError(t, err)
@@ -51,7 +51,7 @@ func TestFlagOverYAML(t *testing.T) {
5151

5252
err := Unmarshal(&data,
5353
Defaults(fs),
54-
dYAML([]byte(y)),
54+
dYAML([]byte(y), true),
5555
dFlags(fs, []string{"-verbose=false", "-tls.cert=CLI"}),
5656
)
5757

0 commit comments

Comments
 (0)