Skip to content

Commit 3aa8436

Browse files
craig[bot]jeffswenson
andcommitted
Merge #153056
153056: cloud: create and use an objstorage.Writable writer wrapper r=jeffswenson a=jeffswenson Previously, CRDB code interacting with the pebble SST writer used a wrapper that no-op'd Finish/Abort calls on the cloud object. This caused us to write invalid SSTs to cloud storage when we would have otherwise aborted the write. Implementing Finish/Abort also lets us clean up some of the lifetime handling. Creating an SST writer normally takes ownership of the object. But since we no-op'd the cleanup methods, we had to hold onto the object and manually Close it to trigger the flush. Issue: #153055 Epic: CRDB-53946 Release note: none Co-authored-by: Jeff Swenson <[email protected]>
2 parents 0163245 + d52eb81 commit 3aa8436

31 files changed

+227
-181
lines changed

pkg/backup/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ go_test(
343343
"@com_github_cockroachdb_datadriven//:datadriven",
344344
"@com_github_cockroachdb_errors//:errors",
345345
"@com_github_cockroachdb_errors//oserror",
346+
"@com_github_cockroachdb_pebble//objstorage",
346347
"@com_github_cockroachdb_pebble//sstable",
347348
"@com_github_cockroachdb_pebble//vfs",
348349
"@com_github_cockroachdb_redact//:redact",

pkg/backup/backupinfo/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ go_library(
5151
"//pkg/util/timeutil",
5252
"//pkg/util/tracing",
5353
"@com_github_cockroachdb_errors//:errors",
54+
"@com_github_cockroachdb_pebble//objstorage",
5455
"@com_github_klauspost_compress//gzip",
5556
],
5657
)

pkg/backup/backupinfo/desc_sst.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,15 @@ func WriteDescsSST(
4949
if err != nil {
5050
return err
5151
}
52-
defer w.Close()
5352
descSST := storage.MakeTransportSSTWriter(ctx, dest.Settings(), w)
5453
defer descSST.Close()
5554

56-
if err := writeDescsToMetadata(ctx, descSST, m); err != nil {
57-
return err
58-
}
59-
60-
if err := descSST.Finish(); err != nil {
55+
err = writeDescsToMetadata(ctx, descSST, m)
56+
if err != nil {
6157
return err
6258
}
6359

64-
return w.Close()
60+
return descSST.Finish()
6561
}
6662

6763
func DescChangesLess(

pkg/backup/backupinfo/external_sst_util.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package backupinfo
88
import (
99
"bytes"
1010
"context"
11-
"io"
1211

1312
"github.com/cockroachdb/cockroach/pkg/backup/backupencryption"
1413
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
@@ -17,6 +16,7 @@ import (
1716
"github.com/cockroachdb/cockroach/pkg/keys"
1817
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1918
"github.com/cockroachdb/cockroach/pkg/storage"
19+
"github.com/cockroachdb/pebble/objstorage"
2020
)
2121

2222
func makeWriter(
@@ -25,8 +25,9 @@ func makeWriter(
2525
filename string,
2626
enc *jobspb.BackupEncryptionOptions,
2727
kmsEnv cloud.KMSEnv,
28-
) (io.WriteCloser, error) {
29-
w, err := dest.Writer(ctx, filename)
28+
) (objstorage.Writable, error) {
29+
var w objstorage.Writable
30+
w, err := cloud.OpenAbortableWriter(ctx, dest, filename)
3031
if err != nil {
3132
return nil, err
3233
}

pkg/backup/backupinfo/file_sst.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ func writeFilesSST(
5555
if err != nil {
5656
return err
5757
}
58-
defer w.Close()
5958
fileSST := storage.MakeTransportSSTWriter(ctx, dest.Settings(), w)
6059
defer fileSST.Close()
6160

@@ -75,11 +74,7 @@ func writeFilesSST(
7574
}
7675
}
7776

78-
err = fileSST.Finish()
79-
if err != nil {
80-
return err
81-
}
82-
return w.Close()
77+
return fileSST.Finish()
8378
}
8479

8580
func encodeFileSSTKey(spanStart roachpb.Key, filename string) roachpb.Key {

pkg/backup/backupsink/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ go_library(
2525
"//pkg/util/log",
2626
"//pkg/util/unique",
2727
"@com_github_cockroachdb_errors//:errors",
28+
"@com_github_cockroachdb_pebble//objstorage",
2829
"@com_github_gogo_protobuf//types",
29-
"@com_github_kr_pretty//:pretty",
3030
],
3131
)
3232

@@ -56,6 +56,7 @@ go_test(
5656
"//pkg/util/leaktest",
5757
"//pkg/util/log",
5858
"@com_github_cockroachdb_errors//:errors",
59+
"@com_github_cockroachdb_pebble//objstorage",
5960
"@com_github_gogo_protobuf//types",
6061
"@com_github_stretchr_testify//require",
6162
],

pkg/backup/backupsink/file_sst_sink.go

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package backupsink
88
import (
99
"bytes"
1010
"context"
11-
io "io"
1211

1312
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
1413
"github.com/cockroachdb/cockroach/pkg/base"
@@ -24,8 +23,8 @@ import (
2423
hlc "github.com/cockroachdb/cockroach/pkg/util/hlc"
2524
"github.com/cockroachdb/cockroach/pkg/util/log"
2625
"github.com/cockroachdb/errors"
26+
"github.com/cockroachdb/pebble/objstorage"
2727
gogotypes "github.com/gogo/protobuf/types"
28-
"github.com/kr/pretty"
2928
)
3029

3130
var (
@@ -58,10 +57,11 @@ type FileSSTSink struct {
5857
conf SSTSinkConf
5958
pacer *admission.Pacer
6059

61-
sst storage.SSTWriter
60+
isOpen bool
61+
sst storage.SSTWriter
62+
6263
ctx context.Context
6364
cancel func()
64-
out io.WriteCloser
6565
outName string
6666

6767
flushedFiles []backuppb.BackupManifest_File
@@ -133,7 +133,7 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key
133133
}
134134

135135
// Initialize the writer if needed.
136-
if s.out == nil {
136+
if !s.isOpen {
137137
if err := s.open(ctx); err != nil {
138138
return nil, err
139139
}
@@ -232,16 +232,14 @@ func (s *FileSSTSink) Close() error {
232232
s.cancel()
233233
}
234234

235-
var err error
236-
if s.out != nil {
237-
err = s.out.Close()
238-
}
239235
s.sst.Close()
240-
return err
236+
s.isOpen = false
237+
238+
return nil
241239
}
242240

243241
func (s *FileSSTSink) Flush(ctx context.Context) error {
244-
if s.out == nil {
242+
if !s.isOpen {
245243
// If the writer was not initialized but the sink has reported completed
246244
// spans then there were empty ExportRequests that were processed by the
247245
// owner of this sink. These still need to reported to the coordinator as
@@ -279,13 +277,10 @@ func (s *FileSSTSink) Flush(ctx context.Context) error {
279277
if err := s.sst.Finish(); err != nil {
280278
return err
281279
}
282-
if err := s.out.Close(); err != nil {
283-
log.Dev.Warningf(ctx, "failed to close write in FileSSTSink: % #v", pretty.Formatter(err))
284-
return errors.Wrap(err, "writing SST")
285-
}
280+
286281
wroteSize := s.sst.Meta.Size
287282
s.outName = ""
288-
s.out = nil
283+
s.isOpen = false
289284

290285
for i := range s.flushedFiles {
291286
s.flushedFiles[i].BackingFileSize = wroteSize
@@ -322,30 +317,30 @@ func (s *FileSSTSink) open(ctx context.Context) error {
322317
if s.ctx == nil {
323318
s.ctx, s.cancel = context.WithCancel(ctx)
324319
}
325-
w, err := s.dest.Writer(s.ctx, s.outName)
320+
var w objstorage.Writable
321+
w, err := cloud.OpenAbortableWriter(s.ctx, s.dest, s.outName)
326322
if err != nil {
327323
return err
328324
}
329-
s.out = w
330325
if s.conf.Enc != nil {
331-
e, err := storageccl.EncryptingWriter(w, s.conf.Enc.Key)
326+
w, err = storageccl.EncryptingWriter(w, s.conf.Enc.Key)
332327
if err != nil {
333328
return err
334329
}
335-
s.out = e
336330
}
337-
// TODO(dt): make ExternalStorage.Writer return objstorage.Writable.
338-
//
331+
// NOTE: the sst writer takes ownership of the object writer. So we don't
332+
// hold on to it to close it.
339333
// Value blocks are disabled since such SSTs can be huge (e.g. 750MB in the
340334
// mixed_version_backup.go roachtest), which can cause OOMs due to value
341335
// block buffering.
342336
s.sst = storage.MakeIngestionSSTWriterWithOverrides(
343-
ctx, s.dest.Settings(), storage.NoopFinishAbortWritable(s.out),
337+
ctx, s.dest.Settings(), w,
344338
storage.WithValueBlocksDisabled,
345339
storage.WithCompressionFromClusterSetting(
346340
ctx, s.dest.Settings(), storage.CompressionAlgorithmBackupStorage,
347341
),
348342
)
343+
s.isOpen = true
349344

350345
return nil
351346
}

pkg/backup/backupsink/file_sst_sink_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package backupsink
77

88
import (
9-
"bytes"
109
"context"
1110
"fmt"
1211
"reflect"
@@ -29,6 +28,7 @@ import (
2928
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3029
"github.com/cockroachdb/cockroach/pkg/util/log"
3130
"github.com/cockroachdb/errors"
31+
"github.com/cockroachdb/pebble/objstorage"
3232
"github.com/gogo/protobuf/types"
3333
"github.com/stretchr/testify/require"
3434
)
@@ -43,13 +43,13 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) {
4343
ctx := context.Background()
4444

4545
getKeys := func(prefix string, n int) []byte {
46-
var b bytes.Buffer
46+
var b objstorage.MemObj
4747
sst := storage.MakeTransportSSTWriter(ctx, cluster.MakeTestingClusterSettings(), &b)
4848
for i := 0; i < n; i++ {
4949
require.NoError(t, sst.PutUnversioned([]byte(fmt.Sprintf("%s%08d", prefix, i)), nil))
5050
}
5151
sst.Close()
52-
return b.Bytes()
52+
return b.Data()
5353
}
5454

5555
exportResponse1 := ExportedSpan{
@@ -661,8 +661,8 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) {
661661
},
662662
} {
663663
t.Run(tt.name, func(t *testing.T) {
664-
buf := &bytes.Buffer{}
665-
sst := storage.MakeTransportSSTWriter(ctx, settings, buf)
664+
buf := objstorage.MemObj{}
665+
sst := storage.MakeTransportSSTWriter(ctx, settings, &buf)
666666
sink := FileSSTSink{sst: sst}
667667
compareSST := true
668668

@@ -713,7 +713,7 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) {
713713
UpperBound: keys.MaxKey,
714714
}
715715

716-
iter, err := storage.NewMemSSTIterator(buf.Bytes(), false, iterOpts)
716+
iter, err := storage.NewMemSSTIterator(buf.Data(), false, iterOpts)
717717
if err != nil {
718718
t.Fatal(err)
719719
}
@@ -840,8 +840,8 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) {
840840
},
841841
} {
842842
t.Run(tt.name, func(t *testing.T) {
843-
buf := &bytes.Buffer{}
844-
sst := storage.MakeTransportSSTWriter(ctx, settings, buf)
843+
buf := objstorage.MemObj{}
844+
sst := storage.MakeTransportSSTWriter(ctx, settings, &buf)
845845
sink := FileSSTSink{sst: sst}
846846
compareSST := true
847847

@@ -887,7 +887,7 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) {
887887
UpperBound: keys.MaxKey,
888888
}
889889

890-
iter, err := storage.NewMemSSTIterator(buf.Bytes(), false, iterOpts)
890+
iter, err := storage.NewMemSSTIterator(buf.Data(), false, iterOpts)
891891
if err != nil {
892892
t.Fatal(err)
893893
}
@@ -1029,8 +1029,8 @@ func (b *exportedSpanBuilder) build() ExportedSpan {
10291029
func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb.Key) ExportedSpan {
10301030
ctx := context.Background()
10311031
settings := cluster.MakeTestingClusterSettings()
1032-
buf := &bytes.Buffer{}
1033-
sst := storage.MakeTransportSSTWriter(ctx, settings, buf)
1032+
buf := objstorage.MemObj{}
1033+
sst := storage.MakeTransportSSTWriter(ctx, settings, &buf)
10341034
for _, d := range b.keyValues {
10351035
v := roachpb.Value{}
10361036
v.SetBytes(d.value)
@@ -1058,7 +1058,7 @@ func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb
10581058

10591059
sst.Close()
10601060

1061-
b.es.DataSST = buf.Bytes()
1061+
b.es.DataSST = buf.Data()
10621062

10631063
return *b.es
10641064
}

pkg/backup/backupsink/sst_sink_key_writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (s *SSTSinkKeyWriter) Reset(ctx context.Context, newSpan roachpb.Span) erro
135135
return err
136136
}
137137
}
138-
if s.out == nil {
138+
if !s.isOpen {
139139
if err := s.open(ctx); err != nil {
140140
return err
141141
}

pkg/backup/bench_test.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package backup
88
import (
99
"context"
1010
"fmt"
11-
"io"
1211
"math/rand"
1312
"testing"
1413

@@ -29,6 +28,7 @@ import (
2928
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
3029
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3130
"github.com/cockroachdb/cockroach/pkg/util/log"
31+
"github.com/cockroachdb/pebble/objstorage"
3232
"github.com/stretchr/testify/require"
3333
)
3434

@@ -59,24 +59,22 @@ func BenchmarkIteratorMemory(b *testing.B) {
5959
makeWriter := func(
6060
store cloud.ExternalStorage,
6161
filename string,
62-
enc *jobspb.BackupEncryptionOptions) (io.WriteCloser, error) {
63-
w, err := store.Writer(ctx, filename)
62+
enc *jobspb.BackupEncryptionOptions) (objstorage.Writable, error) {
63+
w, err := cloud.OpenAbortableWriter(ctx, store, filename)
6464
if err != nil {
6565
return nil, err
6666
}
6767

68-
if enc != nil {
69-
key, err := backupencryption.GetEncryptionKey(ctx, enc, nil)
70-
if err != nil {
71-
return nil, err
72-
}
73-
encW, err := storageccl.EncryptingWriter(w, key)
74-
if err != nil {
75-
return nil, err
76-
}
77-
w = encW
68+
if enc == nil {
69+
return w, nil
7870
}
79-
return w, nil
71+
72+
key, err := backupencryption.GetEncryptionKey(ctx, enc, nil)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
return storageccl.EncryptingWriter(w, key)
8078
}
8179

8280
getRandomPayload := func(buf []byte) {
@@ -87,7 +85,7 @@ func BenchmarkIteratorMemory(b *testing.B) {
8785
}
8886
}
8987

90-
writeSST := func(w io.WriteCloser, store cloud.ExternalStorage, payloadSize int, numKeys int) {
88+
writeSST := func(w objstorage.Writable, store cloud.ExternalStorage, payloadSize int, numKeys int) {
9189
sst := storage.MakeTransportSSTWriter(ctx, store.Settings(), w)
9290

9391
buf := make([]byte, payloadSize)
@@ -156,7 +154,7 @@ func BenchmarkIteratorMemory(b *testing.B) {
156154
require.NoError(b, err)
157155

158156
writeSST(w, store, 100, rows)
159-
require.NoError(b, w.Close())
157+
require.NoError(b, w.Finish())
160158

161159
sz, err := store.Size(ctx, filename)
162160
require.NoError(b, err)

0 commit comments

Comments
 (0)