Skip to content

Commit 25500a8

Browse files
authored
fix(storage): fix zero-byte flush (googleapis#12012)
Fix logic to ensure that a zero byte flush actually syncs data to GCS. Add integration test cases for this.
1 parent 98b8a82 commit 25500a8

File tree

3 files changed

+93
-18
lines changed

3 files changed

+93
-18
lines changed

storage/grpc_writer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,10 @@ func (s *gRPCAppendBidiWriteBufferSender) sendOnConnectedStream(buf []byte, offs
316316
// We don't necessarily expect multiple responses for a single flush, but
317317
// this allows the server to send multiple responses if it wants to.
318318
flushOffset := s.flushOffset
319-
for flushOffset < offset+int64(len(buf)) {
319+
320+
// Await a response on the stream. Loop at least once or until the
321+
// persisted offset matches the flush offset.
322+
for {
320323
resp, ok := <-s.recvs
321324
if !ok {
322325
return nil, s.recvErr
@@ -332,6 +335,9 @@ func (s *gRPCAppendBidiWriteBufferSender) sendOnConnectedStream(buf []byte, offs
332335
if resp.GetResource() != nil {
333336
obj = resp.GetResource()
334337
}
338+
if flushOffset <= offset+int64(len(buf)) {
339+
break
340+
}
335341
}
336342
if s.flushOffset < flushOffset {
337343
s.flushOffset = flushOffset

storage/integration_test.go

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3171,7 +3171,8 @@ func TestIntegration_WriterChunksize(t *testing.T) {
31713171
})
31723172
}
31733173

3174-
// Basic Writer test for appendable uploads with and without finalization.
3174+
// Writer test for appendable uploads with and without finalization,
3175+
// also validating Flush() at various offsets.
31753176
func TestIntegration_WriterAppend(t *testing.T) {
31763177
t.Skip("b/402283880")
31773178
ctx := skipAllButBidi(context.Background(), "ZB test")
@@ -3184,22 +3185,53 @@ func TestIntegration_WriterAppend(t *testing.T) {
31843185
defer h.mustDeleteBucket(bkt)
31853186

31863187
testCases := []struct {
3187-
name string
3188-
finalize bool
3189-
content []byte
3190-
chunkSize int
3188+
name string
3189+
finalize bool
3190+
content []byte
3191+
chunkSize int
3192+
flushOffset int64
31913193
}{
31923194
{
3193-
name: "finalized_object",
3194-
finalize: true,
3195-
content: randomBytes9MiB,
3196-
chunkSize: 4 * MiB,
3195+
name: "finalized_object",
3196+
finalize: true,
3197+
content: randomBytes9MiB,
3198+
chunkSize: 4 * MiB,
3199+
flushOffset: -1, // no flush
3200+
},
3201+
{
3202+
name: "unfinalized_object",
3203+
finalize: false,
3204+
content: randomBytes9MiB,
3205+
chunkSize: 4 * MiB,
3206+
flushOffset: -1,
3207+
},
3208+
{
3209+
name: "zero_byte_flush",
3210+
finalize: false,
3211+
content: randomBytes9MiB,
3212+
chunkSize: 4 * MiB,
3213+
flushOffset: 0,
3214+
},
3215+
{
3216+
name: "small_flush",
3217+
finalize: false,
3218+
content: randomBytes9MiB,
3219+
chunkSize: 4 * MiB,
3220+
flushOffset: 100,
3221+
},
3222+
{
3223+
name: "middle_chunk_flush",
3224+
finalize: false,
3225+
content: randomBytes9MiB,
3226+
chunkSize: 4 * MiB,
3227+
flushOffset: 5 * MiB,
31973228
},
31983229
{
3199-
name: "unfinalized_object",
3200-
finalize: false,
3201-
content: randomBytes9MiB,
3202-
chunkSize: 4 * MiB,
3230+
name: "last_byte_flush",
3231+
finalize: false,
3232+
content: randomBytes9MiB,
3233+
chunkSize: 4 * MiB,
3234+
flushOffset: 9 * MiB,
32033235
},
32043236
}
32053237
for _, tc := range testCases {
@@ -3211,8 +3243,43 @@ func TestIntegration_WriterAppend(t *testing.T) {
32113243
w.Append = true
32123244
w.FinalizeOnClose = tc.finalize
32133245
w.ChunkSize = tc.chunkSize
3246+
content := tc.content
32143247

3215-
h.mustWrite(w, tc.content)
3248+
// If flushOffset is 0, just do a flush and check the attributes.
3249+
if tc.flushOffset == 0 {
3250+
if _, err := w.Flush(); err != nil {
3251+
t.Fatalf("Writer.Flush: %v", err)
3252+
}
3253+
attrs, err := obj.Attrs(ctx)
3254+
if err != nil {
3255+
t.Fatalf("ObjectHandle.Attrs: %v", err)
3256+
}
3257+
if attrs.Size != 0 {
3258+
t.Errorf("attrs.Size: got %v, want 0", attrs.Size)
3259+
}
3260+
}
3261+
// If flushOffset > 0, write the first part of the data and then flush.
3262+
if tc.flushOffset > 0 {
3263+
if _, err := w.Write(content[:tc.flushOffset]); err != nil {
3264+
t.Fatalf("writing first part of data: %v", err)
3265+
}
3266+
content = content[tc.flushOffset:]
3267+
if _, err := w.Flush(); err != nil {
3268+
t.Fatalf("Writer.Flush: %v", err)
3269+
}
3270+
_, err := obj.Attrs(ctx)
3271+
if err != nil {
3272+
t.Fatalf("ObjectHandle.Attrs: %v", err)
3273+
}
3274+
// TODO: re-enable this check once Size is correctly populated
3275+
// server side for unfinalized objects.
3276+
// if attrs.Size != tc.flushOffset {
3277+
// t.Errorf("attrs.Size: got %v, want %v", attrs.Size, tc.flushOffset)
3278+
// }
3279+
}
3280+
3281+
// Write remaining data.
3282+
h.mustWrite(w, content)
32163283

32173284
// Download content again and validate.
32183285
// Disabled due to b/395944605; unskip after this is resolved.
@@ -3229,13 +3296,13 @@ func TestIntegration_WriterAppend(t *testing.T) {
32293296
if !tc.finalize && !attrs.Finalized.IsZero() {
32303297
t.Errorf("got object finalized at %v, want unfinalized", attrs.Finalized)
32313298
}
3232-
32333299
})
32343300
}
32353301
})
32363302
}
32373303

3238-
// Writer test for append takeover of unfinalized object.
3304+
// Writer test for append takeover of unfinalized object, including
3305+
// calls to Flush() on takeover.
32393306
func TestIntegration_WriterAppendTakeover(t *testing.T) {
32403307
t.Skip("b/402283880")
32413308
ctx := skipAllButBidi(context.Background(), "ZB test")

storage/writer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,10 @@ func (w *Writer) Flush() (int64, error) {
224224
// at zero bytes. This will make the object visible with zero length data.
225225
if !w.opened {
226226
err := w.openWriter()
227+
if err != nil {
228+
return 0, err
229+
}
227230
w.progress(0)
228-
return 0, err
229231
}
230232

231233
return w.flush()

0 commit comments

Comments
 (0)