Skip to content

Commit 026f997

Browse files
GCP Storage Bucket binding: Bulk file transfer (#3811)
Signed-off-by: nelson.parente <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent 132f562 commit 026f997

File tree

2 files changed

+107
-2
lines changed

2 files changed

+107
-2
lines changed

bindings/gcp/bucket/bucket.go

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ import (
2525
"net/url"
2626
"reflect"
2727
"strconv"
28+
"sync"
2829
"time"
2930

3031
"cloud.google.com/go/storage"
3132
"github.com/google/uuid"
33+
"go.uber.org/multierr"
3234
"google.golang.org/api/googleapi"
3335
"google.golang.org/api/iterator"
3436
"google.golang.org/api/option"
@@ -49,8 +51,9 @@ const (
4951
metadataKey = "key"
5052
maxResults = 1000
5153

52-
metadataKeyBC = "name"
53-
signOperation = "sign"
54+
metadataKeyBC = "name"
55+
signOperation = "sign"
56+
bulkGetOperation = "bulkGet"
5457
)
5558

5659
// GCPStorage allows saving data to GCP bucket storage.
@@ -138,6 +141,7 @@ func (g *GCPStorage) Operations() []bindings.OperationKind {
138141
bindings.DeleteOperation,
139142
bindings.ListOperation,
140143
signOperation,
144+
bulkGetOperation,
141145
}
142146
}
143147

@@ -155,6 +159,8 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
155159
return g.list(ctx, req)
156160
case signOperation:
157161
return g.sign(ctx, req)
162+
case bulkGetOperation:
163+
return g.bulkGet(ctx, req)
158164
default:
159165
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
160166
}
@@ -404,3 +410,91 @@ func (g *GCPStorage) signObject(bucket, object, ttl string) (string, error) {
404410
}
405411
return u, nil
406412
}
413+
414+
type objectData struct {
415+
Name string `json:"name"`
416+
Data []byte `json:"data"`
417+
Attrs storage.ObjectAttrs `json:"attrs"`
418+
}
419+
420+
func (g *GCPStorage) bulkGet(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
421+
metadata, err := g.metadata.mergeWithRequestMetadata(req)
422+
if err != nil {
423+
return nil, fmt.Errorf("gcp binding error while merging metadata : %w", err)
424+
}
425+
426+
if g.metadata.Bucket == "" {
427+
return nil, errors.New("gcp bucket binding error: bucket is required")
428+
}
429+
430+
var allObjs []*storage.ObjectAttrs
431+
it := g.client.Bucket(g.metadata.Bucket).Objects(ctx, nil)
432+
for {
433+
attrs, err2 := it.Next()
434+
if err2 == iterator.Done {
435+
break
436+
}
437+
allObjs = append(allObjs, attrs)
438+
}
439+
440+
var wg sync.WaitGroup
441+
objectsCh := make(chan objectData, len(allObjs))
442+
errCh := make(chan error, len(allObjs))
443+
444+
for i, obj := range allObjs {
445+
wg.Add(1)
446+
go func(idx int, object *storage.ObjectAttrs) {
447+
defer wg.Done()
448+
449+
rc, err3 := g.client.Bucket(g.metadata.Bucket).Object(object.Name).NewReader(ctx)
450+
if err3 != nil {
451+
errCh <- err3
452+
return
453+
}
454+
defer rc.Close()
455+
456+
data, readErr := io.ReadAll(rc)
457+
if readErr != nil {
458+
errCh <- readErr
459+
return
460+
}
461+
462+
if metadata.EncodeBase64 {
463+
encoded := b64.StdEncoding.EncodeToString(data)
464+
data = []byte(encoded)
465+
}
466+
467+
objectsCh <- objectData{
468+
Name: object.Name,
469+
Data: data,
470+
Attrs: *object,
471+
}
472+
}(i, obj)
473+
}
474+
475+
wg.Wait()
476+
close(errCh)
477+
478+
var multiErr error
479+
for err := range errCh {
480+
multierr.AppendInto(&multiErr, err)
481+
}
482+
483+
if multiErr != nil {
484+
return nil, multiErr
485+
}
486+
487+
response := make([]objectData, 0, len(allObjs))
488+
for obj := range objectsCh {
489+
response = append(response, obj)
490+
}
491+
492+
jsonResponse, err := json.Marshal(response)
493+
if err != nil {
494+
return nil, fmt.Errorf("gcp bucket binding error while marshalling bulk get response: %w", err)
495+
}
496+
497+
return &bindings.InvokeResponse{
498+
Data: jsonResponse,
499+
}, nil
500+
}

bindings/gcp/bucket/bucket_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,3 +254,14 @@ func TestDeleteOption(t *testing.T) {
254254
require.Error(t, err)
255255
})
256256
}
257+
258+
func TestBulkGetOption(t *testing.T) {
259+
gs := GCPStorage{logger: logger.NewLogger("test")}
260+
gs.metadata = &gcpMetadata{}
261+
262+
t.Run("return error if bucket is missing", func(t *testing.T) {
263+
r := bindings.InvokeRequest{}
264+
_, err := gs.bulkGet(t.Context(), &r)
265+
require.Error(t, err)
266+
})
267+
}

0 commit comments

Comments
 (0)