Skip to content

Commit 1754477

Browse files
author
Dave Brotherstone
committed
Add parallel uploads
This adds a parallel upload flag that enables uploading multiple files in parallel. This is considerably faster for uploading many files.
1 parent 665db05 commit 1754477

File tree

2 files changed

+112
-81
lines changed

2 files changed

+112
-81
lines changed

main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ func main() {
151151
Usage: "OIDC token for assuming role via web identity",
152152
EnvVar: "PLUGIN_OIDC_TOKEN_ID",
153153
},
154+
cli.IntFlag{
155+
Name: "parallel-uploads",
156+
Usage: "number of parallel uploads",
157+
EnvVar: "PLUGIN_PARALLEL_UPLOADS",
158+
},
154159
}
155160

156161
if err := app.Run(os.Args); err != nil {
@@ -163,7 +168,6 @@ func run(c *cli.Context) error {
163168
_ = godotenv.Load(c.String("env-file"))
164169
}
165170

166-
167171
plugin := Plugin{
168172
Endpoint: c.String("endpoint"),
169173
Key: c.String("access-key"),
@@ -189,8 +193,8 @@ func run(c *cli.Context) error {
189193
DryRun: c.Bool("dry-run"),
190194
ExternalID: c.String("external-id"),
191195
IdToken: c.String("oidc-token-id"),
196+
ParallelUploads: c.Int("parallel-uploads"),
192197
}
193198

194199
return plugin.Exec()
195200
}
196-

plugin.go

Lines changed: 106 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path/filepath"
88
"regexp"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"github.com/aws/aws-sdk-go/aws"
@@ -102,6 +103,94 @@ type Plugin struct {
102103

103104
// set OIDC ID Token to retrieve temporary credentials
104105
IdToken string
106+
107+
ParallelUploads int
108+
}
109+
110+
func (p *Plugin) uploadFile(client *s3.S3, match string) error {
111+
112+
target := resolveKey(p.Target, match, p.StripPrefix)
113+
114+
contentType := matchExtension(match, p.ContentType)
115+
contentEncoding := matchExtension(match, p.ContentEncoding)
116+
cacheControl := matchExtension(match, p.CacheControl)
117+
118+
if contentType == "" {
119+
contentType = mime.TypeByExtension(filepath.Ext(match))
120+
121+
if contentType == "" {
122+
contentType = "application/octet-stream"
123+
}
124+
}
125+
126+
// log file for debug purposes.
127+
log.WithFields(log.Fields{
128+
"name": match,
129+
"bucket": p.Bucket,
130+
"target": target,
131+
}).Info("Uploading file")
132+
133+
// when executing a dry-run we exit because we don't actually want to
134+
// upload the file to S3.
135+
if p.DryRun {
136+
return nil
137+
}
138+
139+
f, err := os.Open(match)
140+
if err != nil {
141+
log.WithFields(log.Fields{
142+
"error": err,
143+
"file": match,
144+
}).Error("Problem opening file")
145+
return err
146+
}
147+
defer f.Close()
148+
149+
putObjectInput := &s3.PutObjectInput{
150+
Body: f,
151+
Bucket: &(p.Bucket),
152+
Key: &target,
153+
}
154+
155+
if contentType != "" {
156+
putObjectInput.ContentType = aws.String(contentType)
157+
}
158+
159+
if contentEncoding != "" {
160+
putObjectInput.ContentEncoding = aws.String(contentEncoding)
161+
}
162+
163+
if cacheControl != "" {
164+
putObjectInput.CacheControl = aws.String(cacheControl)
165+
}
166+
167+
if p.Encryption != "" {
168+
putObjectInput.ServerSideEncryption = aws.String(p.Encryption)
169+
}
170+
171+
if p.StorageClass != "" {
172+
putObjectInput.StorageClass = &(p.StorageClass)
173+
}
174+
175+
if p.Access != "" {
176+
putObjectInput.ACL = &(p.Access)
177+
}
178+
179+
_, err = client.PutObject(putObjectInput)
180+
181+
if err != nil {
182+
log.WithFields(log.Fields{
183+
"name": match,
184+
"bucket": p.Bucket,
185+
"target": target,
186+
"error": err,
187+
}).Error("Could not upload file")
188+
189+
return err
190+
}
191+
f.Close()
192+
193+
return nil
105194
}
106195

107196
// Exec runs the plugin
@@ -137,96 +226,34 @@ func (p *Plugin) Exec() error {
137226
}).Error("Could not match files")
138227
return err
139228
}
229+
sem := make(chan bool, max(1, p.ParallelUploads))
230+
errChan := make(chan error)
231+
var wg sync.WaitGroup
140232

141233
for _, match := range matches {
142234
// skip directories
143235
if isDir(match, matches) {
144236
continue
145237
}
146-
147-
target := resolveKey(p.Target, match, p.StripPrefix)
148-
149-
contentType := matchExtension(match, p.ContentType)
150-
contentEncoding := matchExtension(match, p.ContentEncoding)
151-
cacheControl := matchExtension(match, p.CacheControl)
152-
153-
if contentType == "" {
154-
contentType = mime.TypeByExtension(filepath.Ext(match))
155-
156-
if contentType == "" {
157-
contentType = "application/octet-stream"
158-
}
159-
}
160-
161-
// log file for debug purposes.
162-
log.WithFields(log.Fields{
163-
"name": match,
164-
"bucket": p.Bucket,
165-
"target": target,
166-
}).Info("Uploading file")
167-
168-
// when executing a dry-run we exit because we don't actually want to
169-
// upload the file to S3.
170-
if p.DryRun {
171-
continue
172-
}
173-
174-
f, err := os.Open(match)
175-
if err != nil {
176-
log.WithFields(log.Fields{
177-
"error": err,
178-
"file": match,
179-
}).Error("Problem opening file")
238+
select {
239+
case err := <-errChan:
180240
return err
241+
case sem <- true:
181242
}
182-
defer f.Close()
183243

184-
putObjectInput := &s3.PutObjectInput{
185-
Body: f,
186-
Bucket: &(p.Bucket),
187-
Key: &target,
188-
}
189-
190-
if contentType != "" {
191-
putObjectInput.ContentType = aws.String(contentType)
192-
}
193-
194-
if contentEncoding != "" {
195-
putObjectInput.ContentEncoding = aws.String(contentEncoding)
196-
}
197-
198-
if cacheControl != "" {
199-
putObjectInput.CacheControl = aws.String(cacheControl)
200-
}
201-
202-
if p.Encryption != "" {
203-
putObjectInput.ServerSideEncryption = aws.String(p.Encryption)
204-
}
205-
206-
if p.StorageClass != "" {
207-
putObjectInput.StorageClass = &(p.StorageClass)
208-
}
209-
210-
if p.Access != "" {
211-
putObjectInput.ACL = &(p.Access)
212-
}
213-
214-
_, err = client.PutObject(putObjectInput)
215-
216-
if err != nil {
217-
log.WithFields(log.Fields{
218-
"name": match,
219-
"bucket": p.Bucket,
220-
"target": target,
221-
"error": err,
222-
}).Error("Could not upload file")
223-
224-
return err
225-
}
226-
f.Close()
244+
wg.Add(1)
245+
go func() {
246+
defer wg.Done()
247+
defer func() { <-sem }() // release
248+
err := p.uploadFile(client, match)
249+
if err != nil {
250+
errChan <- err
251+
}
252+
}()
227253
}
228-
254+
wg.Wait()
229255
return nil
256+
230257
}
231258

232259
// matches is a helper function that returns a list of all files matching the

0 commit comments

Comments
 (0)