@@ -6,18 +6,20 @@ import (
6
6
"errors"
7
7
"fmt"
8
8
"io"
9
- "io/ioutil"
10
9
"math/rand"
11
10
"net/url"
12
11
"strings"
13
12
"syscall"
14
13
"time"
15
14
15
+ "golang.org/x/sync/errgroup"
16
+
16
17
"github.com/filecoin-project/filecoin-chain-archiver/pkg/config"
17
18
"github.com/filecoin-project/filecoin-chain-archiver/pkg/consensus"
18
19
"github.com/filecoin-project/filecoin-chain-archiver/pkg/export"
19
20
"github.com/filecoin-project/filecoin-chain-archiver/pkg/nodelocker/client"
20
21
"github.com/filecoin-project/go-state-types/abi"
22
+ "github.com/klauspost/compress/zstd"
21
23
"github.com/minio/minio-go/v7"
22
24
"github.com/minio/minio-go/v7/pkg/credentials"
23
25
"github.com/urfave/cli/v2"
@@ -26,6 +28,68 @@ import (
26
28
"github.com/filecoin-project/lotus/api"
27
29
)
28
30
31
+ func Compress (in io.Reader , out io.Writer ) error {
32
+ enc , err := zstd .NewWriter (out )
33
+ if err != nil {
34
+ return err
35
+ }
36
+ _ , err = io .Copy (enc , in )
37
+ if err != nil {
38
+ enc .Close ()
39
+ return err
40
+ }
41
+ return enc .Close ()
42
+ }
43
+
44
+ type autocloser struct {
45
+ rc io.ReadCloser
46
+ }
47
+
48
+ func (ac autocloser ) Read (p []byte ) (n int , err error ) {
49
+ n , err = ac .rc .Read (p )
50
+ if err != nil {
51
+ _ = ac .rc .Close ()
52
+ }
53
+ return
54
+ }
55
+
56
+ func AutoCloser (rc io.ReadCloser ) io.Reader {
57
+ return & autocloser {rc }
58
+ }
59
+
60
+ type multi struct {
61
+ io.Writer
62
+ cs []io.Closer
63
+ }
64
+
65
+ func MultiWriteCloser (ws ... io.Writer ) io.WriteCloser {
66
+ m := & multi {Writer : io .MultiWriter (ws ... )}
67
+ for _ , w := range ws {
68
+ if c , ok := w .(io.Closer ); ok {
69
+ m .cs = append (m .cs , c )
70
+ }
71
+ }
72
+ return m
73
+ }
74
+
75
+ func (m * multi ) Close () error {
76
+ var first error
77
+ for _ , c := range m .cs {
78
+ if err := c .Close (); err != nil && first == nil {
79
+ first = err
80
+ }
81
+ }
82
+ return first
83
+ }
84
+
85
+ type snapshotInfo struct {
86
+ digest string
87
+ size int64
88
+ filename string
89
+ latestIndex string
90
+ latestLocation string
91
+ }
92
+
29
93
var cmdCreate = & cli.Command {
30
94
Name : "create" ,
31
95
Usage : "create a chain export" ,
@@ -276,12 +340,12 @@ var cmdCreate = &cli.Command{
276
340
return xerrors .Errorf ("failed to aquire lock" )
277
341
}
278
342
279
- r , w := io .Pipe ()
280
- h := sha256 . New ()
343
+ rr , wr := io .Pipe ()
344
+ rc , wc := io . Pipe ()
281
345
282
- tr := io . TeeReader ( r , h )
346
+ mw := MultiWriteCloser ( wr , wc )
283
347
284
- e := export .NewExport (node , tsk , abi .ChainEpoch (flagStaterootCount ), true , w )
348
+ e := export .NewExport (node , tsk , abi .ChainEpoch (flagStaterootCount ), true , mw )
285
349
errCh := make (chan error )
286
350
go func () {
287
351
errCh <- e .Export (ctx )
@@ -328,18 +392,27 @@ var cmdCreate = &cli.Command{
328
392
}
329
393
}()
330
394
331
- var snapshotSize int64
332
-
333
395
if flagDiscard {
334
396
logger .Infow ("discarding output" )
335
- io .Copy (ioutil .Discard , r )
336
- snapshotSize = 0
397
+ g , _ := errgroup .WithContext (ctx )
398
+
399
+ g .Go (func () error {
400
+ _ , err := io .Copy (io .Discard , rr )
401
+ return err
402
+ })
403
+ g .Go (func () error {
404
+ _ , err := io .Copy (io .Discard , rc )
405
+ return err
406
+ })
407
+
408
+ if err := g .Wait (); err != nil {
409
+ return err
410
+ }
337
411
338
412
if err := <- errCh ; err != nil {
339
413
return err
340
414
}
341
415
} else {
342
-
343
416
host := u .Hostname ()
344
417
port := u .Port ()
345
418
if port == "" {
@@ -355,74 +428,172 @@ var cmdCreate = &cli.Command{
355
428
Creds : credentials .NewStaticV4 (flagBucketAccessKey , flagBucketSecretKey , "" ),
356
429
Secure : u .Scheme == "https" ,
357
430
})
431
+ if err != nil {
432
+ return err
433
+ }
358
434
359
435
t := export .TimeAtHeight (gtp , height , 30 * time .Second )
360
436
361
437
name := fmt .Sprintf ("%d_%s" , height , t .Format ("2006_01_02T15_04_05Z" ))
362
438
363
439
logger .Infow ("object" , "name" , name )
364
440
365
- info , err := minioClient .PutObject (ctx , flagBucket , fmt .Sprintf ("%s%s.car" , flagNamePrefix , name ), tr , - 1 , minio.PutObjectOptions {
366
- ContentDisposition : fmt .Sprintf ("attachment; filename=\" %s.car\" " , name ),
367
- ContentType : "application/octet-stream" ,
441
+ g , ctxGroup := errgroup .WithContext (ctx )
442
+ var siRaw * snapshotInfo
443
+ var siCompressed * snapshotInfo
444
+ g .Go (func () error {
445
+ var err error
446
+ siRaw , err = runUploadRaw (ctxGroup , minioClient , flagBucket , flagNamePrefix , flagRetrievalEndpointPrefix , name , peerID , bt , rr )
447
+ return err
368
448
})
369
- if err != nil {
370
- return fmt .Errorf ("failed to upload object (%s): %w" , fmt .Sprintf ("%s%s.car" , flagNamePrefix , name ), err )
449
+ g .Go (func () error {
450
+ var err error
451
+ siCompressed , err = runUploadCompressed (ctxGroup , minioClient , flagBucket , flagNamePrefix , flagRetrievalEndpointPrefix , name , peerID , bt , rc )
452
+ return err
453
+ })
454
+ if err := g .Wait (); err != nil {
455
+ return err
371
456
}
372
-
373
- logger .Infow ("snapshot upload" ,
374
- "bucket" , info .Bucket ,
375
- "key" , info .Key ,
376
- "etag" , info .ETag ,
377
- "size" , info .Size ,
378
- "location" , info .Location ,
379
- "version_id" , info .VersionID ,
380
- "expiration" , info .Expiration ,
381
- "expiration_rule_id" , info .ExpirationRuleID ,
382
- )
383
- snapshotSize = info .Size
384
-
385
457
if err := <- errCh ; err != nil {
386
458
return err
387
459
}
388
460
389
- latestLocation , err := url .JoinPath (flagRetrievalEndpointPrefix , info .Key )
390
- if err != nil {
391
- logger .Errorw ("failed to join request path" , "request_prefix" , flagRetrievalEndpointPrefix , "key" , info .Key )
392
- return fmt .Errorf ("failed to join request path: %w" , err )
461
+ sis := []* snapshotInfo {siRaw , siCompressed }
462
+
463
+ var sb strings.Builder
464
+ for _ , x := range sis {
465
+ fmt .Fprintf (& sb , "%s *%s\n " , x .digest , x .filename )
393
466
}
394
467
395
- sha256sum := fmt . Sprintf ( "%x *%s.car \n " , h . Sum ( nil ), name )
468
+ sha256sum := sb . String ( )
396
469
397
- info , err = minioClient .PutObject (ctx , flagBucket , fmt .Sprintf ("%s%s.sha256sum" , flagNamePrefix , name ), strings .NewReader (sha256sum ), - 1 , minio.PutObjectOptions {
470
+ _ , err = minioClient .PutObject (ctx , flagBucket , fmt .Sprintf ("%s%s.sha256sum" , flagNamePrefix , name ), strings .NewReader (sha256sum ), - 1 , minio.PutObjectOptions {
398
471
ContentDisposition : fmt .Sprintf ("attachment; filename=\" %s.sha256sum\" " , name ),
399
472
ContentType : "text/plain" ,
400
473
})
401
474
if err != nil {
402
475
logger .Errorw ("failed to write sha256sum" , "object" , fmt .Sprintf ("%s%s.sha256sum" , flagNamePrefix , name ), "err" , err )
403
476
}
404
477
405
- info , err = minioClient .PutObject (ctx , flagBucket , fmt .Sprintf ("%slatest" , flagNamePrefix ), strings .NewReader (latestLocation ), - 1 , minio.PutObjectOptions {
406
- ContentType : "text/plain" ,
407
- })
408
- if err != nil {
409
- return fmt .Errorf ("failed to write latest" , "object" , fmt .Sprintf ("%slatest" , flagNamePrefix ), "err" , err )
410
- }
478
+ for _ , x := range sis {
479
+ info , err := minioClient .PutObject (ctx , flagBucket , fmt .Sprintf ("%s%s" , flagNamePrefix , x .latestIndex ), strings .NewReader (x .latestLocation ), - 1 , minio.PutObjectOptions {
480
+ ContentType : "text/plain" ,
481
+ })
482
+ if err != nil {
483
+ return fmt .Errorf ("failed to write latest" , "object" , fmt .Sprintf ("%slatest" , flagNamePrefix ), "err" , err )
484
+ }
411
485
412
- logger .Infow ("latest upload" ,
413
- "bucket" , info .Bucket ,
414
- "key" , info .Key ,
415
- "etag" , info .ETag ,
416
- "size" , info .Size ,
417
- "location" , info .Location ,
418
- "version_id" , info .VersionID ,
419
- "expiration" , info .Expiration ,
420
- "expiration_rule_id" , info .ExpirationRuleID ,
421
- )
486
+ logger .Infow ("latest upload" ,
487
+ "bucket" , info .Bucket ,
488
+ "key" , info .Key ,
489
+ "etag" , info .ETag ,
490
+ "size" , info .Size ,
491
+ "location" , info .Location ,
492
+ "version_id" , info .VersionID ,
493
+ "expiration" , info .Expiration ,
494
+ "expiration_rule_id" , info .ExpirationRuleID ,
495
+ )
496
+ }
422
497
}
423
498
424
- logger .Infow ("snapshot job finished" , "digiest" , fmt . Sprintf ( "%x" , h . Sum ( nil )), " elapsed" , int64 (time .Since (bt ).Round (time .Second ).Seconds ()), "size" , snapshotSize , "peer" , peerID )
499
+ logger .Infow ("snapshot job finished" , "elapsed" , int64 (time .Since (bt ).Round (time .Second ).Seconds ()), "peer" , peerID )
425
500
426
501
return nil
427
502
},
428
503
}
504
+
505
+ func runUploadRaw (ctx context.Context , minioClient * minio.Client , flagBucket , flagNamePrefix , flagRetrievalEndpointPrefix , name , peerID string , bt time.Time , source io.Reader ) (* snapshotInfo , error ) {
506
+ h := sha256 .New ()
507
+ r := io .TeeReader (source , h )
508
+
509
+ filename := fmt .Sprintf ("%s.car" , name )
510
+
511
+ info , err := minioClient .PutObject (ctx , flagBucket , fmt .Sprintf ("%s%s" , flagNamePrefix , filename ), r , - 1 , minio.PutObjectOptions {
512
+ ContentDisposition : fmt .Sprintf ("attachment; filename=\" %s\" " , filename ),
513
+ ContentType : "application/octet-stream" ,
514
+ })
515
+ if err != nil {
516
+ return nil , fmt .Errorf ("failed to upload object (%s): %w" , fmt .Sprintf ("%s%s" , flagNamePrefix , filename ), err )
517
+ }
518
+
519
+ logger .Infow ("snapshot upload" ,
520
+ "bucket" , info .Bucket ,
521
+ "key" , info .Key ,
522
+ "etag" , info .ETag ,
523
+ "size" , info .Size ,
524
+ "location" , info .Location ,
525
+ "version_id" , info .VersionID ,
526
+ "expiration" , info .Expiration ,
527
+ "expiration_rule_id" , info .ExpirationRuleID ,
528
+ )
529
+
530
+ snapshotSize := info .Size
531
+
532
+ latestLocation , err := url .JoinPath (flagRetrievalEndpointPrefix , info .Key )
533
+ if err != nil {
534
+ logger .Errorw ("failed to join request path" , "request_prefix" , flagRetrievalEndpointPrefix , "key" , info .Key )
535
+ return nil , fmt .Errorf ("failed to join request path: %w" , err )
536
+ }
537
+
538
+ digest := fmt .Sprintf ("%x" , h .Sum (nil ))
539
+
540
+ return & snapshotInfo {
541
+ digest : digest ,
542
+ size : snapshotSize ,
543
+ filename : filename ,
544
+ latestIndex : "latest" ,
545
+ latestLocation : latestLocation ,
546
+ }, nil
547
+ }
548
+
549
+ func runUploadCompressed (ctx context.Context , minioClient * minio.Client , flagBucket , flagNamePrefix , flagRetrievalEndpointPrefix , name , peerID string , bt time.Time , source io.Reader ) (* snapshotInfo , error ) {
550
+
551
+ r1 , w1 := io .Pipe ()
552
+ go func () {
553
+ Compress (source , w1 )
554
+ w1 .Close ()
555
+ }()
556
+ h := sha256 .New ()
557
+ r := io .TeeReader (r1 , h )
558
+
559
+ filename := fmt .Sprintf ("%s.car.zst" , name )
560
+
561
+ info , err := minioClient .PutObject (ctx , flagBucket , fmt .Sprintf ("%s%s" , flagNamePrefix , filename ), r , - 1 , minio.PutObjectOptions {
562
+ ContentDisposition : fmt .Sprintf ("attachment; filename=\" %s\" " , filename ),
563
+ ContentType : "application/octet-stream" ,
564
+ })
565
+ if err != nil {
566
+ return nil , fmt .Errorf ("failed to upload object (%s): %w" , fmt .Sprintf ("%s%s" , flagNamePrefix , filename ), err )
567
+ }
568
+
569
+ logger .Infow ("compressed snapshot upload" ,
570
+ "bucket" , info .Bucket ,
571
+ "key" , info .Key ,
572
+ "etag" , info .ETag ,
573
+ "size" , info .Size ,
574
+ "location" , info .Location ,
575
+ "version_id" , info .VersionID ,
576
+ "expiration" , info .Expiration ,
577
+ "expiration_rule_id" , info .ExpirationRuleID ,
578
+ )
579
+
580
+ snapshotSize := info .Size
581
+
582
+ latestLocation , err := url .JoinPath (flagRetrievalEndpointPrefix , info .Key )
583
+ if err != nil {
584
+ logger .Errorw ("failed to join request path" , "request_prefix" , flagRetrievalEndpointPrefix , "key" , info .Key )
585
+ return nil , fmt .Errorf ("failed to join request path: %w" , err )
586
+ }
587
+
588
+ digest := fmt .Sprintf ("%x" , h .Sum (nil ))
589
+
590
+ logger .Infow ("compressed snapshot job finished" , "digiest" , digest , "elapsed" , int64 (time .Since (bt ).Round (time .Second ).Seconds ()), "size" , snapshotSize , "peer" , peerID )
591
+
592
+ return & snapshotInfo {
593
+ digest : digest ,
594
+ size : snapshotSize ,
595
+ filename : filename ,
596
+ latestIndex : "latest.zst" ,
597
+ latestLocation : latestLocation ,
598
+ }, nil
599
+ }
0 commit comments