Skip to content

Commit 2b617db

Browse files
authored
service/s3/s3manager: Port Improvements from V1 SDK (#499)
* Uploader should initialize pool size after determining optimized size. (#3030) * Fix resource leak on failed CreateMultipartUpload calls (#3069) * Fix resource leak on UploadPart failures (#3144) * Improve memory allocations by replacing sync.Pool (#3183)
1 parent dc3c876 commit 2b617db

File tree

9 files changed

+897
-55
lines changed

9 files changed

+897
-55
lines changed

CHANGELOG_PENDING.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ SDK Features
3131

3232
SDK Enhancements
3333
---
34+
* `service/s3/s3manager`: Improve memory allocation behavior by replacing sync.Pool with custom pool implementation
35+
* Improves memory allocations that occur when the provided `io.Reader` to upload does not satisfy both the `io.ReaderAt` and `io.ReadSeeker` interfaces.
3436

3537
SDK Bugs
3638
---
39+
* `service/s3/s3manager`: Fix resource leaks when the following occurred:
40+
* Failed CreateMultipartUpload call
41+
* Failed UploadPart call
42+
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package s3testing
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
7+
"github.com/aws/aws-sdk-go-v2/internal/sdkio"
8+
)
9+
10+
var randBytes = func() []byte {
11+
b := make([]byte, 10*sdkio.MebiByte)
12+
13+
if _, err := rand.Read(b); err != nil {
14+
panic(fmt.Sprintf("failed to read random bytes, %v", err))
15+
}
16+
return b
17+
}()
18+
19+
// GetTestBytes returns a pseudo-random []byte of length size
20+
func GetTestBytes(size int) []byte {
21+
if len(randBytes) >= size {
22+
return randBytes[:size]
23+
}
24+
25+
b := append(randBytes, GetTestBytes(size-len(randBytes))...)
26+
return b
27+
}

service/s3/s3manager/download_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/aws/aws-sdk-go-v2/internal/awstesting/unit"
2424
"github.com/aws/aws-sdk-go-v2/internal/sdkio"
2525
"github.com/aws/aws-sdk-go-v2/service/s3"
26+
"github.com/aws/aws-sdk-go-v2/service/s3/internal/s3testing"
2627
"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
2728
)
2829

@@ -677,7 +678,7 @@ func TestDownloadBufferStrategy(t *testing.T) {
677678
for name, tCase := range cases {
678679
t.Logf("starting case: %v", name)
679680

680-
expected := getTestBytes(int(tCase.expectedSize))
681+
expected := s3testing.GetTestBytes(int(tCase.expectedSize))
681682

682683
svc, _, _ := dlLoggingSvc(expected)
683684

@@ -737,7 +738,7 @@ func (r *testErrReader) Read(p []byte) (int, error) {
737738
}
738739

739740
func TestDownloadBufferStrategy_Errors(t *testing.T) {
740-
expected := getTestBytes(int(10 * sdkio.MebiByte))
741+
expected := s3testing.GetTestBytes(int(10 * sdkio.MebiByte))
741742

742743
svc, _, _ := dlLoggingSvc(expected)
743744
strat := &recordedWriterReadFromProvider{
@@ -826,7 +827,7 @@ type badReader struct {
826827
}
827828

828829
func (b *badReader) Read(p []byte) (int, error) {
829-
tb := getTestBytes(len(p))
830+
tb := s3testing.GetTestBytes(len(p))
830831
copy(p, tb)
831832
return len(p), b.err
832833
}

service/s3/s3manager/pool.go

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
package s3manager
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
)
8+
9+
// byteSlicePool is a pool of fixed-size byte slices with an adjustable
// maximum number of outstanding slices.
type byteSlicePool interface {
	// Get returns a slice from the pool, blocking until one is available,
	// a new allocation is permitted, or the context is canceled.
	Get(context.Context) (*[]byte, error)
	// Put returns a slice to the pool for reuse.
	Put(*[]byte)
	// ModifyCapacity grows (positive delta) or shrinks (negative delta)
	// the pool's maximum capacity.
	ModifyCapacity(int)
	// SliceSize reports the byte length of slices managed by the pool.
	SliceSize() int64
	// Close releases the pool's resources; subsequent Get calls fail.
	Close()
}
16+
17+
// maxSlicePool is a byteSlicePool implementation that bounds the total
// number of outstanding slices (pooled plus newly allocated) to a maximum
// that can be adjusted at runtime via ModifyCapacity.
type maxSlicePool struct {
	// allocator is defined as a function pointer to allow
	// for test cases to instrument custom tracers when allocations
	// occur.
	allocator sliceAllocator

	// slices holds returned slices that are ready for reuse.
	slices chan *[]byte
	// allocations holds one token per additional allocation still
	// permitted before the pool reaches its maximum outstanding slices.
	allocations chan struct{}
	// capacityChange is signaled on Put and closed/recreated by
	// ModifyCapacity so that Get calls waiting for capacity wake up.
	capacityChange chan struct{}

	// max is the current maximum number of outstanding slices.
	max int
	// sliceSize is the byte length of every slice produced by the pool.
	sliceSize int64

	// mtx guards max and the channel references: ModifyCapacity/Close
	// take the write lock; Get/Put take read locks.
	mtx sync.RWMutex
}
32+
33+
func newMaxSlicePool(sliceSize int64) *maxSlicePool {
34+
p := &maxSlicePool{sliceSize: sliceSize}
35+
p.allocator = p.newSlice
36+
37+
return p
38+
}
39+
40+
var errZeroCapacity = fmt.Errorf("get called on zero capacity pool")
41+
42+
func (p *maxSlicePool) Get(ctx context.Context) (*[]byte, error) {
43+
// check if context is canceled before attempting to get a slice
44+
// this ensures priority is given to the cancel case first
45+
select {
46+
case <-ctx.Done():
47+
return nil, ctx.Err()
48+
default:
49+
}
50+
51+
p.mtx.RLock()
52+
53+
for {
54+
select {
55+
case bs, ok := <-p.slices:
56+
p.mtx.RUnlock()
57+
if !ok {
58+
// attempt to get on a zero capacity pool
59+
return nil, errZeroCapacity
60+
}
61+
return bs, nil
62+
case _, ok := <-p.allocations:
63+
p.mtx.RUnlock()
64+
if !ok {
65+
// attempt to get on a zero capacity pool
66+
return nil, errZeroCapacity
67+
}
68+
return p.allocator(), nil
69+
case <-ctx.Done():
70+
p.mtx.RUnlock()
71+
return nil, ctx.Err()
72+
default:
73+
// In the event that there are no slices or allocations available
74+
// This prevents some deadlock situations that can occur around sync.RWMutex
75+
// When a lock request occurs on ModifyCapacity, no new readers are allowed to acquire a read lock.
76+
// By releasing the read lock here and waiting for a notification, we prevent a deadlock situation where
77+
// Get could hold the read lock indefinitely waiting for capacity, ModifyCapacity is waiting for a write lock,
78+
// and a Put is blocked trying to get a read-lock which is blocked by ModifyCapacity.
79+
80+
// Short-circuit if the pool capacity is zero.
81+
if p.max == 0 {
82+
p.mtx.RUnlock()
83+
return nil, errZeroCapacity
84+
}
85+
86+
// Since we will be releasing the read-lock we need to take the reference to the channel.
87+
// Since channels are references we will still get notified if slices are added, or if
88+
// the channel is closed due to a capacity modification. This specifically avoids a data race condition
89+
// where ModifyCapacity both closes a channel and initializes a new one while we don't have a read-lock.
90+
c := p.capacityChange
91+
92+
p.mtx.RUnlock()
93+
94+
select {
95+
case _ = <-c:
96+
p.mtx.RLock()
97+
case <-ctx.Done():
98+
return nil, ctx.Err()
99+
}
100+
}
101+
}
102+
}
103+
104+
// Put returns a slice to the pool for reuse by a later Get. If the pool
// has zero capacity, or the slices channel is already full, the slice is
// dropped and left for the garbage collector to reclaim.
func (p *maxSlicePool) Put(bs *[]byte) {
	p.mtx.RLock()
	defer p.mtx.RUnlock()

	if p.max == 0 {
		// The pool was emptied (Close or capacity reduced to zero); drop.
		return
	}

	select {
	case p.slices <- bs:
		p.notifyCapacity()
	default:
		// If the slices channel is full when attempting to add the slice then we drop the slice.
		// The logic here is to prevent a deadlock situation if channel is already at max capacity.
		// Allows us to reap allocations that are returned and are no longer needed.
	}
}
121+
122+
// ModifyCapacity grows (delta > 0) or shrinks (delta < 0) the maximum
// number of outstanding slices. When the capacity reaches zero the pool is
// emptied entirely. All internal channels are replaced with ones sized to
// the new capacity; closing the old channels wakes any Get calls blocked
// on them so they re-check capacity.
func (p *maxSlicePool) ModifyCapacity(delta int) {
	if delta == 0 {
		return
	}

	p.mtx.Lock()
	defer p.mtx.Unlock()

	p.max += delta

	if p.max == 0 {
		p.empty()
		return
	}

	// Replace the notification channel first so waiters observe the close.
	if p.capacityChange != nil {
		close(p.capacityChange)
	}
	p.capacityChange = make(chan struct{}, p.max)

	origAllocations := p.allocations
	p.allocations = make(chan struct{}, p.max)

	// Carry over the unused allocation tokens plus the capacity delta.
	newAllocs := len(origAllocations) + delta
	for i := 0; i < newAllocs; i++ {
		p.allocations <- struct{}{}
	}

	if origAllocations != nil {
		close(origAllocations)
	}

	origSlices := p.slices
	p.slices = make(chan *[]byte, p.max)
	if origSlices == nil {
		return
	}

	// Migrate pooled slices from the old channel into the new one.
	close(origSlices)
	for bs := range origSlices {
		select {
		case p.slices <- bs:
		default:
			// If the new channel blocks while adding slices from the old channel
			// then we drop the slice. The logic here is to prevent a deadlock situation
			// if the new channel has a smaller capacity than the old.
		}
	}
}
171+
172+
// notifyCapacity signals one waiter blocked in Get that capacity may now
// be available. It is called from Put while the read lock is held, so
// capacityChange cannot be swapped out concurrently.
func (p *maxSlicePool) notifyCapacity() {
	select {
	case p.capacityChange <- struct{}{}:
	default:
		// This *shouldn't* happen as the channel is both buffered to the max pool capacity size and is resized
		// on capacity modifications. This is just a safety to ensure that a blocking situation can't occur.
	}
}
180+
181+
// SliceSize returns the byte length of slices produced by this pool.
func (p *maxSlicePool) SliceSize() int64 {
	return p.sliceSize
}
184+
185+
// Close empties the pool, releasing all pooled slices and closing the
// internal channels. In-flight and subsequent Get calls will fail with
// errZeroCapacity.
func (p *maxSlicePool) Close() {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.empty()
}
190+
191+
// empty resets the pool to zero capacity, closing and draining every
// internal channel so blocked Get calls wake up and pooled slices are
// released to the garbage collector. Callers must hold the write lock
// (ModifyCapacity and Close do).
func (p *maxSlicePool) empty() {
	p.max = 0

	if p.capacityChange != nil {
		close(p.capacityChange)
		p.capacityChange = nil
	}

	if p.allocations != nil {
		close(p.allocations)
		for range p.allocations {
			// drain channel
		}
		p.allocations = nil
	}

	if p.slices != nil {
		close(p.slices)
		for range p.slices {
			// drain channel
		}
		p.slices = nil
	}
}
215+
216+
func (p *maxSlicePool) newSlice() *[]byte {
217+
bs := make([]byte, p.sliceSize)
218+
return &bs
219+
}
220+
221+
// returnCapacityPoolCloser wraps a byteSlicePool so that Close gives back
// any capacity this wrapper added via ModifyCapacity, instead of closing
// the underlying (possibly shared) pool.
type returnCapacityPoolCloser struct {
	byteSlicePool
	// returnCapacity is the negative delta to apply on Close; zero means
	// there is nothing to return.
	returnCapacity int
}
225+
226+
func (n *returnCapacityPoolCloser) ModifyCapacity(delta int) {
227+
if delta > 0 {
228+
n.returnCapacity = -1 * delta
229+
}
230+
n.byteSlicePool.ModifyCapacity(delta)
231+
}
232+
233+
func (n *returnCapacityPoolCloser) Close() {
234+
if n.returnCapacity < 0 {
235+
n.byteSlicePool.ModifyCapacity(n.returnCapacity)
236+
}
237+
}
238+
239+
// sliceAllocator produces a new byte slice; maxSlicePool stores it as a
// field so test cases can instrument custom tracers when allocations occur.
type sliceAllocator func() *[]byte

// newByteSlicePool constructs the pool used for upload part buffers.
// Declared as a var (rather than a plain function) presumably so tests can
// substitute an alternate implementation — confirm against callers.
var newByteSlicePool = func(sliceSize int64) byteSlicePool {
	return newMaxSlicePool(sliceSize)
}

0 commit comments

Comments
 (0)