Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions blob/azureblob/azureblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,14 @@
BlobContentType: &contentType,
},
}
if opts.IfNotExist {
etagAny := azcore.ETagAny
uploadOpts.AccessConditions = &azblob.AccessConditions{
ModifiedAccessConditions: &azblobblob.ModifiedAccessConditions{
IfNoneMatch: &etagAny,
},

Check warning on line 921 in blob/azureblob/azureblob.go

View check run for this annotation

Codecov / codecov/patch

blob/azureblob/azureblob.go#L916-L921

Added lines #L916 - L921 were not covered by tests
}
}
if opts.BeforeWrite != nil {
asFunc := func(i any) bool {
p, ok := i.(**azblob.UploadStreamOptions)
Expand Down
8 changes: 8 additions & 0 deletions blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,7 @@ func (b *Bucket) NewWriter(ctx context.Context, key string, opts *WriterOptions)
MaxConcurrency: opts.MaxConcurrency,
BeforeWrite: opts.BeforeWrite,
DisableContentTypeDetection: opts.DisableContentTypeDetection,
IfNotExist: opts.IfNotExist,
}
if len(opts.Metadata) > 0 {
// Services are inconsistent, but at least some treat keys
Expand Down Expand Up @@ -1422,6 +1423,13 @@ type WriterOptions struct {
// asFunc converts its argument to driver-specific types.
// See https://gocloud.dev/concepts/as/ for background information.
BeforeWrite func(asFunc func(any) bool) error

// IfNotExist is used for conditional writes. When set to 'true',
// if a blob exists for the same key in the bucket, the write
// operation won't succeed and the current blob for the key will
// be left untouched. An error for which gcerrors.Code will return
// gcerrors.PreconditionFailed will be returned by Write or Close.
IfNotExist bool
}

// CopyOptions sets options for Copy.
Expand Down
5 changes: 5 additions & 0 deletions blob/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ type WriterOptions struct {
// asFunc allows drivers to expose driver-specific types;
// see Bucket.As for more details.
BeforeWrite func(asFunc func(any) bool) error

// IfNotExist is used for conditional writes.
// When set to true, if a blob exists for the same key in the bucket, the write operation
// won't take place.
IfNotExist bool
}

// CopyOptions controls options for Copy.
Expand Down
70 changes: 70 additions & 0 deletions blob/drivertest/drivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ func RunConformanceTests(t *testing.T, newHarness HarnessMaker, asTests []AsTest
t.Run("TestSignedURL", func(t *testing.T) {
testSignedURL(t, newHarness)
})
//t.Run("TestIfNotExist", func(t *testing.T) {
// testIfNotExist(t, newHarness)
//})
asTests = append(asTests, verifyAsFailsOnNil{})
t.Run("TestAs", func(t *testing.T) {
for _, st := range asTests {
Expand Down Expand Up @@ -2734,6 +2737,73 @@ func testAs(t *testing.T, newHarness HarnessMaker, st AsTest) {
}
}

func testIfNotExist(t *testing.T, newHarness HarnessMaker) {
t.Helper()

const key = "blob-for-if-not-exist"
const contents = "up and down"

ctx := context.Background()
h, err := newHarness(ctx, t)
if err != nil {
t.Fatal(err)
}
defer h.Close()
drv, err := h.MakeDriver(ctx)
if err != nil {
t.Fatal(err)
}
b := blob.NewBucket(drv)
defer func() { _ = b.Close() }()

opts := blob.WriterOptions{
ContentType: "text",
IfNotExist: true,
}

// Create one file for the key
w1, err := b.NewWriter(ctx, key, &opts)
if err != nil {
t.Fatal(err)
}

defer func() {
_ = b.Delete(ctx, key)
}()

// Write to the file
if _, err := w1.Write([]byte(contents)); err != nil {
t.Fatal(err)
}

// Closing the file (ie: thus writing the content to the file)
// should not return an error
if err := w1.Close(); err != nil {
t.Fatal(err)
}

// Create a new writer for the same key
w2, err := b.NewWriter(ctx, key, &opts)
if err != nil {
t.Fatal(err)
}

// Write to the file
// We expect an error either from `Write` or `Close`
if _, err = w2.Write([]byte(contents)); err == nil {
err = w2.Close()
} else {
_ = w2.Close()
}

if err == nil {
t.Error("expected error rewriting key with IfNotExist, got nil")
}
if code := gcerrors.Code(err); code != gcerrors.FailedPrecondition {
t.Errorf("expected FailedPrecondition error, got %v", code)
}
}

func benchmarkRead(b *testing.B, bkt *blob.Bucket) {
b.Helper()

Expand Down
39 changes: 33 additions & 6 deletions blob/fileblob/fileblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"gocloud.dev/blob"
Expand Down Expand Up @@ -739,9 +740,11 @@

if b.opts.Metadata == MetadataDontWrite {
w := &writer{
ctx: ctx,
File: f,
path: path,
ctx: ctx,
File: f,
path: path,
ifNotExist: opts.IfNotExist,
mu: &sync.Mutex{},
}
return w, nil
}
Expand All @@ -765,6 +768,8 @@
attrs: attrs,
contentMD5: opts.ContentMD5,
md5hash: md5.New(),
ifNotExist: opts.IfNotExist,
mu: &sync.Mutex{},
}
return w, nil
}
Expand All @@ -778,7 +783,9 @@
contentMD5 []byte
// We compute the MD5 hash so that we can store it with the file attributes,
// not for verification.
md5hash hash.Hash
md5hash hash.Hash
ifNotExist bool
mu *sync.Mutex
}

func (w *writerWithSidecar) Write(p []byte) (n int, err error) {
Expand Down Expand Up @@ -817,6 +824,15 @@
if err := setAttrs(w.path, w.attrs); err != nil {
return err
}

if w.ifNotExist {
w.mu.Lock()
defer w.mu.Unlock()
_, err = os.Stat(w.path)
if err == nil {
return gcerr.New(gcerrors.FailedPrecondition, err, 1, "File already exist")

Check warning on line 833 in blob/fileblob/fileblob.go

View check run for this annotation

Codecov / codecov/patch

blob/fileblob/fileblob.go#L829-L833

Added lines #L829 - L833 were not covered by tests
}
}
// Rename the temp file to path.
if err := os.Rename(w.f.Name(), w.path); err != nil {
_ = os.Remove(w.path + attrsExt)
Expand All @@ -831,8 +847,10 @@
// which is why it is not folded into writerWithSidecar.
type writer struct {
*os.File
ctx context.Context
path string
ctx context.Context
path string
ifNotExist bool
mu *sync.Mutex
}

func (w *writer) Upload(r io.Reader) error {
Expand All @@ -855,6 +873,15 @@
return err
}

if w.ifNotExist {
w.mu.Lock()
defer w.mu.Unlock()
_, err = os.Stat(w.path)
if err == nil {
return gcerr.New(gcerrors.FailedPrecondition, err, 1, "File already exist")

Check warning on line 881 in blob/fileblob/fileblob.go

View check run for this annotation

Codecov / codecov/patch

blob/fileblob/fileblob.go#L877-L881

Added lines #L877 - L881 were not covered by tests
}
}

// Rename the temp file to path.
if err := os.Rename(tempname, w.path); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions blob/gcsblob/gcsblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,9 @@
bkt := b.client.Bucket(b.name)
obj := bkt.Object(key)

if opts.IfNotExist {
obj = obj.If(storage.Conditions{DoesNotExist: true})

Check warning on line 630 in blob/gcsblob/gcsblob.go

View check run for this annotation

Codecov / codecov/patch

blob/gcsblob/gcsblob.go#L630

Added line #L630 was not covered by tests
}
// Add an extra level of indirection so that BeforeWrite can replace obj
// if needed. For example, ObjectHandle.If returns a new ObjectHandle.
// Also, make the Writer lazily in case this replacement happens.
Expand Down
9 changes: 8 additions & 1 deletion blob/memblob/memblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"crypto/md5"
"errors"
"fmt"
"gocloud.dev/internal/gcerr"
"hash"
"io"
"net/url"
Expand Down Expand Up @@ -299,6 +300,7 @@
metadata: md,
opts: opts,
md5hash: md5.New(),
ifNotExist: opts.IfNotExist,
}, nil
}

Expand All @@ -312,7 +314,8 @@
buf bytes.Buffer
// We compute the MD5 hash so that we can store it with the file attributes,
// not for verification.
md5hash hash.Hash
md5hash hash.Hash
ifNotExist bool
}

func (w *writer) Write(p []byte) (n int, err error) {
Expand Down Expand Up @@ -355,6 +358,10 @@
w.b.mu.Lock()
defer w.b.mu.Unlock()
if prev := w.b.blobs[w.key]; prev != nil {
if w.ifNotExist {
err := errors.New("file already exist")
return gcerr.New(gcerrors.FailedPrecondition, err, 1, "a blob already exist for key")

Check warning on line 363 in blob/memblob/memblob.go

View check run for this annotation

Codecov / codecov/patch

blob/memblob/memblob.go#L362-L363

Added lines #L362 - L363 were not covered by tests
}
entry.Attributes.CreateTime = prev.Attributes.CreateTime
}
w.b.blobs[w.key] = entry
Expand Down
5 changes: 5 additions & 0 deletions blob/s3blob/s3blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,11 @@
Key: aws.String(key),
Metadata: md,
}

if opts.IfNotExist {
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html
req.IfNoneMatch = aws.String("*")

Check warning on line 754 in blob/s3blob/s3blob.go

View check run for this annotation

Codecov / codecov/patch

blob/s3blob/s3blob.go#L754

Added line #L754 was not covered by tests
}
if opts.CacheControl != "" {
req.CacheControl = aws.String(opts.CacheControl)
}
Expand Down