Skip to content

Commit 34dd813

Browse files
committed
feat: add zstd compressed upload
1 parent 631e6ab commit 34dd813

File tree

2 files changed

+224
-52
lines changed

2 files changed

+224
-52
lines changed

cmd/filecoin-chain-archiver/cmds/create.go

Lines changed: 222 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,20 @@ import (
66
"errors"
77
"fmt"
88
"io"
9-
"io/ioutil"
109
"math/rand"
1110
"net/url"
1211
"strings"
1312
"syscall"
1413
"time"
1514

15+
"golang.org/x/sync/errgroup"
16+
1617
"github.com/filecoin-project/filecoin-chain-archiver/pkg/config"
1718
"github.com/filecoin-project/filecoin-chain-archiver/pkg/consensus"
1819
"github.com/filecoin-project/filecoin-chain-archiver/pkg/export"
1920
"github.com/filecoin-project/filecoin-chain-archiver/pkg/nodelocker/client"
2021
"github.com/filecoin-project/go-state-types/abi"
22+
"github.com/klauspost/compress/zstd"
2123
"github.com/minio/minio-go/v7"
2224
"github.com/minio/minio-go/v7/pkg/credentials"
2325
"github.com/urfave/cli/v2"
@@ -26,6 +28,68 @@ import (
2628
"github.com/filecoin-project/lotus/api"
2729
)
2830

31+
func Compress(in io.Reader, out io.Writer) error {
32+
enc, err := zstd.NewWriter(out)
33+
if err != nil {
34+
return err
35+
}
36+
_, err = io.Copy(enc, in)
37+
if err != nil {
38+
enc.Close()
39+
return err
40+
}
41+
return enc.Close()
42+
}
43+
44+
type autocloser struct {
45+
rc io.ReadCloser
46+
}
47+
48+
func (ac autocloser) Read(p []byte) (n int, err error) {
49+
n, err = ac.rc.Read(p)
50+
if err != nil {
51+
_ = ac.rc.Close()
52+
}
53+
return
54+
}
55+
56+
func AutoCloser(rc io.ReadCloser) io.Reader {
57+
return &autocloser{rc}
58+
}
59+
60+
// multi is an io.WriteCloser that duplicates every Write to a set of writers
// (via io.MultiWriter) and, on Close, closes those of them that implement
// io.Closer.
type multi struct {
	io.Writer
	cs     []io.Closer
	closed bool  // true once Close has run
	err    error // first error seen while closing; returned by every Close
}

// MultiWriteCloser returns an io.WriteCloser whose writes go to all of ws.
// Closing it closes every element of ws that also implements io.Closer;
// writers without a Close method are left alone.
func MultiWriteCloser(ws ...io.Writer) io.WriteCloser {
	m := &multi{
		Writer: io.MultiWriter(ws...),
		cs:     make([]io.Closer, 0, len(ws)),
	}
	for _, w := range ws {
		if c, ok := w.(io.Closer); ok {
			m.cs = append(m.cs, c)
		}
	}
	return m
}

// Close closes every underlying closer, continuing past failures, and returns
// the first error encountered. Calling Close again does not re-close the
// underlying closers; it returns the result of the first call.
func (m *multi) Close() error {
	if m.closed {
		return m.err
	}
	m.closed = true

	for _, c := range m.cs {
		if err := c.Close(); err != nil && m.err == nil {
			m.err = err
		}
	}
	// Drop the references so the closers can be collected.
	m.cs = nil
	return m.err
}
84+
85+
type snapshotInfo struct {
86+
digest string
87+
size int64
88+
filename string
89+
latestIndex string
90+
latestLocation string
91+
}
92+
2993
var cmdCreate = &cli.Command{
3094
Name: "create",
3195
Usage: "create a chain export",
@@ -276,12 +340,12 @@ var cmdCreate = &cli.Command{
276340
return xerrors.Errorf("failed to aquire lock")
277341
}
278342

279-
r, w := io.Pipe()
280-
h := sha256.New()
343+
rr, wr := io.Pipe()
344+
rc, wc := io.Pipe()
281345

282-
tr := io.TeeReader(r, h)
346+
mw := MultiWriteCloser(wr, wc)
283347

284-
e := export.NewExport(node, tsk, abi.ChainEpoch(flagStaterootCount), true, w)
348+
e := export.NewExport(node, tsk, abi.ChainEpoch(flagStaterootCount), true, mw)
285349
errCh := make(chan error)
286350
go func() {
287351
errCh <- e.Export(ctx)
@@ -328,18 +392,27 @@ var cmdCreate = &cli.Command{
328392
}
329393
}()
330394

331-
var snapshotSize int64
332-
333395
if flagDiscard {
334396
logger.Infow("discarding output")
335-
io.Copy(ioutil.Discard, r)
336-
snapshotSize = 0
397+
g, _ := errgroup.WithContext(ctx)
398+
399+
g.Go(func() error {
400+
_, err := io.Copy(io.Discard, rr)
401+
return err
402+
})
403+
g.Go(func() error {
404+
_, err := io.Copy(io.Discard, rc)
405+
return err
406+
})
407+
408+
if err := g.Wait(); err != nil {
409+
return err
410+
}
337411

338412
if err := <-errCh; err != nil {
339413
return err
340414
}
341415
} else {
342-
343416
host := u.Hostname()
344417
port := u.Port()
345418
if port == "" {
@@ -355,74 +428,172 @@ var cmdCreate = &cli.Command{
355428
Creds: credentials.NewStaticV4(flagBucketAccessKey, flagBucketSecretKey, ""),
356429
Secure: u.Scheme == "https",
357430
})
431+
if err != nil {
432+
return err
433+
}
358434

359435
t := export.TimeAtHeight(gtp, height, 30*time.Second)
360436

361437
name := fmt.Sprintf("%d_%s", height, t.Format("2006_01_02T15_04_05Z"))
362438

363439
logger.Infow("object", "name", name)
364440

365-
info, err := minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s.car", flagNamePrefix, name), tr, -1, minio.PutObjectOptions{
366-
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s.car\"", name),
367-
ContentType: "application/octet-stream",
441+
g, ctxGroup := errgroup.WithContext(ctx)
442+
var siRaw *snapshotInfo
443+
var siCompressed *snapshotInfo
444+
g.Go(func() error {
445+
var err error
446+
siRaw, err = runUploadRaw(ctxGroup, minioClient, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID, bt, rr)
447+
return err
368448
})
369-
if err != nil {
370-
return fmt.Errorf("failed to upload object (%s): %w", fmt.Sprintf("%s%s.car", flagNamePrefix, name), err)
449+
g.Go(func() error {
450+
var err error
451+
siCompressed, err = runUploadCompressed(ctxGroup, minioClient, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID, bt, rc)
452+
return err
453+
})
454+
if err := g.Wait(); err != nil {
455+
return err
371456
}
372-
373-
logger.Infow("snapshot upload",
374-
"bucket", info.Bucket,
375-
"key", info.Key,
376-
"etag", info.ETag,
377-
"size", info.Size,
378-
"location", info.Location,
379-
"version_id", info.VersionID,
380-
"expiration", info.Expiration,
381-
"expiration_rule_id", info.ExpirationRuleID,
382-
)
383-
snapshotSize = info.Size
384-
385457
if err := <-errCh; err != nil {
386458
return err
387459
}
388460

389-
latestLocation, err := url.JoinPath(flagRetrievalEndpointPrefix, info.Key)
390-
if err != nil {
391-
logger.Errorw("failed to join request path", "request_prefix", flagRetrievalEndpointPrefix, "key", info.Key)
392-
return fmt.Errorf("failed to join request path: %w", err)
461+
sis := []*snapshotInfo{siRaw, siCompressed}
462+
463+
var sb strings.Builder
464+
for _, x := range sis {
465+
fmt.Fprintf(&sb, "%s *%s\n", x.digest, x.filename)
393466
}
394467

395-
sha256sum := fmt.Sprintf("%x *%s.car\n", h.Sum(nil), name)
468+
sha256sum := sb.String()
396469

397-
info, err = minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, name), strings.NewReader(sha256sum), -1, minio.PutObjectOptions{
470+
_, err = minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, name), strings.NewReader(sha256sum), -1, minio.PutObjectOptions{
398471
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s.sha256sum\"", name),
399472
ContentType: "text/plain",
400473
})
401474
if err != nil {
402475
logger.Errorw("failed to write sha256sum", "object", fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, name), "err", err)
403476
}
404477

405-
info, err = minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%slatest", flagNamePrefix), strings.NewReader(latestLocation), -1, minio.PutObjectOptions{
406-
ContentType: "text/plain",
407-
})
408-
if err != nil {
409-
return fmt.Errorf("failed to write latest", "object", fmt.Sprintf("%slatest", flagNamePrefix), "err", err)
410-
}
478+
for _, x := range sis {
479+
info, err := minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s", flagNamePrefix, x.latestIndex), strings.NewReader(x.latestLocation), -1, minio.PutObjectOptions{
480+
ContentType: "text/plain",
481+
})
482+
if err != nil {
483+
return fmt.Errorf("failed to write latest", "object", fmt.Sprintf("%slatest", flagNamePrefix), "err", err)
484+
}
411485

412-
logger.Infow("latest upload",
413-
"bucket", info.Bucket,
414-
"key", info.Key,
415-
"etag", info.ETag,
416-
"size", info.Size,
417-
"location", info.Location,
418-
"version_id", info.VersionID,
419-
"expiration", info.Expiration,
420-
"expiration_rule_id", info.ExpirationRuleID,
421-
)
486+
logger.Infow("latest upload",
487+
"bucket", info.Bucket,
488+
"key", info.Key,
489+
"etag", info.ETag,
490+
"size", info.Size,
491+
"location", info.Location,
492+
"version_id", info.VersionID,
493+
"expiration", info.Expiration,
494+
"expiration_rule_id", info.ExpirationRuleID,
495+
)
496+
}
422497
}
423498

424-
logger.Infow("snapshot job finished", "digiest", fmt.Sprintf("%x", h.Sum(nil)), "elapsed", int64(time.Since(bt).Round(time.Second).Seconds()), "size", snapshotSize, "peer", peerID)
499+
logger.Infow("snapshot job finished", "elapsed", int64(time.Since(bt).Round(time.Second).Seconds()), "peer", peerID)
425500

426501
return nil
427502
},
428503
}
504+
505+
func runUploadRaw(ctx context.Context, minioClient *minio.Client, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID string, bt time.Time, source io.Reader) (*snapshotInfo, error) {
506+
h := sha256.New()
507+
r := io.TeeReader(source, h)
508+
509+
filename := fmt.Sprintf("%s.car", name)
510+
511+
info, err := minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s", flagNamePrefix, filename), r, -1, minio.PutObjectOptions{
512+
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s\"", filename),
513+
ContentType: "application/octet-stream",
514+
})
515+
if err != nil {
516+
return nil, fmt.Errorf("failed to upload object (%s): %w", fmt.Sprintf("%s%s", flagNamePrefix, filename), err)
517+
}
518+
519+
logger.Infow("snapshot upload",
520+
"bucket", info.Bucket,
521+
"key", info.Key,
522+
"etag", info.ETag,
523+
"size", info.Size,
524+
"location", info.Location,
525+
"version_id", info.VersionID,
526+
"expiration", info.Expiration,
527+
"expiration_rule_id", info.ExpirationRuleID,
528+
)
529+
530+
snapshotSize := info.Size
531+
532+
latestLocation, err := url.JoinPath(flagRetrievalEndpointPrefix, info.Key)
533+
if err != nil {
534+
logger.Errorw("failed to join request path", "request_prefix", flagRetrievalEndpointPrefix, "key", info.Key)
535+
return nil, fmt.Errorf("failed to join request path: %w", err)
536+
}
537+
538+
digest := fmt.Sprintf("%x", h.Sum(nil))
539+
540+
return &snapshotInfo{
541+
digest: digest,
542+
size: snapshotSize,
543+
filename: filename,
544+
latestIndex: "latest",
545+
latestLocation: latestLocation,
546+
}, nil
547+
}
548+
549+
func runUploadCompressed(ctx context.Context, minioClient *minio.Client, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID string, bt time.Time, source io.Reader) (*snapshotInfo, error) {
550+
551+
r1, w1 := io.Pipe()
552+
go func() {
553+
Compress(source, w1)
554+
w1.Close()
555+
}()
556+
h := sha256.New()
557+
r := io.TeeReader(r1, h)
558+
559+
filename := fmt.Sprintf("%s.car.zst", name)
560+
561+
info, err := minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s", flagNamePrefix, filename), r, -1, minio.PutObjectOptions{
562+
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s\"", filename),
563+
ContentType: "application/octet-stream",
564+
})
565+
if err != nil {
566+
return nil, fmt.Errorf("failed to upload object (%s): %w", fmt.Sprintf("%s%s", flagNamePrefix, filename), err)
567+
}
568+
569+
logger.Infow("compressed snapshot upload",
570+
"bucket", info.Bucket,
571+
"key", info.Key,
572+
"etag", info.ETag,
573+
"size", info.Size,
574+
"location", info.Location,
575+
"version_id", info.VersionID,
576+
"expiration", info.Expiration,
577+
"expiration_rule_id", info.ExpirationRuleID,
578+
)
579+
580+
snapshotSize := info.Size
581+
582+
latestLocation, err := url.JoinPath(flagRetrievalEndpointPrefix, info.Key)
583+
if err != nil {
584+
logger.Errorw("failed to join request path", "request_prefix", flagRetrievalEndpointPrefix, "key", info.Key)
585+
return nil, fmt.Errorf("failed to join request path: %w", err)
586+
}
587+
588+
digest := fmt.Sprintf("%x", h.Sum(nil))
589+
590+
logger.Infow("compressed snapshot job finished", "digiest", digest, "elapsed", int64(time.Since(bt).Round(time.Second).Seconds()), "size", snapshotSize, "peer", peerID)
591+
592+
return &snapshotInfo{
593+
digest: digest,
594+
size: snapshotSize,
595+
filename: filename,
596+
latestIndex: "latest.zst",
597+
latestLocation: latestLocation,
598+
}, nil
599+
}

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ require (
99
github.com/filecoin-project/lotus v1.15.1
1010
github.com/gorilla/mux v1.8.0
1111
github.com/ipfs/go-log/v2 v2.5.1
12+
github.com/klauspost/compress v1.13.6
1213
github.com/minio/minio-go/v7 v7.0.24
13-
github.com/multiformats/go-multiaddr v0.5.0
1414
github.com/prometheus/client_golang v1.11.0
1515
github.com/slok/go-http-metrics v0.10.0
1616
github.com/stretchr/testify v1.7.0
1717
github.com/urfave/cli/v2 v2.4.0
18+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
1819
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f
1920
)
2021

0 commit comments

Comments
 (0)