Skip to content

Commit f31ef7a

Browse files
authored
Remove double buffering in S3 reader (#17)
1 parent ac1ef23 commit f31ef7a

File tree

2 files changed

+81
-89
lines changed

2 files changed

+81
-89
lines changed

pkg/stream/s3/s3_reader.go

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@ func newS3ReaderConfig() s3ReaderConfig {
3535
//Implements io.ReadSeeker
3636
type s3Reader struct {
3737
config s3ReaderConfig
38-
buffer []byte
39-
bufferStart int64
40-
bufferEnd int64
38+
resp *s3.GetObjectOutput
4139
bucket string
4240
key string
4341
s3 s3iface.S3API
@@ -48,9 +46,6 @@ type s3Reader struct {
4846

4947
func newS3Reader(s3Api s3iface.S3API, bucket string, key string, contentLength int64, etag string, config s3ReaderConfig) *s3Reader {
5048
return &s3Reader{
51-
buffer: make([]byte, config.BufferSize),
52-
bufferStart: 0,
53-
bufferEnd: 0,
5449
bucket: bucket,
5550
key: key,
5651
s3: s3Api,
@@ -115,62 +110,59 @@ func (r *s3Reader) Read(p []byte) (n int, err error) {
115110
}
116111

117112
func (r *s3Reader) read(p []byte) (n int, err error) {
118-
if r.bufferEnd != 0 && r.bufferStart != r.bufferEnd {
119-
return r.copyInto(p)
113+
if r.resp == nil {
114+
getObjectErr := r.makeNewS3Request()
115+
if getObjectErr != nil {
116+
return 0, getObjectErr
117+
}
120118
}
121119

122-
r.resetBuffer()
120+
bytesRead, readErr := r.resp.Body.Read(p)
123121

124-
bufferBytes := int64(len(r.buffer))
125-
bytesLeft := r.ContentLength - r.offset
126-
bytesToRead := min(bufferBytes, bytesLeft)
122+
if readErr != nil {
123+
// The S3 body returned an error (including io.EOF); close the current S3 socket
124+
defer r.closeS3Socket()
125+
if readErr == io.EOF {
126+
r.offset += int64(bytesRead)
127+
return bytesRead, io.EOF
128+
} else {
129+
return bytesRead, &ReadError{readErr}
130+
}
131+
} else {
132+
r.offset += int64(bytesRead)
133+
return bytesRead, nil
127134

135+
}
136+
}
137+
138+
func (r *s3Reader) makeNewS3Request() (err error) {
128139
resp, getObjectErr := r.s3.GetObjectWithContext(
129140
aws.BackgroundContext(),
130141
&s3.GetObjectInput{
131142
Bucket: aws.String(r.bucket),
132143
Key: aws.String(r.key),
133144
IfMatch: aws.String(r.Etag),
134-
Range: aws.String(fmt.Sprintf("bytes=%v-%v", r.offset, r.offset+bytesToRead-1)),
145+
// Always open a connection that reads from the current position to the end of the file
146+
Range: aws.String(fmt.Sprintf("bytes=%v-%v", r.offset, r.ContentLength-1)),
135147
},
136-
request.WithResponseReadTimeout(5*time.Second)) //Sets a timeout on the underlying Body.Read() calls. By default, there is no timeout on this read
148+
// Sets a timeout on the underlying Body.Read() calls to avoid leaking an S3 connection,
149+
// since this s3Reader keeps the connection open until the end of the file is read. This is a client-side
150+
// setting; the S3 service also terminates idle connections on its own, in which case we rely on the retry
151+
// if the gap between two reads is too long and the S3 service closes the socket.
152+
request.WithResponseReadTimeout(10*time.Second))
137153

138154
if getObjectErr != nil {
139-
return 0, getObjectErr
140-
}
141-
142-
defer resp.Body.Close()
143-
for {
144-
bytesRead, readErr := resp.Body.Read(r.buffer[r.bufferEnd:])
145-
r.bufferEnd += int64(bytesRead)
146-
147-
if readErr == io.EOF {
148-
break
149-
}
150-
151-
if readErr != nil {
152-
return n, &ReadError{readErr}
153-
}
155+
return getObjectErr
154156
}
155-
156-
return r.copyInto(p)
157+
r.resp = resp
158+
return nil
157159
}
158160

159-
func (r *s3Reader) copyInto(p []byte) (n int, err error) {
160-
n = copy(p, r.buffer[r.bufferStart:r.bufferEnd])
161-
r.bufferStart += int64(n)
162-
163-
r.offset += int64(n)
164-
if r.offset == r.ContentLength {
165-
return n, io.EOF
161+
func (r *s3Reader) closeS3Socket() {
162+
if r.resp != nil {
163+
r.resp.Body.Close()
164+
r.resp = nil
166165
}
167-
168-
return n, nil
169-
}
170-
171-
func (r *s3Reader) resetBuffer() {
172-
r.bufferStart = 0
173-
r.bufferEnd = 0
174166
}
175167

176168
func (r *s3Reader) Seek(offset int64, whence int) (newOffset int64, err error) {
@@ -191,10 +183,10 @@ func (r *s3Reader) Seek(offset int64, whence int) (newOffset int64, err error) {
191183
r.offset = r.ContentLength
192184
}
193185

194-
//Reset buffer when seeking
195-
//Special case: Dont reset if the position hasn't changed
186+
//Close the socket when seeking
187+
//Special case: Don't close if the position hasn't changed
196188
if oldPos != r.offset {
197-
r.resetBuffer()
189+
r.closeS3Socket()
198190
}
199191

200192
return r.offset, nil

pkg/stream/s3/s3_reader_test.go

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -115,44 +115,53 @@ func TestS3Reader_Read_CallFails_ReturnsError(t *testing.T) {
115115
assert.True(t, err != nil)
116116
}
117117

118-
func TestS3Reader_Read_ShouldBuffer(t *testing.T) {
118+
func TestS3Reader_Seek_SeeksToCorrectPosition(t *testing.T) {
119119
ctrl := gomock.NewController(t)
120120
defer ctrl.Finish()
121121
mockS3Client := NewMockS3API(ctrl)
122-
setupS3MockExpects(mockS3Client)
123122

124-
mockS3Client.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Return(&s3.GetObjectOutput{
125-
Body: ioutil.NopCloser(strings.NewReader(testBodyContent[2:4])),
123+
mockS3Client.EXPECT().HeadObject(gomock.Any()).Return(&s3.HeadObjectOutput{
124+
ContentLength: aws.Int64(int64(len(testBodyContent))),
125+
ETag: aws.String(testEtag),
126126
}, nil).Times(1)
127127

128-
config := newS3ReaderConfig()
129-
config.BufferSize = 2
130-
s3Reader, _ := newS3ReaderWithConfig(mockS3Client, testBucket, testKey, config)
128+
s3Reader, _ := newS3ReaderBucketAndKey(mockS3Client, testBucket, testKey)
129+
s3Reader.Seek(5, io.SeekCurrent)
130+
assert.True(t, s3Reader.offset == 5)
131131

132-
//First call will read from s3 since the buffer is empty.
133-
//Second call should read from internal buffer
134-
//Third call should read from s3 again
135-
s3Reader.Read(make([]byte, 1))
136-
s3Reader.Read(make([]byte, 1))
137-
s3Reader.Read(make([]byte, 1))
132+
s3Reader.Seek(1, io.SeekCurrent)
133+
assert.True(t, s3Reader.offset == 6)
134+
135+
s3Reader.Seek(0, io.SeekStart)
136+
assert.True(t, s3Reader.offset == 0)
137+
138+
s3Reader.Seek(0, io.SeekEnd)
139+
assert.True(t, s3Reader.offset == s3Reader.ContentLength)
140+
141+
s3Reader.Seek(100000, io.SeekCurrent)
142+
assert.True(t, s3Reader.offset == s3Reader.ContentLength)
138143
}
139144

140-
func TestS3Reader_Seek_ResetsBuffer(t *testing.T) {
145+
func TestS3Reader_Read_ShouldKeepS3SocketOpen(t *testing.T) {
141146
ctrl := gomock.NewController(t)
142147
defer ctrl.Finish()
143148
mockS3Client := NewMockS3API(ctrl)
144149
setupS3MockExpects(mockS3Client)
145-
s3Reader, _ := newS3ReaderBucketAndKey(mockS3Client, testBucket, testKey)
150+
config := newS3ReaderConfig()
151+
s3Reader, _ := newS3ReaderWithConfig(mockS3Client, testBucket, testKey, config)
146152

147-
//Read to fill the buffer
153+
//First call will open a new s3 socket since none is open yet.
154+
//Second call should reuse the existing s3 socket
155+
//Third call should reuse the existing s3 socket
156+
s3Reader.Read(make([]byte, 1))
157+
s3Reader.Read(make([]byte, 1))
148158
s3Reader.Read(make([]byte, 1))
149-
assert.True(t, s3Reader.bufferEnd != 0)
150159

151-
s3Reader.Seek(1, io.SeekCurrent)
152-
assert.True(t, s3Reader.bufferEnd == 0)
160+
//After reading finishes, the socket is closed
161+
assert.Nil(t, s3Reader.resp)
153162
}
154163

155-
func TestS3Reader_SeekNoop_DoesNotResetBuffer(t *testing.T) {
164+
func TestS3Reader_Seek_S3SocketClose(t *testing.T) {
156165
ctrl := gomock.NewController(t)
157166
defer ctrl.Finish()
158167
mockS3Client := NewMockS3API(ctrl)
@@ -161,34 +170,25 @@ func TestS3Reader_SeekNoop_DoesNotResetBuffer(t *testing.T) {
161170

162171
//Read to fill the buffer
163172
s3Reader.Read(make([]byte, 1))
164-
assert.True(t, s3Reader.bufferEnd != 0)
173+
assert.NotNil(t, s3Reader.resp)
165174

166-
s3Reader.Seek(0, io.SeekCurrent)
167-
assert.True(t, s3Reader.bufferEnd != 0)
175+
//Seeking to a different position closes the s3 socket
176+
s3Reader.Seek(0, io.SeekStart)
177+
assert.Nil(t, s3Reader.resp)
168178
}
169179

170-
func TestS3Reader_Seek_SeeksToCorrectPosition(t *testing.T) {
180+
func TestS3Reader_SeekNoop_DoesNotCloseSocket(t *testing.T) {
171181
ctrl := gomock.NewController(t)
172182
defer ctrl.Finish()
173183
mockS3Client := NewMockS3API(ctrl)
174-
mockS3Client.EXPECT().HeadObject(gomock.Any()).Return(&s3.HeadObjectOutput{
175-
ContentLength: aws.Int64(int64(len(testBodyContent))),
176-
ETag: aws.String(testEtag),
177-
}, nil).Times(1)
184+
setupS3MockExpects(mockS3Client)
178185
s3Reader, _ := newS3ReaderBucketAndKey(mockS3Client, testBucket, testKey)
179186

180-
s3Reader.Seek(5, io.SeekCurrent)
181-
assert.True(t, s3Reader.offset == 5)
182-
183-
s3Reader.Seek(1, io.SeekCurrent)
184-
assert.True(t, s3Reader.offset == 6)
185-
186-
s3Reader.Seek(0, io.SeekStart)
187-
assert.True(t, s3Reader.offset == 0)
188-
189-
s3Reader.Seek(0, io.SeekEnd)
190-
assert.True(t, s3Reader.offset == s3Reader.ContentLength)
187+
//Read to open the s3 socket
188+
s3Reader.Read(make([]byte, 1))
189+
assert.NotNil(t, s3Reader.resp)
191190

192-
s3Reader.Seek(100000, io.SeekCurrent)
193-
assert.True(t, s3Reader.offset == s3Reader.ContentLength)
191+
//Seeking to the current position leaves the s3 socket open
192+
s3Reader.Seek(1, io.SeekStart)
193+
assert.NotNil(t, s3Reader.resp)
194194
}

0 commit comments

Comments
 (0)