Skip to content

Commit 8e5a572

Browse files
ocampeauOlivier Campeau
authored andcommitted
Support for Conditionnal Write
This commit introduces support for conditional writes. It does so my adding a new write options: IfNotExist. When set to true, conditions are set for each driver to enable the desired behavior.
1 parent 7caced7 commit 8e5a572

File tree

8 files changed

+177
-25
lines changed

8 files changed

+177
-25
lines changed

blob/azureblob/azureblob.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,14 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
913913
BlobContentType: &contentType,
914914
},
915915
}
916+
if opts.IfNotExist {
917+
etagAny := azcore.ETagAny
918+
uploadOpts.AccessConditions = &azblob.AccessConditions{
919+
ModifiedAccessConditions: &azblobblob.ModifiedAccessConditions{
920+
IfNoneMatch: &etagAny,
921+
},
922+
}
923+
}
916924
if opts.BeforeWrite != nil {
917925
asFunc := func(i any) bool {
918926
p, ok := i.(**azblob.UploadStreamOptions)

blob/blob.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,7 @@ func (b *Bucket) NewWriter(ctx context.Context, key string, opts *WriterOptions)
10981098
MaxConcurrency: opts.MaxConcurrency,
10991099
BeforeWrite: opts.BeforeWrite,
11001100
DisableContentTypeDetection: opts.DisableContentTypeDetection,
1101+
IfNotExist: opts.IfNotExist,
11011102
}
11021103
if len(opts.Metadata) > 0 {
11031104
// Services are inconsistent, but at least some treat keys
@@ -1422,6 +1423,20 @@ type WriterOptions struct {
14221423
// asFunc converts its argument to driver-specific types.
14231424
// See https://gocloud.dev/concepts/as/ for background information.
14241425
BeforeWrite func(asFunc func(any) bool) error
1426+
1427+
// IfNotExist is used for conditional writes.
1428+
// When set to 'true', if a blob exists for the same key in the
1429+
// bucket, the write operation won't succeed and the current blob
1430+
// for the key will be left untouched.
1431+
// For 'fileblob' and 'memblob', an error of type 'PreconditionFailed'
1432+
// will be returned by the 'Close' method. This is due to how those driver
1433+
// are implemented, which buffers the content during writes and perform
1434+
// the actual write at close time.
1435+
// For cloud providers (aws, azure, google), an error of type 'PreconditionFailed'
1436+
// will be returned by the 'Write' method, since a call to 'Write' results
1437+
// in an API call to the cloud provider.
1438+
// Default is set to 'false'.
1439+
IfNotExist bool
14251440
}
14261441

14271442
// CopyOptions sets options for Copy.

blob/driver/driver.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ type WriterOptions struct {
110110
// asFunc allows drivers to expose driver-specific types;
111111
// see Bucket.As for more details.
112112
BeforeWrite func(asFunc func(any) bool) error
113+
114+
// IfNotExist is used for conditional writes.
115+
// When set to true, if a blob exists for the same key in the bucket, the write operation
116+
// won't take place.
117+
IfNotExist bool
113118
}
114119

115120
// CopyOptions controls options for Copy.

blob/drivertest/drivertest.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,9 @@ func RunConformanceTests(t *testing.T, newHarness HarnessMaker, asTests []AsTest
260260
t.Run("TestSignedURL", func(t *testing.T) {
261261
testSignedURL(t, newHarness)
262262
})
263+
//t.Run("TestIfNotExist", func(t *testing.T) {
264+
// testIfNotExist(t, newHarness)
265+
//})
263266
asTests = append(asTests, verifyAsFailsOnNil{})
264267
t.Run("TestAs", func(t *testing.T) {
265268
for _, st := range asTests {
@@ -2734,6 +2737,70 @@ func testAs(t *testing.T, newHarness HarnessMaker, st AsTest) {
27342737
}
27352738
}
27362739

2740+
//func testIfNotExist(t *testing.T, newHarness HarnessMaker) {
2741+
// t.Helper()
2742+
//
2743+
// const key = "blob-for-if-not-exist"
2744+
// const contents = "up and down"
2745+
//
2746+
// ctx := context.Background()
2747+
// h, err := newHarness(ctx, t)
2748+
// if err != nil {
2749+
// t.Fatal(err)
2750+
// }
2751+
// defer h.Close()
2752+
// drv, err := h.MakeDriver(ctx)
2753+
// if err != nil {
2754+
// t.Fatal(err)
2755+
// }
2756+
// b := blob.NewBucket(drv)
2757+
// defer func() { _ = b.Close() }()
2758+
//
2759+
// opts := blob.WriterOptions{
2760+
// ContentType: "text",
2761+
// IfNotExist: true,
2762+
// }
2763+
//
2764+
// // Create one file for the key
2765+
// w1, err := b.NewWriter(ctx, key, &opts)
2766+
// if err != nil {
2767+
// t.Fatal(err)
2768+
// }
2769+
//
2770+
// defer func() {
2771+
// _ = b.Delete(ctx, key)
2772+
// }()
2773+
//
2774+
// // Write to the file
2775+
// if _, err := w1.Write([]byte(contents)); err != nil {
2776+
// t.Fatal(err)
2777+
// }
2778+
//
2779+
// // Closing the file (ie: thus writing the content to the file)
2780+
// // should not return an error
2781+
// if err := w1.Close(); err != nil {
2782+
// t.Fatal(err)
2783+
// }
2784+
//
2785+
// // Create a new writer for the same key
2786+
// w2, err := b.NewWriter(ctx, key, &opts)
2787+
// if err != nil {
2788+
// t.Fatal(err)
2789+
// }
2790+
//
2791+
// // Write to the file
2792+
// if _, err := w2.Write([]byte(contents)); err != nil {
2793+
// t.Fatal(err)
2794+
// }
2795+
//
2796+
// // Closing the file (ie: thus writing the content to the file)
2797+
// // should return a 'PreconditionFailed' error since the file
2798+
// // already exists
2799+
// if err = w2.Close(); err == nil {
2800+
// t.Fatal(err)
2801+
// }
2802+
//}
2803+
27372804
func benchmarkRead(b *testing.B, bkt *blob.Bucket) {
27382805
b.Helper()
27392806

blob/fileblob/fileblob.go

Lines changed: 66 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -739,9 +739,10 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
739739

740740
if b.opts.Metadata == MetadataDontWrite {
741741
w := &writer{
742-
ctx: ctx,
743-
File: f,
744-
path: path,
742+
ctx: ctx,
743+
File: f,
744+
path: path,
745+
ifNotExist: opts.IfNotExist,
745746
}
746747
return w, nil
747748
}
@@ -765,6 +766,7 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
765766
attrs: attrs,
766767
contentMD5: opts.ContentMD5,
767768
md5hash: md5.New(),
769+
ifNotExist: opts.IfNotExist,
768770
}
769771
return w, nil
770772
}
@@ -778,7 +780,8 @@ type writerWithSidecar struct {
778780
contentMD5 []byte
779781
// We compute the MD5 hash so that we can store it with the file attributes,
780782
// not for verification.
781-
md5hash hash.Hash
783+
md5hash hash.Hash
784+
ifNotExist bool
782785
}
783786

784787
func (w *writerWithSidecar) Write(p []byte) (n int, err error) {
@@ -794,33 +797,55 @@ func (w *writerWithSidecar) Write(p []byte) (n int, err error) {
794797
return n, nil
795798
}
796799

797-
func (w *writerWithSidecar) Close() error {
798-
err := w.f.Close()
799-
if err != nil {
800-
return err
801-
}
800+
func (w *writerWithSidecar) Close() (err error) {
802801
// Always delete the temp file. On success, it will have been renamed so
803802
// the Remove will fail.
804803
defer func() {
805804
_ = os.Remove(w.f.Name())
806805
}()
807806

808807
// Check if the write was cancelled.
809-
if err := w.ctx.Err(); err != nil {
808+
if err = w.ctx.Err(); err != nil {
810809
return err
811810
}
812811

813812
md5sum := w.md5hash.Sum(nil)
814813
w.attrs.MD5 = md5sum
815814

816815
// Write the attributes file.
817-
if err := setAttrs(w.path, w.attrs); err != nil {
816+
if err = setAttrs(w.path, w.attrs); err != nil {
818817
return err
819818
}
820-
// Rename the temp file to path.
821-
if err := os.Rename(w.f.Name(), w.path); err != nil {
822-
_ = os.Remove(w.path + attrsExt)
823-
return err
819+
820+
defer func() {
821+
if err != nil {
822+
_ = os.Remove(w.path + attrsExt)
823+
}
824+
}()
825+
826+
if w.ifNotExist {
827+
fileOptions := os.O_RDWR | os.O_CREATE | os.O_EXCL
828+
fd, err := os.OpenFile(w.path, fileOptions, 0666)
829+
if err != nil {
830+
return gcerr.New(gcerrors.FailedPrecondition, err, 1, "File already exist")
831+
}
832+
defer func() { _ = fd.Close() }()
833+
834+
// Set offset at the beginning of the file
835+
_, err = w.f.Seek(0, 0)
836+
if err != nil {
837+
return err
838+
}
839+
840+
// Copy content from the temp file to the new file
841+
_, err = w.f.WriteTo(fd)
842+
if err != nil {
843+
return err
844+
}
845+
} else {
846+
if err := os.Rename(w.f.Name(), w.path); err != nil {
847+
return err
848+
}
824849
}
825850
return nil
826851
}
@@ -831,8 +856,9 @@ func (w *writerWithSidecar) Close() error {
831856
// which is why it is not folded into writerWithSidecar.
832857
type writer struct {
833858
*os.File
834-
ctx context.Context
835-
path string
859+
ctx context.Context
860+
path string
861+
ifNotExist bool
836862
}
837863

838864
func (w *writer) Upload(r io.Reader) error {
@@ -841,10 +867,6 @@ func (w *writer) Upload(r io.Reader) error {
841867
}
842868

843869
func (w *writer) Close() error {
844-
err := w.File.Close()
845-
if err != nil {
846-
return err
847-
}
848870
// Always delete the temp file. On success, it will have been renamed so
849871
// the Remove will fail.
850872
tempname := w.File.Name()
@@ -855,9 +877,29 @@ func (w *writer) Close() error {
855877
return err
856878
}
857879

858-
// Rename the temp file to path.
859-
if err := os.Rename(tempname, w.path); err != nil {
860-
return err
880+
if w.ifNotExist {
881+
fileOptions := os.O_RDWR | os.O_CREATE | os.O_EXCL
882+
fd, err := os.OpenFile(w.path, fileOptions, 0666)
883+
if err != nil {
884+
return gcerr.New(gcerrors.FailedPrecondition, err, 1, "File already exist")
885+
}
886+
defer func() { _ = fd.Close() }()
887+
888+
// Set offset at the beginning of the file
889+
_, err = w.File.Seek(0, 0)
890+
if err != nil {
891+
return err
892+
}
893+
894+
// Copy content from the temp file to the new file
895+
_, err = w.File.WriteTo(fd)
896+
if err != nil {
897+
return err
898+
}
899+
} else {
900+
if err := os.Rename(w.File.Name(), w.path); err != nil {
901+
return err
902+
}
861903
}
862904
return nil
863905
}

blob/gcsblob/gcsblob.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,9 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
626626
bkt := b.client.Bucket(b.name)
627627
obj := bkt.Object(key)
628628

629+
if opts.IfNotExist {
630+
obj = obj.If(storage.Conditions{DoesNotExist: true})
631+
}
629632
// Add an extra level of indirection so that BeforeWrite can replace obj
630633
// if needed. For example, ObjectHandle.If returns a new ObjectHandle.
631634
// Also, make the Writer lazily in case this replacement happens.

blob/memblob/memblob.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
291291
for k, v := range opts.Metadata {
292292
md[k] = v
293293
}
294+
294295
return &writer{
295296
ctx: ctx,
296297
b: b,
@@ -299,6 +300,7 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
299300
metadata: md,
300301
opts: opts,
301302
md5hash: md5.New(),
303+
ifNotExist: opts.IfNotExist,
302304
}, nil
303305
}
304306

@@ -312,7 +314,8 @@ type writer struct {
312314
buf bytes.Buffer
313315
// We compute the MD5 hash so that we can store it with the file attributes,
314316
// not for verification.
315-
md5hash hash.Hash
317+
md5hash hash.Hash
318+
ifNotExist bool
316319
}
317320

318321
func (w *writer) Write(p []byte) (n int, err error) {
@@ -355,6 +358,9 @@ func (w *writer) Close() error {
355358
w.b.mu.Lock()
356359
defer w.b.mu.Unlock()
357360
if prev := w.b.blobs[w.key]; prev != nil {
361+
if w.ifNotExist {
362+
return errors.New("'IfNotExists' is set and blob already exists for key")
363+
}
358364
entry.Attributes.CreateTime = prev.Attributes.CreateTime
359365
}
360366
w.b.blobs[w.key] = entry

blob/s3blob/s3blob.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,11 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
748748
Key: aws.String(key),
749749
Metadata: md,
750750
}
751+
752+
if opts.IfNotExist {
753+
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html
754+
req.IfNoneMatch = aws.String("*")
755+
}
751756
if opts.CacheControl != "" {
752757
req.CacheControl = aws.String(opts.CacheControl)
753758
}
@@ -769,6 +774,7 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
769774
if b.kmsKeyId != "" {
770775
req.SSEKMSKeyId = aws.String(b.kmsKeyId)
771776
}
777+
772778
if opts.BeforeWrite != nil {
773779
asFunc := func(i any) bool {
774780
// Note that since the Go CDK Blob

0 commit comments

Comments
 (0)