Skip to content

Commit 6f4411c

Browse files
committed
fixes bugs and only uploads compressed car
* fixes a lot of bugs, found through local testing * now that lotus support importing zstd compressed snapshots, don't even bother with the raw snapshots
1 parent 82b7727 commit 6f4411c

File tree

4 files changed

+56
-606
lines changed

4 files changed

+56
-606
lines changed

cmd/filecoin-chain-archiver/cmds/create.go

Lines changed: 34 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,22 @@ var cmdCreate = &cli.Command{
179179
EnvVars: []string{"FCA_CREATE_HEIGHT"},
180180
Value: 0,
181181
},
182+
&cli.StringFlag{
183+
Name: "filename",
184+
Usage: "name of exported CAR file for internal chain export",
185+
EnvVars: []string{"FCA_EXPORT_FILENAME"},
186+
},
182187
&cli.DurationFlag{
183188
Name: "progress-update",
184189
Usage: "how frequenty to provide provide update logs",
185190
EnvVars: []string{"FCA_CREATE_PROGRESS_UPDATE"},
186191
Value: 60 * time.Second,
187192
},
193+
&cli.StringFlag{
194+
Name: "export-dir",
195+
Usage: "directory where to save the exported CAR file",
196+
EnvVars: []string{"FCA_EXPORT_DIR"},
197+
},
188198
},
189199
Action: func(cctx *cli.Context) error {
190200
ctx := context.Background()
@@ -203,6 +213,8 @@ var cmdCreate = &cli.Command{
203213
flagConfidence := cctx.Int("confidence")
204214
flagHeight := cctx.Int("height")
205215
flagAfter := cctx.Int("after")
216+
flagExportDir := cctx.String("export-dir")
217+
flagFileName := cctx.String("filename")
206218

207219
u, err := url.Parse(flagBucketEndpoint)
208220
if err != nil {
@@ -331,7 +343,7 @@ var cmdCreate = &cli.Command{
331343
return xerrors.Errorf("failed to aquire lock")
332344
}
333345

334-
e := export.NewExport(node, headTs, tailTs)
346+
e := export.NewExport(node, headTs, tailTs, flagFileName, flagExportDir)
335347
errCh := make(chan error)
336348
go func() {
337349
errCh <- e.Export(ctx)
@@ -358,35 +370,34 @@ var cmdCreate = &cli.Command{
358370
}
359371
}()
360372

361-
name, err := filepath.Glob("snapshot_" + string(tailHeight) + "_" + string(height) + "*.car")
362-
if err != nil {
363-
return err
373+
rrPath := filepath.Join(flagExportDir, flagFileName)
374+
for {
375+
info, err := os.Stat(rrPath)
376+
if os.IsNotExist(err) {
377+
logger.Infow("waiting for snapshot car file to begin writing")
378+
time.Sleep(time.Second * 15)
379+
continue
380+
} else if info.IsDir() {
381+
return xerrors.Errorf("trying to open directory instead of car file")
382+
}
383+
break
364384
}
365-
rrfn := filepath.Join(cctx.String("repo"), name[0])
366-
rr, err := os.OpenFile(rrfn, os.O_RDONLY, 444)
385+
rr, err := os.OpenFile(rrPath, os.O_RDONLY, 444)
367386
if err != nil {
368387
return err
369388
}
370389
defer rr.Close()
371390

372-
cname := name[0] + ".zstd"
373-
rcfn := filepath.Join(cctx.String("repo"), string(cname[0]))
374-
rc, err := os.OpenFile(rcfn, os.O_RDONLY, 444)
375-
if err != nil {
376-
return err
377-
}
378-
defer rc.Close()
379-
380391
go func() {
381392
var lastSize int64
393+
fmt.Printf("rrPath: %v", rrPath)
382394
for {
383395
select {
384396
case <-time.After(flagProgressUpdate):
385-
size := e.Progress(rrfn)
397+
size := e.Progress(rrPath)
386398
if size == 0 {
387399
continue
388400
}
389-
390401
logger.Infow("update", "total", size, "speed", (size-lastSize)/int64(flagProgressUpdate/time.Second))
391402
lastSize = size
392403
}
@@ -401,10 +412,6 @@ var cmdCreate = &cli.Command{
401412
_, err := io.Copy(io.Discard, rr)
402413
return err
403414
})
404-
g.Go(func() error {
405-
_, err := io.Copy(io.Discard, rc)
406-
return err
407-
})
408415

409416
if err := g.Wait(); err != nil {
410417
return err
@@ -435,19 +442,13 @@ var cmdCreate = &cli.Command{
435442

436443
//t := export.TimeAtHeight(gtp, height, 30*time.Second)
437444

438-
logger.Infow("object", "name", name[0])
445+
logger.Infow("object", "name", flagFileName)
439446

440447
g, ctxGroup := errgroup.WithContext(ctx)
441-
var siRaw *snapshotInfo
442448
var siCompressed *snapshotInfo
443449
g.Go(func() error {
444450
var err error
445-
siRaw, err = runUploadRaw(ctxGroup, minioClient, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name[0], peerID, bt, rr)
446-
return err
447-
})
448-
g.Go(func() error {
449-
var err error
450-
siCompressed, err = runUploadCompressed(ctxGroup, minioClient, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name[0], peerID, bt, rc)
451+
siCompressed, err = runUploadCompressed(ctxGroup, minioClient, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, flagFileName + ".zstd", peerID, bt, rr)
451452
return err
452453
})
453454
if err := g.Wait(); err != nil {
@@ -457,7 +458,7 @@ var cmdCreate = &cli.Command{
457458
return err
458459
}
459460

460-
sis := []*snapshotInfo{siRaw, siCompressed}
461+
sis := []*snapshotInfo{siCompressed}
461462

462463
var sb strings.Builder
463464
for _, x := range sis {
@@ -466,12 +467,12 @@ var cmdCreate = &cli.Command{
466467

467468
sha256sum := sb.String()
468469

469-
_, err = minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, name), strings.NewReader(sha256sum), -1, minio.PutObjectOptions{
470-
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s.sha256sum\"", name),
470+
_, err = minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, flagFileName), strings.NewReader(sha256sum), -1, minio.PutObjectOptions{
471+
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s.sha256sum\"", flagFileName),
471472
ContentType: "text/plain",
472473
})
473474
if err != nil {
474-
logger.Errorw("failed to write sha256sum", "object", fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, name), "err", err)
475+
logger.Errorw("failed to write sha256sum", "object", fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, flagFileName), "err", err)
475476
}
476477

477478
for _, x := range sis {
@@ -501,50 +502,6 @@ var cmdCreate = &cli.Command{
501502
},
502503
}
503504

504-
func runUploadRaw(ctx context.Context, minioClient *minio.Client, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID string, bt time.Time, source io.Reader) (*snapshotInfo, error) {
505-
h := sha256.New()
506-
r := io.TeeReader(source, h)
507-
508-
filename := fmt.Sprintf("%s.car", name)
509-
510-
info, err := minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s", flagNamePrefix, filename), r, -1, minio.PutObjectOptions{
511-
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s\"", filename),
512-
ContentType: "application/octet-stream",
513-
})
514-
if err != nil {
515-
return nil, fmt.Errorf("failed to upload object (%s): %w", fmt.Sprintf("%s%s", flagNamePrefix, filename), err)
516-
}
517-
518-
logger.Infow("snapshot upload",
519-
"bucket", info.Bucket,
520-
"key", info.Key,
521-
"etag", info.ETag,
522-
"size", info.Size,
523-
"location", info.Location,
524-
"version_id", info.VersionID,
525-
"expiration", info.Expiration,
526-
"expiration_rule_id", info.ExpirationRuleID,
527-
)
528-
529-
snapshotSize := info.Size
530-
531-
latestLocation, err := url.JoinPath(flagRetrievalEndpointPrefix, info.Key)
532-
if err != nil {
533-
logger.Errorw("failed to join request path", "request_prefix", flagRetrievalEndpointPrefix, "key", info.Key)
534-
return nil, fmt.Errorf("failed to join request path: %w", err)
535-
}
536-
537-
digest := fmt.Sprintf("%x", h.Sum(nil))
538-
539-
return &snapshotInfo{
540-
digest: digest,
541-
size: snapshotSize,
542-
filename: filename,
543-
latestIndex: "latest",
544-
latestLocation: latestLocation,
545-
}, nil
546-
}
547-
548505
func runUploadCompressed(ctx context.Context, minioClient *minio.Client, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID string, bt time.Time, source io.Reader) (*snapshotInfo, error) {
549506

550507
r1, w1 := io.Pipe()
@@ -555,7 +512,7 @@ func runUploadCompressed(ctx context.Context, minioClient *minio.Client, flagBuc
555512
h := sha256.New()
556513
r := io.TeeReader(r1, h)
557514

558-
filename := fmt.Sprintf("%s.car.zst", name)
515+
filename := name
559516

560517
info, err := minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s", flagNamePrefix, filename), r, -1, minio.PutObjectOptions{
561518
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s\"", filename),

go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/BurntSushi/toml v1.1.0
77
github.com/filecoin-project/go-jsonrpc v0.2.3
88
github.com/filecoin-project/go-state-types v0.10.0
9-
github.com/filecoin-project/lotus v1.21.0-rc1
9+
github.com/filecoin-project/lotus v1.20.4-0.20230330155014-09459e581a6d
1010
github.com/gorilla/mux v1.8.0
1111
github.com/ipfs/go-log/v2 v2.5.1
1212
github.com/klauspost/compress v1.15.12
@@ -82,7 +82,7 @@ require (
8282
github.com/ipfs/bbloom v0.0.4 // indirect
8383
github.com/ipfs/go-block-format v0.1.1 // indirect
8484
github.com/ipfs/go-blockservice v0.5.0 // indirect
85-
github.com/ipfs/go-cid v0.3.2 // indirect
85+
github.com/ipfs/go-cid v0.4.0 // indirect
8686
github.com/ipfs/go-datastore v0.6.0 // indirect
8787
github.com/ipfs/go-ds-badger2 v0.1.3 // indirect
8888
github.com/ipfs/go-ds-leveldb v0.5.0 // indirect
@@ -98,11 +98,11 @@ require (
9898
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
9999
github.com/ipfs/go-ipld-format v0.4.0 // indirect
100100
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
101-
github.com/ipfs/go-libipfs v0.5.0 // indirect
101+
github.com/ipfs/go-libipfs v0.7.0 // indirect
102102
github.com/ipfs/go-log v1.0.5 // indirect
103103
github.com/ipfs/go-merkledag v0.9.0 // indirect
104104
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
105-
github.com/ipfs/go-path v0.3.0 // indirect
105+
github.com/ipfs/go-path v0.3.1 // indirect
106106
github.com/ipfs/go-unixfs v0.4.3 // indirect
107107
github.com/ipfs/go-verifcid v0.0.2 // indirect
108108
github.com/ipfs/interface-go-ipfs-core v0.11.1 // indirect

0 commit comments

Comments
 (0)