Skip to content

Commit 7e8768a

Browse files
Fix deadlock when erroring writes are slow (Azure#16937)
* Fix deadlock when erroring writes are slow
* Cancel copy on first error

Co-authored-by: Joel Hendrix <[email protected]>
1 parent e248e40 commit 7e8768a

File tree

2 files changed

+53
-4
lines changed

2 files changed

+53
-4
lines changed

sdk/storage/azblob/blockblob/chunkwriting.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ import (
1313
"encoding/binary"
1414
"errors"
1515
"fmt"
16-
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared"
1716
"io"
1817
"sync"
1918
"sync/atomic"
2019

2120
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
21+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared"
2222
)
2323

2424
// blockWriter provides methods to upload blocks that represent a file to a server and commit them.
@@ -181,8 +181,12 @@ func (c *copier) write(chunk copierChunk) {
181181
stageBlockOptions := c.o.getStageBlockOptions()
182182
_, err := c.to.StageBlock(c.ctx, chunk.id, shared.NopCloser(bytes.NewReader(chunk.buffer[:chunk.length])), stageBlockOptions)
183183
if err != nil {
184-
c.errCh <- fmt.Errorf("write error: %w", err)
185-
return
184+
select {
185+
case c.errCh <- err:
186+
// failed to stage block, cancel the copy
187+
default:
188+
// don't block the goroutine if there's a pending error
189+
}
186190
}
187191
}
188192

sdk/storage/azblob/blockblob/chunkwriting_test.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type fakeBlockWriter struct {
3030
path string
3131
block int32
3232
errOnBlock int32
33+
stageDelay time.Duration
3334
}
3435

3536
func newFakeBlockWriter() *fakeBlockWriter {
@@ -49,7 +50,12 @@ func newFakeBlockWriter() *fakeBlockWriter {
4950

5051
func (f *fakeBlockWriter) StageBlock(_ context.Context, blockID string, body io.ReadSeekCloser, _ *StageBlockOptions) (StageBlockResponse, error) {
5152
n := atomic.AddInt32(&f.block, 1)
52-
if n == f.errOnBlock {
53+
54+
if f.stageDelay > 0 {
55+
time.Sleep(f.stageDelay)
56+
}
57+
58+
if f.errOnBlock > -1 && n >= f.errOnBlock {
5359
return StageBlockResponse{}, io.ErrNoProgress
5460
}
5561

@@ -192,6 +198,45 @@ func TestGetErr(t *testing.T) {
192198
}
193199
}
194200

201+
func TestSlowDestCopyFrom(t *testing.T) {
202+
p, err := createSrcFile(_1MiB + 500*1024) //This should cause 2 reads
203+
if err != nil {
204+
panic(err)
205+
}
206+
defer func(name string) {
207+
_ = os.Remove(name)
208+
}(p)
209+
210+
from, err := os.Open(p)
211+
if err != nil {
212+
panic(err)
213+
}
214+
defer from.Close()
215+
216+
br := newFakeBlockWriter()
217+
defer br.cleanup()
218+
219+
br.stageDelay = 200 * time.Millisecond
220+
br.errOnBlock = 0
221+
222+
errs := make(chan error, 1)
223+
go func() {
224+
_, err := copyFromReader(context.Background(), from, br, UploadStreamOptions{})
225+
errs <- err
226+
}()
227+
228+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
229+
defer cancel()
230+
231+
select {
232+
case <-ctx.Done():
233+
failMsg := "TestSlowDestCopyFrom(slow writes shouldn't cause deadlock) failed: Context expired, copy deadlocked"
234+
t.Error(failMsg)
235+
case <-errs:
236+
return
237+
}
238+
}
239+
195240
func TestCopyFromReader(t *testing.T) {
196241
t.Parallel()
197242

0 commit comments

Comments
 (0)