@@ -25,10 +25,12 @@ import (
2525	"net/url" 
2626	"reflect" 
2727	"strconv" 
28+ 	"sync" 
2829	"time" 
2930
3031	"cloud.google.com/go/storage" 
3132	"github.com/google/uuid" 
33+ 	"go.uber.org/multierr" 
3234	"google.golang.org/api/googleapi" 
3335	"google.golang.org/api/iterator" 
3436	"google.golang.org/api/option" 
@@ -37,7 +39,7 @@ import (
3739	"github.com/dapr/components-contrib/metadata" 
3840	"github.com/dapr/kit/logger" 
3941	kitmd "github.com/dapr/kit/metadata" 
40- 	"github.com/dapr/kit/utils " 
42+ 	"github.com/dapr/kit/strings " 
4143)
4244
4345const  (
@@ -49,8 +51,9 @@ const (
4951	metadataKey  =  "key" 
5052	maxResults   =  1000 
5153
52- 	metadataKeyBC  =  "name" 
53- 	signOperation  =  "sign" 
54+ 	metadataKeyBC     =  "name" 
55+ 	signOperation     =  "sign" 
56+ 	bulkGetOperation  =  "bulkGet" 
5457)
5558
5659// GCPStorage allows saving data to GCP bucket storage. 
@@ -138,6 +141,7 @@ func (g *GCPStorage) Operations() []bindings.OperationKind {
138141		bindings .DeleteOperation ,
139142		bindings .ListOperation ,
140143		signOperation ,
144+ 		bulkGetOperation ,
141145	}
142146}
143147
@@ -155,6 +159,8 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
155159		return  g .list (ctx , req )
156160	case  signOperation :
157161		return  g .sign (ctx , req )
162+ 	case  bulkGetOperation :
163+ 		return  g .bulkGet (ctx , req )
158164	default :
159165		return  nil , fmt .Errorf ("unsupported operation %s" , req .Operation )
160166	}
@@ -325,11 +331,11 @@ func (metadata gcpMetadata) mergeWithRequestMetadata(req *bindings.InvokeRequest
325331	merged  :=  metadata 
326332
327333	if  val , ok  :=  req .Metadata [metadataDecodeBase64 ]; ok  &&  val  !=  ""  {
328- 		merged .DecodeBase64  =  utils .IsTruthy (val )
334+ 		merged .DecodeBase64  =  strings .IsTruthy (val )
329335	}
330336
331337	if  val , ok  :=  req .Metadata [metadataEncodeBase64 ]; ok  &&  val  !=  ""  {
332- 		merged .EncodeBase64  =  utils .IsTruthy (val )
338+ 		merged .EncodeBase64  =  strings .IsTruthy (val )
333339	}
334340	if  val , ok  :=  req .Metadata [metadataSignTTL ]; ok  &&  val  !=  ""  {
335341		merged .SignTTL  =  val 
@@ -404,3 +410,91 @@ func (g *GCPStorage) signObject(bucket, object, ttl string) (string, error) {
404410	}
405411	return  u , nil 
406412}
413+ 
414+ type  objectData  struct  {
415+ 	Name   string               `json:"name"` 
416+ 	Data   []byte               `json:"data"` 
417+ 	Attrs  storage.ObjectAttrs  `json:"attrs"` 
418+ }
419+ 
420+ func  (g  * GCPStorage ) bulkGet (ctx  context.Context , req  * bindings.InvokeRequest ) (* bindings.InvokeResponse , error ) {
421+ 	metadata , err  :=  g .metadata .mergeWithRequestMetadata (req )
422+ 	if  err  !=  nil  {
423+ 		return  nil , fmt .Errorf ("gcp binding error while merging metadata : %w" , err )
424+ 	}
425+ 
426+ 	if  g .metadata .Bucket  ==  ""  {
427+ 		return  nil , errors .New ("gcp bucket binding error: bucket is required" )
428+ 	}
429+ 
430+ 	var  allObjs  []* storage.ObjectAttrs 
431+ 	it  :=  g .client .Bucket (g .metadata .Bucket ).Objects (ctx , nil )
432+ 	for  {
433+ 		attrs , err2  :=  it .Next ()
434+ 		if  err2  ==  iterator .Done  {
435+ 			break 
436+ 		}
437+ 		allObjs  =  append (allObjs , attrs )
438+ 	}
439+ 
440+ 	var  wg  sync.WaitGroup 
441+ 	objectsCh  :=  make (chan  objectData , len (allObjs ))
442+ 	errCh  :=  make (chan  error , len (allObjs ))
443+ 
444+ 	for  i , obj  :=  range  allObjs  {
445+ 		wg .Add (1 )
446+ 		go  func (idx  int , object  * storage.ObjectAttrs ) {
447+ 			defer  wg .Done ()
448+ 
449+ 			rc , err3  :=  g .client .Bucket (g .metadata .Bucket ).Object (object .Name ).NewReader (ctx )
450+ 			if  err3  !=  nil  {
451+ 				errCh  <-  err3 
452+ 				return 
453+ 			}
454+ 			defer  rc .Close ()
455+ 
456+ 			data , readErr  :=  io .ReadAll (rc )
457+ 			if  readErr  !=  nil  {
458+ 				errCh  <-  readErr 
459+ 				return 
460+ 			}
461+ 
462+ 			if  metadata .EncodeBase64  {
463+ 				encoded  :=  b64 .StdEncoding .EncodeToString (data )
464+ 				data  =  []byte (encoded )
465+ 			}
466+ 
467+ 			objectsCh  <-  objectData {
468+ 				Name :  object .Name ,
469+ 				Data :  data ,
470+ 				Attrs : * object ,
471+ 			}
472+ 		}(i , obj )
473+ 	}
474+ 
475+ 	wg .Wait ()
476+ 	close (errCh )
477+ 
478+ 	var  multiErr  error 
479+ 	for  err  :=  range  errCh  {
480+ 		multierr .AppendInto (& multiErr , err )
481+ 	}
482+ 
483+ 	if  multiErr  !=  nil  {
484+ 		return  nil , multiErr 
485+ 	}
486+ 
487+ 	response  :=  make ([]objectData , 0 , len (allObjs ))
488+ 	for  obj  :=  range  objectsCh  {
489+ 		response  =  append (response , obj )
490+ 	}
491+ 
492+ 	jsonResponse , err  :=  json .Marshal (response )
493+ 	if  err  !=  nil  {
494+ 		return  nil , fmt .Errorf ("gcp bucket binding error while marshalling bulk get response: %w" , err )
495+ 	}
496+ 
497+ 	return  & bindings.InvokeResponse {
498+ 		Data : jsonResponse ,
499+ 	}, nil 
500+ }
0 commit comments