Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions blob/service/api/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,11 @@ func (r *Router) GetDeviceLogsContent(res rest.ResponseWriter, req *rest.Request

content, err := blobClient.GetDeviceLogsContent(req.Context(), *deviceLogMetadata.ID)
if err != nil {
responder.Error(http.StatusInternalServerError, err)
responder.CodedError(err)
return
}
if content == nil || content.Body == nil {
responder.Error(http.StatusNotFound, request.ErrorResourceNotFoundWithID(deviceLogID))
responder.CodedError(request.ErrorResourceNotFoundWithID(deviceLogID))
return
}
defer content.Body.Close()
Expand Down
27 changes: 27 additions & 0 deletions blob/service/api/v1/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,20 @@ var _ = Describe("V1", func() {
errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(id), res.WriteInputs[0])
})

It("responds with not found error when the client returns a blob but the blob has no content", func() {
// This has happened when blob creation succeeded but the request context canceled before the contents finished uploading to S3 so there is a "stray" blob w/o content
deviceLogsBlob := blobTest.RandomDeviceLogsBlob()
client.GetDeviceLogsBlobOutputs = []blobTest.GetDeviceLogsBlobOutput{{Blob: deviceLogsBlob}}
client.GetDeviceLogsContentOutputs = []blobTest.GetDeviceLogsContentOutput{{Error: request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID)}}
res.WriteOutputs = []testRest.WriteOutput{{BytesWritten: 0, Error: nil}}

handlerFunc(res, req)
Expect(res.WriteHeaderInputs).To(Equal([]int{http.StatusNotFound}))
Expect(res.HeaderOutput).To(Equal(&http.Header{"Content-Type": []string{"application/json; charset=utf-8"}}))
Expect(res.WriteInputs).To(HaveLen(1))
errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID), res.WriteInputs[0])
})

It("responds successfully with headers", func() {
deviceLogsBlob := blobTest.RandomDeviceLogsBlob()
content := blob.NewDeviceLogsContent()
Expand Down Expand Up @@ -607,6 +621,19 @@ var _ = Describe("V1", func() {
errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(id), res.WriteInputs[0])
})

It("responds with not found error when the client returns a blob but the blob has no content", func() {
deviceLogsBlob := blobTest.RandomDeviceLogsBlob()
deviceLogsBlob.UserID = pointer.FromString(userID)
client.GetDeviceLogsBlobOutputs = []blobTest.GetDeviceLogsBlobOutput{{Blob: deviceLogsBlob}}
client.GetDeviceLogsContentOutputs = []blobTest.GetDeviceLogsContentOutput{{Error: request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID)}}
res.WriteOutputs = []testRest.WriteOutput{{BytesWritten: 0, Error: nil}}

handlerFunc(res, req)
Expect(res.WriteHeaderInputs).To(Equal([]int{http.StatusNotFound}))
Expect(res.HeaderOutput).To(Equal(&http.Header{"Content-Type": []string{"application/json; charset=utf-8"}}))
Expect(res.WriteInputs).To(HaveLen(1))
errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID), res.WriteInputs[0])
})
It("responds successfully with headers for user's own logs content", func() {
deviceLogsBlob := blobTest.RandomDeviceLogsBlob()
deviceLogsBlob.UserID = pointer.FromString(userID)
Expand Down
104 changes: 74 additions & 30 deletions blob/service/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"crypto/md5"
"encoding/base64"
stdErrs "errors"
"io"
"time"

"github.com/tidepool-org/platform/blob"
blobStoreStructured "github.com/tidepool-org/platform/blob/store/structured"
Expand All @@ -19,6 +21,11 @@ import (
structureValidator "github.com/tidepool-org/platform/structure/validator"
)

const (
// arbritrary timeout value for cleanup operations such as deleting from S3 or Repo
defaultCleanupTimeout = time.Second * 5
)

type Provider interface {
BlobStructuredStore() blobStoreStructured.Store
BlobUnstructuredStore() blobStoreUnstructured.Store
Expand Down Expand Up @@ -70,31 +77,40 @@ func (c *Client) Create(ctx context.Context, userID string, content *blob.Conten
options.MediaType = content.MediaType
err = c.BlobUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options)
if err != nil {
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content")
}
return nil
})
return nil, err
}

size := sizer.Size
if size > blob.SizeMaximum {
if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size")
}
return nil
})
return nil, request.ErrorResourceTooLarge()
}

digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 {
if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest")
}
return nil
})
return nil, errors.WithSource(request.ErrorDigestsNotEqual(*content.DigestMD5, digestMD5), structure.NewPointerSource().WithReference("digestMD5"))
}

Expand Down Expand Up @@ -129,31 +145,41 @@ func (c *Client) CreateDeviceLogs(ctx context.Context, userID string, content *b
options.MediaType = content.MediaType
err = c.DeviceLogsUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options)
if err != nil {
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content")
}
return nil
})
return nil, err
}

size := sizer.Size
if size > blob.SizeMaximum {
if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size")
}
return nil
})
return nil, request.ErrorResourceTooLarge()
}

digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 {
if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest")
}
return nil
})

return nil, errors.WithSource(request.ErrorDigestsNotEqual(*content.DigestMD5, digestMD5), structure.NewPointerSource().WithReference("digestMD5"))
}

Expand Down Expand Up @@ -287,3 +313,21 @@ func (s *SizeWriter) Write(bites []byte) (int, error) {
s.Size += length
return length, nil
}

// doDoneCtx performs an action given a context even if the context is
// canceled or timed out. This is used if we have any cleanup functions that we
// still want to perform and passing the parent context would time out any
// child contexts.
func doDoneCtx(ctx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error {
newContext := ctx
var cancel context.CancelFunc
select {
case <-ctx.Done():
if stdErrs.Is(ctx.Err(), context.Canceled) || stdErrs.Is(ctx.Err(), context.DeadlineExceeded) {
newContext, cancel = context.WithTimeout(context.WithoutCancel(ctx), timeout)
defer cancel()
}
default:
}
return fn(newContext)
}
4 changes: 4 additions & 0 deletions request/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (r *Responder) Error(statusCode int, err error, mutators ...ResponseMutator
}
}

func (r *Responder) CodedError(err error, mutators ...ResponseMutator) {
r.Error(StatusCodeForError(err), err, mutators...)
}

func (r *Responder) InternalServerError(err error, mutators ...ResponseMutator) {
if err == nil {
err = ErrorInternalServerError(errors.New("error is missing"))
Expand Down