Skip to content

Commit 8c5cbdc

Browse files
authored
feat(object): parallelize versions destruction (#1711)
1 parent 43c4a22 commit 8c5cbdc

File tree

5 files changed

+400
-179
lines changed

5 files changed

+400
-179
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/hashicorp/aws-sdk-go-base v1.1.0
1010
github.com/hashicorp/awspolicyequivalence v1.6.0
1111
github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320
12+
github.com/hashicorp/go-multierror v1.1.1
1213
github.com/hashicorp/go-retryablehttp v0.7.1
1314
github.com/hashicorp/terraform-plugin-log v0.7.0
1415
github.com/hashicorp/terraform-plugin-sdk/v2 v2.24.1
@@ -33,7 +34,6 @@ require (
3334
github.com/hashicorp/go-checkpoint v0.5.0 // indirect
3435
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
3536
github.com/hashicorp/go-hclog v1.2.1 // indirect
36-
github.com/hashicorp/go-multierror v1.1.1 // indirect
3737
github.com/hashicorp/go-plugin v1.4.6 // indirect
3838
github.com/hashicorp/go-uuid v1.0.3 // indirect
3939
github.com/hashicorp/go-version v1.6.0 // indirect

internal/workerpool.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package internal
2+
3+
import (
4+
"sync"
5+
)
6+
7+
// WorkerPoolTask is a unit of work executed by a WorkerPool. A non-nil return
// value is collected by the pool and reported by CloseAndWait.
type WorkerPoolTask func() error

// WorkerPool runs submitted tasks on a fixed number of worker goroutines.
//
// Tasks are handed to a dispatcher goroutine that keeps an unbounded in-memory
// backlog, so AddTask only blocks for the time the dispatcher needs to accept
// the task, not until a worker is free.
type WorkerPool struct {
	// tasksToDispatch carries tasks from AddTask to the dispatcher.
	tasksToDispatch chan WorkerPoolTask
	// tasksToRun carries tasks from the dispatcher to the workers.
	tasksToRun chan WorkerPoolTask
	// errors accumulates the non-nil results of tasks; guarded by errorsMutex.
	errors      []error
	errorsMutex sync.Mutex
	// tasksWaitingGroup counts tasks that were added but have not finished.
	tasksWaitingGroup sync.WaitGroup
}

// NewWorkerPool starts a pool with size workers plus one dispatcher goroutine.
// A size lower than 1 is treated as 1 so that submitted tasks always make
// progress. The goroutines exit once CloseAndWait has been called and every
// task has been handed to a worker.
func NewWorkerPool(size int) *WorkerPool {
	if size < 1 {
		size = 1
	}

	p := &WorkerPool{
		tasksToDispatch: make(chan WorkerPoolTask),
		tasksToRun:      make(chan WorkerPoolTask, size),
	}

	for i := 0; i < size; i++ {
		go p.worker()
	}

	go p.dispatcher()

	return p
}

// dispatcher moves tasks from tasksToDispatch to tasksToRun in FIFO order,
// buffering them while all workers are busy. It closes tasksToRun once
// tasksToDispatch has been closed and the backlog is empty.
func (p *WorkerPool) dispatcher() {
	var pendingTasks []WorkerPoolTask

	// A nil channel never becomes ready in a select, which lets each case be
	// switched off without polling: incoming is set to nil after it is closed,
	// and outgoing stays nil while there is nothing to hand over.
	incoming := p.tasksToDispatch

	for incoming != nil || len(pendingTasks) > 0 {
		var (
			outgoing chan<- WorkerPoolTask
			next     WorkerPoolTask
		)

		if len(pendingTasks) > 0 {
			outgoing = p.tasksToRun
			next = pendingTasks[0]
		}

		select {
		case task, ok := <-incoming:
			if !ok {
				// No more tasks will be added; keep draining the backlog.
				incoming = nil
				continue
			}

			pendingTasks = append(pendingTasks, task)
		case outgoing <- next:
			// Drop the reference so the closure can be collected.
			pendingTasks[0] = nil
			pendingTasks = pendingTasks[1:]
		}
	}

	close(p.tasksToRun)
}

// worker executes tasks until tasksToRun is closed, recording every non-nil
// error returned by a task.
func (p *WorkerPool) worker() {
	for task := range p.tasksToRun {
		if err := task(); err != nil {
			p.errorsMutex.Lock()
			p.errors = append(p.errors, err)
			p.errorsMutex.Unlock()
		}

		p.tasksWaitingGroup.Done()
	}
}

// AddTask schedules task for execution. It must not be called after
// CloseAndWait: sending on the closed dispatch channel would panic.
func (p *WorkerPool) AddTask(task WorkerPoolTask) {
	p.tasksWaitingGroup.Add(1)
	p.tasksToDispatch <- task
}

// CloseAndWait stops accepting tasks, waits for every added task to finish
// and returns the errors the tasks produced, in no particular order. It must
// be called exactly once.
func (p *WorkerPool) CloseAndWait() []error {
	close(p.tasksToDispatch)
	p.tasksWaitingGroup.Wait()

	// Every worker released errorsMutex before calling Done, so reading
	// errors after Wait is race-free.
	return p.errors
}

internal/workerpool_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package internal
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestWorkerPoolSimple(t *testing.T) {
13+
pool := NewWorkerPool(2)
14+
15+
pool.AddTask(func() error {
16+
return nil
17+
})
18+
19+
pool.AddTask(func() error {
20+
return errors.New("error")
21+
})
22+
23+
pool.AddTask(func() error {
24+
return nil
25+
})
26+
27+
errs := pool.CloseAndWait()
28+
29+
assert.Equal(t, 1, len(errs))
30+
assert.Equal(t, "error", errs[0].Error())
31+
}
32+
33+
func TestWorkerPoolWaitTime(t *testing.T) {
34+
pool := NewWorkerPool(2)
35+
36+
pool.AddTask(func() error {
37+
time.Sleep(50 * time.Millisecond)
38+
return nil
39+
})
40+
41+
pool.AddTask(func() error {
42+
time.Sleep(50 * time.Millisecond)
43+
return errors.New("error")
44+
})
45+
46+
pool.AddTask(func() error {
47+
time.Sleep(50 * time.Millisecond)
48+
return nil
49+
})
50+
51+
errs := pool.CloseAndWait()
52+
53+
assert.Equal(t, 1, len(errs))
54+
assert.Equal(t, "error", errs[0].Error())
55+
}
56+
57+
func TestWorkerPoolWaitTimeMultiple(t *testing.T) {
58+
pool := NewWorkerPool(5)
59+
iterations := 20
60+
61+
for i := 0; i < iterations; i++ {
62+
copyOfI := i
63+
64+
pool.AddTask(func() error {
65+
time.Sleep(100 * time.Millisecond)
66+
67+
if copyOfI%2 == 0 {
68+
return fmt.Errorf("error %d", copyOfI)
69+
}
70+
71+
return nil
72+
})
73+
}
74+
75+
errs := pool.CloseAndWait()
76+
77+
assert.Equal(t, iterations/2, len(errs))
78+
79+
for i := 0; i < iterations; i++ {
80+
if i%2 == 0 {
81+
found := false
82+
for _, err := range errs {
83+
if err.Error() == fmt.Sprintf("error %d", i) {
84+
found = true
85+
break
86+
}
87+
}
88+
89+
assert.True(t, found)
90+
}
91+
}
92+
}

scaleway/helpers_object.go

Lines changed: 71 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"hash/crc32"
99
"net/http"
1010
"os"
11+
"runtime"
1112
"strings"
1213
"time"
1314

@@ -18,15 +19,19 @@ import (
1819
"github.com/aws/aws-sdk-go/service/s3"
1920
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
2021
awspolicy "github.com/hashicorp/awspolicyequivalence"
22+
"github.com/hashicorp/go-multierror"
2123
"github.com/hashicorp/terraform-plugin-log/tflog"
2224
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
2325
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
2426
"github.com/scaleway/scaleway-sdk-go/scw"
27+
"github.com/scaleway/terraform-provider-scaleway/v2/internal"
2528
)
2629

2730
const (
2831
// defaultObjectBucketTimeout is a 10-minute duration; presumably the default
// timeout for bucket operations — its use is outside this view, confirm.
defaultObjectBucketTimeout = 10 * time.Minute
2932
// retryOnAWSAPI is a 2-minute duration; presumably the retry window for
// calls to the S3-compatible API — its use is outside this view, confirm.
retryOnAWSAPI = 2 * time.Minute
33+
34+
// maxObjectVersionDeletionWorkers is the upper bound on the number of
// worker-pool workers deleteS3ObjectVersions starts for each listed page.
maxObjectVersionDeletionWorkers = 8
3035
)
3136

3237
func newS3Client(httpClient *http.Client, region, accessKey, secretKey string) (*s3.S3, error) {
@@ -305,58 +310,94 @@ func removeS3ObjectVersionLegalHold(conn *s3.S3, bucketName string, objectVersio
305310
}
306311

307312
func deleteS3ObjectVersions(ctx context.Context, conn *s3.S3, bucketName string, force bool) error {
308-
var err error
313+
var globalErr error
309314
listInput := &s3.ListObjectVersionsInput{
310315
Bucket: scw.StringPtr(bucketName),
311316
}
317+
318+
deletionWorkers := runtime.NumCPU()
319+
if deletionWorkers > maxObjectVersionDeletionWorkers {
320+
deletionWorkers = maxObjectVersionDeletionWorkers
321+
}
322+
312323
listErr := conn.ListObjectVersionsPagesWithContext(ctx, listInput, func(page *s3.ListObjectVersionsOutput, lastPage bool) bool {
324+
pool := internal.NewWorkerPool(deletionWorkers)
325+
313326
for _, objectVersion := range page.Versions {
314-
objectKey := aws.StringValue(objectVersion.Key)
315-
objectVersionID := aws.StringValue(objectVersion.VersionId)
316-
err = deleteS3ObjectVersion(conn, bucketName, objectKey, objectVersionID, force)
317-
318-
if isS3Err(err, ErrCodeAccessDenied, "") && force {
319-
legalHoldRemoved, errLegal := removeS3ObjectVersionLegalHold(conn, bucketName, objectVersion)
320-
if errLegal != nil {
321-
err = fmt.Errorf("failed to remove legal hold: %s", errLegal)
322-
return false
327+
objectVersion := objectVersion
328+
329+
pool.AddTask(func() error {
330+
objectKey := aws.StringValue(objectVersion.Key)
331+
objectVersionID := aws.StringValue(objectVersion.VersionId)
332+
err := deleteS3ObjectVersion(conn, bucketName, objectKey, objectVersionID, force)
333+
334+
if isS3Err(err, ErrCodeAccessDenied, "") && force {
335+
legalHoldRemoved, errLegal := removeS3ObjectVersionLegalHold(conn, bucketName, objectVersion)
336+
if errLegal != nil {
337+
return fmt.Errorf("failed to remove legal hold: %s", errLegal)
338+
}
339+
340+
if legalHoldRemoved {
341+
err = deleteS3ObjectVersion(conn, bucketName, objectKey, objectVersionID, force)
342+
}
323343
}
324-
if legalHoldRemoved {
325-
err = deleteS3ObjectVersion(conn, bucketName, objectKey, objectVersionID, force)
344+
345+
if err != nil {
346+
return fmt.Errorf("failed to delete S3 object: %s", err)
326347
}
327-
}
328-
if err != nil {
329-
err = fmt.Errorf("failed to delete S3 object: %s", err)
330-
return false
331-
}
348+
349+
return nil
350+
})
332351
}
352+
353+
errors := pool.CloseAndWait()
354+
if len(errors) > 0 {
355+
globalErr = multierror.Append(nil, errors...)
356+
return false
357+
}
358+
333359
return true
334360
})
335361
if listErr != nil {
336-
return fmt.Errorf("error listing S3 objects: %s", err)
362+
return fmt.Errorf("error listing S3 objects: %s", globalErr)
337363
}
338-
if err != nil {
339-
return err
364+
if globalErr != nil {
365+
return globalErr
340366
}
367+
341368
listErr = conn.ListObjectVersionsPagesWithContext(ctx, listInput, func(page *s3.ListObjectVersionsOutput, lastPage bool) bool {
369+
pool := internal.NewWorkerPool(deletionWorkers)
370+
342371
for _, deleteMarkerEntry := range page.DeleteMarkers {
343-
deleteMarkerKey := aws.StringValue(deleteMarkerEntry.Key)
344-
deleteMarkerVersionsID := aws.StringValue(deleteMarkerEntry.VersionId)
345-
err = deleteS3ObjectVersion(conn, bucketName, deleteMarkerKey, deleteMarkerVersionsID, force)
372+
deleteMarkerEntry := deleteMarkerEntry
373+
374+
pool.AddTask(func() error {
375+
deleteMarkerKey := aws.StringValue(deleteMarkerEntry.Key)
376+
deleteMarkerVersionsID := aws.StringValue(deleteMarkerEntry.VersionId)
377+
err := deleteS3ObjectVersion(conn, bucketName, deleteMarkerKey, deleteMarkerVersionsID, force)
378+
if err != nil {
379+
return fmt.Errorf("failed to delete S3 object delete marker: %s", err)
380+
}
346381

347-
if err != nil {
348-
err = fmt.Errorf("failed to delete S3 object delete marker: %s", err)
349-
return false
350-
}
382+
return nil
383+
})
384+
}
385+
386+
errors := pool.CloseAndWait()
387+
if len(errors) > 0 {
388+
globalErr = multierror.Append(nil, errors...)
389+
return false
351390
}
391+
352392
return true
353393
})
354394
if listErr != nil {
355-
return fmt.Errorf("error listing S3 objects for delete markers: %s", err)
395+
return fmt.Errorf("error listing S3 objects for delete markers: %s", globalErr)
356396
}
357-
if err != nil {
358-
return err
397+
if globalErr != nil {
398+
return globalErr
359399
}
400+
360401
return nil
361402
}
362403

0 commit comments

Comments
 (0)