Skip to content

Commit 517ea5d

Browse files
authored
feat: support stream uploading file (#3)
Signed-off-by: LinPr <[email protected]>
1 parent 3e7de74 commit 517ea5d

File tree

7 files changed

+179
-29
lines changed

7 files changed

+179
-29
lines changed

cmd/du/du.go

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,119 @@
1-
package cmd
1+
package du
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"os"
8+
9+
s3store "github.com/LinPr/s6cmd/storage/s3"
10+
"github.com/LinPr/s6cmd/storage/uri"
11+
"github.com/go-playground/validator/v10"
12+
"github.com/spf13/cobra"
13+
)
14+
15+
func NewDuCmd() *cobra.Command {
16+
o := newOptions()
17+
cmd := cobra.Command{
18+
Use: "du [flags] <target>",
19+
Short: "calculate total size of bucket or objects",
20+
Args: cobra.ExactArgs(1),
21+
Run: func(cmd *cobra.Command, args []string) {
22+
if len(args) > 0 {
23+
o.S3Uri = args[0]
24+
}
25+
if err := o.complete(); err != nil {
26+
fmt.Fprintf(os.Stderr, "err: %v\n", err)
27+
return
28+
}
29+
if err := o.validate(); err != nil {
30+
fmt.Fprintf(os.Stderr, "err: %v\n", err)
31+
return
32+
}
33+
if err := o.run(); err != nil {
34+
fmt.Fprintf(os.Stderr, "err: %v\n", err)
35+
return
36+
}
37+
},
38+
}
39+
40+
cmd.Flags().BoolVarP(&o.DryRun, "dryRun", "n", false, "show what would be transferred")
41+
42+
return &cmd
43+
}
44+
45+
type Args struct {
46+
S3Uri string `validate:"omitempty"`
47+
}
48+
type Flags struct {
49+
DryRun bool `json:"DryRun" yaml:"DryRun"`
50+
}
51+
52+
type Options struct {
53+
Args
54+
Flags
55+
}
56+
57+
func newOptions() *Options {
58+
return &Options{}
59+
}
60+
61+
func (o *Options) complete() error {
62+
// 使用 viper 获取到最终生效的配置 flag > env > config > default
63+
return nil
64+
}
65+
66+
func (o *Options) validate() error {
67+
if err := validator.New().Struct(o); err != nil {
68+
return err
69+
}
70+
71+
return nil
72+
}
73+
74+
func (o *Options) run() error {
75+
j, _ := json.Marshal(o)
76+
fmt.Fprintf(os.Stdout, "options: %s\n", string(j))
77+
// return nil
78+
79+
cli, err := s3store.NewS3Client(context.TODO())
80+
if err != nil {
81+
return err
82+
}
83+
84+
parsedUri, err := uri.ParseS3Url(o.S3Uri)
85+
if err != nil {
86+
return err
87+
}
88+
if parsedUri.GetBucket() == "" {
89+
return fmt.Errorf("bucket is required")
90+
}
91+
92+
return listObjects(cli, parsedUri.GetBucket(), parsedUri.GetKey())
93+
}
94+
95+
func listObjects(cli *s3store.S3Store, bucket, key string) error {
96+
objs, err := cli.ListObjects(context.TODO(), bucket, key)
97+
if err != nil {
98+
return err
99+
}
100+
var size int64
101+
for _, obj := range objs.Contents {
102+
size += *obj.Size
103+
}
104+
unit := "bytes"
105+
if size > 1024 {
106+
size = size / 1024
107+
unit = "KB"
108+
}
109+
if size > 1024 {
110+
size = size / 1024
111+
unit = "MB"
112+
}
113+
if size > 1024 {
114+
size = size / 1024
115+
unit = "GB"
116+
}
117+
fmt.Printf("Total size: %d %s\n", size, unit)
118+
return nil
119+
}

cmd/ls/ls.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (o *Options) run() error {
9393
return listBuckets(cli)
9494
}
9595

96-
return listObjects(cli, o.S3Uri)
96+
return listObjects(cli, parsedUri.GetBucket(), parsedUri.GetKey())
9797
}
9898

9999
func listBuckets(cli *s3store.S3Store) error {
@@ -107,8 +107,8 @@ func listBuckets(cli *s3store.S3Store) error {
107107
return nil
108108
}
109109

110-
func listObjects(cli *s3store.S3Store, s3uri string) error {
111-
objs, err := cli.ListObjects(context.TODO(), s3uri)
110+
func listObjects(cli *s3store.S3Store, bucket, key string) error {
111+
objs, err := cli.ListObjects(context.TODO(), bucket, key)
112112
if err != nil {
113113
return err
114114
}

cmd/put/put.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ func (o *Options) run() error {
8686
if err != nil {
8787
return err
8888
}
89+
if o.localFile == "-" {
90+
if _, err := s.UploadFromStdin(context.TODO(), parsedUri.GetBucket(), parsedUri.GetKey()); err != nil {
91+
return err
92+
}
93+
return nil
94+
}
8995

9096
if _, err := s.UploadFile(context.TODO(), o.localFile, parsedUri.GetBucket(), parsedUri.GetKey()); err != nil {
9197
return err

cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"sync"
1010

11+
"github.com/LinPr/s6cmd/cmd/du"
1112
"github.com/LinPr/s6cmd/cmd/get"
1213
"github.com/LinPr/s6cmd/cmd/ls"
1314
"github.com/LinPr/s6cmd/cmd/mb"
@@ -137,4 +138,5 @@ func registerSubCommands(cmd *cobra.Command) {
137138
cmd.AddCommand(get.NewGetCmd())
138139
cmd.AddCommand(put.NewPutCmd())
139140
cmd.AddCommand(stat.NewStatCmd())
141+
cmd.AddCommand(du.NewDuCmd())
140142
}

storage/s3/object.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
"os"
1111
"time"
1212

13-
"github.com/LinPr/s6cmd/storage/uri"
14-
1513
"github.com/aws/aws-sdk-go-v2/aws"
1614
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
1715
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -217,15 +215,11 @@ func (s3store *S3Store) DeleteObjects(ctx context.Context, bucketName string, ob
217215
return err
218216
}
219217

220-
func (s3store *S3Store) ListObjects(ctx context.Context, s3uri string) (*s3.ListObjectsV2Output, error) {
221-
parsedUri, err := uri.ParseS3Url(s3uri)
222-
if err != nil {
223-
return nil, err
224-
}
218+
func (s3store *S3Store) ListObjects(ctx context.Context, bucket, key string) (*s3.ListObjectsV2Output, error) {
225219

226220
return s3store.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
227-
Bucket: aws.String(parsedUri.GetBucket()),
228-
Prefix: aws.String(parsedUri.GetKey()),
221+
Bucket: aws.String(bucket),
222+
Prefix: aws.String(key),
229223
})
230224
}
231225

@@ -276,6 +270,15 @@ func (s3store *S3Store) PutObject(ctx context.Context, r io.Reader, bucketName s
276270
return output, err
277271
}
278272

273+
func (s3store *S3Store) UploadObject(ctx context.Context, reader io.Reader, bucketName string, key string) (*manager.UploadOutput, error) {
274+
return s3store.uploader.Upload(ctx, &s3.PutObjectInput{
275+
Bucket: aws.String(bucketName),
276+
Key: aws.String(key),
277+
Body: reader,
278+
})
279+
280+
}
281+
279282
func (s3store *S3Store) HeadObject(ctx context.Context, bucketName string, objectKey string) (*s3.HeadObjectOutput, error) {
280283
return s3store.client.HeadObject(ctx, &s3.HeadObjectInput{
281284
Bucket: aws.String(bucketName),

storage/s3/s3store.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/aws/aws-sdk-go-v2/aws"
99
"github.com/aws/aws-sdk-go-v2/config"
1010
"github.com/aws/aws-sdk-go-v2/credentials"
11+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
1112
"github.com/aws/aws-sdk-go-v2/service/s3"
1213
)
1314

@@ -30,7 +31,8 @@ type Options struct {
3031
}
3132

3233
type S3Store struct {
33-
client *s3.Client
34+
client *s3.Client
35+
uploader *manager.Uploader
3436
}
3537

3638
func NewS3Client(ctx context.Context) (*S3Store, error) {
@@ -57,26 +59,20 @@ func NewS3Client(ctx context.Context) (*S3Store, error) {
5759
// provider := NewAwsS3Provider(envCredential)
5860

5961
// Load default config with custom endpoint resolver
60-
conf, err := config.LoadDefaultConfig(ctx,
61-
config.WithRegion("cn-hangzhou"),
62-
// config.WithEndpointResolverWithOptions(customResolver),
63-
// config.WithCredentialsProvider(provider),
64-
)
62+
conf, err := config.LoadDefaultConfig(ctx) // config.WithRegion("cn-hangzhou"),
63+
// config.WithEndpointResolverWithOptions(customResolver),
64+
// config.WithCredentialsProvider(provider),
65+
6566
if err != nil {
6667
return nil, err
6768
}
6869

6970
client := s3.NewFromConfig(conf)
7071

71-
// sdkConfig, err := s3.LoadDefaultConfig(ctx, s3.WithEndpointResolverV2(customResolver))
72-
// if err != nil {
73-
// return nil, err
74-
// }
75-
76-
// client := s3.NewFromConfig(sdkConfig)
77-
72+
uploader := manager.NewUploader(client)
7873
return &S3Store{
79-
client: client,
74+
client: client,
75+
uploader: uploader,
8076
}, nil
8177
}
8278

storage/storage.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
fsstore "github.com/LinPr/s6cmd/storage/fs"
1212
s3store "github.com/LinPr/s6cmd/storage/s3"
13+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
1314
"github.com/aws/aws-sdk-go-v2/service/s3"
1415
)
1516

@@ -137,6 +138,11 @@ func (s *Storage) DownloadFile(ctx context.Context, bucketName string, objectKey
137138
}
138139
defer result.Body.Close()
139140

141+
if localFile == "" || localFile == "-" {
142+
_, err = io.Copy(os.Stdout, result.Body)
143+
return err
144+
}
145+
140146
file, err := s.local.Create(localFile)
141147
if err != nil {
142148
log.Printf("Couldn't create file %v. err: %v\n", localFile, err)
@@ -148,14 +154,33 @@ func (s *Storage) DownloadFile(ctx context.Context, bucketName string, objectKey
148154
return err
149155
}
150156

151-
func (s *Storage) UploadFile(ctx context.Context, fileName string, bucketName string, objectKey string) (*s3.PutObjectOutput, error) {
157+
// stdin is an io.Reader adapter for os.File, enabling it to function solely as
158+
// an io.Reader. The AWS SDK, which accepts an io.Reader for multipart uploads,
159+
// will attempt to use io.Seek if the reader supports it. However, os.Stdin is
160+
// a specific type of file that can not seekable.
161+
type stdin struct {
162+
file *os.File
163+
}
164+
165+
func (s *stdin) Read(p []byte) (n int, err error) {
166+
return s.file.Read(p)
167+
}
152168

169+
// func (s *stdin) Seek(offset int64, whence int) (int64, error) {
170+
// return s.file.Seek(offset, whence)
171+
// }
172+
173+
func (s *Storage) UploadFile(ctx context.Context, fileName string, bucketName string, objectKey string) (*s3.PutObjectOutput, error) {
153174
file, err := s.local.Create(fileName)
154175
if err != nil {
155176
return nil, err
156177
}
157-
158178
defer file.Close()
179+
159180
return s.remote.PutObject(ctx, file, bucketName, objectKey)
181+
}
160182

183+
func (s *Storage) UploadFromStdin(ctx context.Context, bucketName string, objectKey string) (*manager.UploadOutput, error) {
184+
stdinReader := &stdin{file: os.Stdin}
185+
return s.remote.UploadObject(ctx, stdinReader, bucketName, objectKey)
161186
}

0 commit comments

Comments
 (0)