Skip to content

Commit a2fc441

Browse files
authored
Adding Google Storage Requester pays feature to Golang SDK. (#33236)
* Adding Google Storage Requester Pays feature to Golang SDK. Setting UserProject on Google Storage Bucket operations to enable the requester pays feature. The requester pays project ID will come from the environment variable named `BILLING_PROJECT_ID`. More information about the Google Storage requester pays feature: https://cloud.google.com/storage/docs/requester-pays * Adding new entry to CHANGES.md
1 parent 34ba184 commit a2fc441

File tree

3 files changed

+75
-12
lines changed

3 files changed

+75
-12
lines changed

CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7575

7676
## New Features / Improvements
77-
77+
* Adding Google Storage Requester Pays feature (Golang) ([#30747](https://github.com/apache/beam/issues/30747)).
7878
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7979
* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
8080
* [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support

sdks/go/pkg/beam/io/filesystem/gcs/gcs.go

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"cloud.google.com/go/storage"
28+
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
2829
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
2930
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
3031
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
@@ -33,8 +34,30 @@ import (
3334
"google.golang.org/api/iterator"
3435
)
3536

37+
const (
38+
projectBillingHook = "beam:go:hook:filesystem:billingproject"
39+
)
40+
41+
var billingProject string = ""
42+
3643
func init() {
3744
filesystem.Register("gs", New)
45+
hf := func(opts []string) hooks.Hook {
46+
return hooks.Hook{
47+
Init: func(ctx context.Context) (context.Context, error) {
48+
if len(opts) == 0 {
49+
return ctx, nil
50+
}
51+
if len(opts) > 1 {
52+
return ctx, fmt.Errorf("expected 1 option, got %v: %v", len(opts), opts)
53+
}
54+
55+
billingProject = opts[0]
56+
return ctx, nil
57+
},
58+
}
59+
}
60+
hooks.RegisterHook(projectBillingHook, hf)
3861
}
3962

4063
type fs struct {
@@ -44,6 +67,7 @@ type fs struct {
4467
// New creates a new Google Cloud Storage filesystem using application
4568
// default credentials. If it fails, it falls back to unauthenticated
4669
// access.
70+
// Requester-pays bucket operations are billed to the project configured through RequesterBillingProject or SetRequesterBillingProject, if one was set.
4771
func New(ctx context.Context) filesystem.Interface {
4872
client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
4973
if err != nil {
@@ -54,7 +78,23 @@ func New(ctx context.Context) filesystem.Interface {
5478
panic(errors.Wrapf(err, "failed to create GCS client"))
5579
}
5680
}
57-
return &fs{client: client}
81+
return &fs{
82+
client: client,
83+
}
84+
}
85+
86+
func SetRequesterBillingProject(project string) {
87+
billingProject = project
88+
}
89+
90+
// RequesterBillingProject configures the project to be used in Google Cloud Storage operations
91+
// with requester pays activated. More information about requester pays: https://cloud.google.com/storage/docs/requester-pays
92+
func RequesterBillingProject(project string) error {
93+
if project == "" {
94+
return fmt.Errorf("project cannot be empty, got %v", project)
95+
}
96+
// The hook itself is registered in this package's init function.
97+
return hooks.EnableHook(projectBillingHook, project)
5898
}
5999

60100
func (f *fs) Close() error {
@@ -73,7 +113,7 @@ func (f *fs) List(ctx context.Context, glob string) ([]string, error) {
73113
// For now, we assume * is the first matching character to make a
74114
// prefix listing and not list the entire bucket.
75115
prefix := fsx.GetPrefix(object)
76-
it := f.client.Bucket(bucket).Objects(ctx, &storage.Query{
116+
it := f.client.Bucket(bucket).UserProject(billingProject).Objects(ctx, &storage.Query{
77117
Prefix: prefix,
78118
})
79119
for {
@@ -107,7 +147,7 @@ func (f *fs) OpenRead(ctx context.Context, filename string) (io.ReadCloser, erro
107147
return nil, err
108148
}
109149

110-
return f.client.Bucket(bucket).Object(object).NewReader(ctx)
150+
return f.client.Bucket(bucket).UserProject(billingProject).Object(object).NewReader(ctx)
111151
}
112152

113153
// TODO(herohde) 7/12/2017: should we create the bucket in OpenWrite? For now, "no".
@@ -118,7 +158,7 @@ func (f *fs) OpenWrite(ctx context.Context, filename string) (io.WriteCloser, er
118158
return nil, err
119159
}
120160

121-
return f.client.Bucket(bucket).Object(object).NewWriter(ctx), nil
161+
return f.client.Bucket(bucket).UserProject(billingProject).Object(object).NewWriter(ctx), nil
122162
}
123163

124164
func (f *fs) Size(ctx context.Context, filename string) (int64, error) {
@@ -127,7 +167,7 @@ func (f *fs) Size(ctx context.Context, filename string) (int64, error) {
127167
return -1, err
128168
}
129169

130-
obj := f.client.Bucket(bucket).Object(object)
170+
obj := f.client.Bucket(bucket).UserProject(billingProject).Object(object)
131171
attrs, err := obj.Attrs(ctx)
132172
if err != nil {
133173
return -1, err
@@ -143,7 +183,7 @@ func (f *fs) LastModified(ctx context.Context, filename string) (time.Time, erro
143183
return time.Time{}, err
144184
}
145185

146-
obj := f.client.Bucket(bucket).Object(object)
186+
obj := f.client.Bucket(bucket).UserProject(billingProject).Object(object)
147187
attrs, err := obj.Attrs(ctx)
148188
if err != nil {
149189
return time.Time{}, err
@@ -159,7 +199,7 @@ func (f *fs) Remove(ctx context.Context, filename string) error {
159199
return err
160200
}
161201

162-
obj := f.client.Bucket(bucket).Object(object)
202+
obj := f.client.Bucket(bucket).UserProject(billingProject).Object(object)
163203
return obj.Delete(ctx)
164204
}
165205

@@ -169,13 +209,13 @@ func (f *fs) Copy(ctx context.Context, srcpath, dstpath string) error {
169209
if err != nil {
170210
return err
171211
}
172-
srcobj := f.client.Bucket(bucket).Object(src)
212+
srcobj := f.client.Bucket(bucket).UserProject(billingProject).Object(src)
173213

174214
bucket, dst, err := gcsx.ParseObject(dstpath)
175215
if err != nil {
176216
return err
177217
}
178-
dstobj := f.client.Bucket(bucket).Object(dst)
218+
dstobj := f.client.Bucket(bucket).UserProject(billingProject).Object(dst)
179219

180220
cp := dstobj.CopierFrom(srcobj)
181221
_, err = cp.Run(ctx)
@@ -188,13 +228,13 @@ func (f *fs) Rename(ctx context.Context, srcpath, dstpath string) error {
188228
if err != nil {
189229
return err
190230
}
191-
srcobj := f.client.Bucket(bucket).Object(src)
231+
srcobj := f.client.Bucket(bucket).UserProject(billingProject).Object(src)
192232

193233
bucket, dst, err := gcsx.ParseObject(dstpath)
194234
if err != nil {
195235
return err
196236
}
197-
dstobj := f.client.Bucket(bucket).Object(dst)
237+
dstobj := f.client.Bucket(bucket).UserProject(billingProject).Object(dst)
198238

199239
cp := dstobj.CopierFrom(srcobj)
200240
_, err = cp.Run(ctx)

sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
2526
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
2627
"github.com/fsouza/fake-gcs-server/fakestorage"
2728
"github.com/google/go-cmp/cmp"
@@ -43,6 +44,28 @@ func TestGCS_FilesystemNew(t *testing.T) {
4344
}
4445

4546
func TestGCS_direct(t *testing.T) {
47+
testGCS_direct(t)
48+
}
49+
50+
func TestGCS_BillingProjectHookEnable(t *testing.T) {
51+
billingProject := "whatever"
52+
RequesterBillingProject(billingProject)
53+
_, err := hooks.RunInitHooks(context.Background())
54+
if err != nil {
55+
t.Errorf("error to init hooks = %v", err)
56+
}
57+
projectBillingHook := "beam:go:hook:filesystem:billingproject"
58+
projectBillingHookIsEnable, hookValue := hooks.IsEnabled(projectBillingHook)
59+
if !projectBillingHookIsEnable {
60+
t.Error("project billing hook isn't enable")
61+
}
62+
if hookValue[0] != billingProject {
63+
t.Errorf("projectBillingHook value wrong / want {%s} got {%s}", billingProject, hookValue[0])
64+
}
65+
66+
}
67+
68+
func testGCS_direct(t *testing.T) {
4669
ctx := context.Background()
4770
dirPath := "gs://beamgogcsfilesystemtest"
4871
filePath := dirPath + "/file.txt"

0 commit comments

Comments
 (0)