Skip to content

Commit 0f67c75

Browse files
sdks/go: require type registrations in Native BigQuery IO (#33988)
* sdks/go: require type registration in BigQueryIO * sdks/go: test type register is required BigQueryIO * sdks/go: panic with clear err msg if already init * sdks/go: panic only if type not registered Co-authored-by: Danny McCormick <damccorm@users.noreply.github.com>"
1 parent 5ca14c1 commit 0f67c75

File tree

2 files changed

+89
-0
lines changed

2 files changed

+89
-0
lines changed

sdks/go/pkg/beam/io/bigqueryio/bigquery.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"cloud.google.com/go/bigquery"
2929
"github.com/apache/beam/sdks/v2/go/pkg/beam"
30+
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
3031
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
3132
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
3233
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
@@ -190,13 +191,29 @@ func mustInferSchema(t reflect.Type) bigquery.Schema {
190191
if t.Kind() != reflect.Struct {
191192
panic(fmt.Sprintf("schema type must be struct: %v", t))
192193
}
194+
195+
checkTypeRegistered(t)
196+
193197
schema, err := bigquery.InferSchema(reflect.Zero(t).Interface())
194198
if err != nil {
195199
panic(errors.Wrapf(err, "invalid schema type: %v", t))
196200
}
197201
return schema
198202
}
199203

204+
func checkTypeRegistered(t reflect.Type) {
205+
t = reflectx.SkipPtr(t)
206+
key, ok := runtime.TypeKey(t)
207+
if !ok {
208+
panic(fmt.Sprintf("type %v must be a named type (not anonymous) for registration", t))
209+
}
210+
211+
if _, registered := runtime.LookupType(key); !registered {
212+
panic(fmt.Sprintf("type %v is not registered. Ensure that beam.RegisterType(%v) "+
213+
"is called before beam.Init().", t, t))
214+
}
215+
}
216+
200217
func mustParseTable(table string) QualifiedTableName {
201218
qn, err := NewQualifiedTableName(table)
202219
if err != nil {

sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ package bigqueryio
1818
import (
1919
"reflect"
2020
"testing"
21+
22+
"cloud.google.com/go/bigquery"
23+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
2124
)
2225

2326
func TestNewQualifiedTableName(t *testing.T) {
@@ -76,3 +79,72 @@ func Test_constructSelectStatementPanic(t *testing.T) {
7679
constructSelectStatement(typ, tagKey, table)
7780
})
7881
}
82+
83+
func Test_mustInferSchema(t *testing.T) {
84+
type TestSchema struct {
85+
Name bigquery.NullString `bigquery:"name"`
86+
Active bigquery.NullBool `bigquery:"active"`
87+
Score bigquery.NullFloat64 `bigquery:"score"`
88+
Time bigquery.NullDateTime `bigquery:"time"`
89+
}
90+
91+
tests := []struct {
92+
name string
93+
input interface{}
94+
wantErr bool
95+
prep func(reflect.Type) error
96+
verify func(reflect.Type) error
97+
}{
98+
{
99+
name: "NotRegisteredType_ShouldPanic",
100+
input: TestSchema{},
101+
wantErr: true,
102+
prep: func(t reflect.Type) error { return nil },
103+
verify: func(t reflect.Type) error { return nil },
104+
},
105+
{
106+
name: "AlreadyRegisteredType_ShouldNotPanic",
107+
input: TestSchema{},
108+
wantErr: false,
109+
prep: func(t reflect.Type) error {
110+
beam.RegisterType(t)
111+
return nil
112+
},
113+
verify: func(t reflect.Type) error {
114+
mustInferSchema(t)
115+
return nil
116+
},
117+
},
118+
{
119+
name: "AnonymousStruct_ShouldPanic",
120+
input: struct{}{},
121+
wantErr: true,
122+
prep: func(t reflect.Type) error { return nil },
123+
verify: func(t reflect.Type) error { return nil },
124+
},
125+
}
126+
127+
for _, tt := range tests {
128+
t.Run(tt.name, func(t *testing.T) {
129+
defer func() {
130+
r := recover()
131+
if (r != nil) != tt.wantErr {
132+
t.Errorf("mustInferSchema() panic = %v, wantErr %v", r, tt.wantErr)
133+
}
134+
}()
135+
136+
typ := reflect.TypeOf(tt.input)
137+
if err := tt.prep(typ); err != nil {
138+
t.Fatalf("failed to prep test environment, got err: %v", err)
139+
}
140+
141+
mustInferSchema(typ)
142+
if tt.wantErr {
143+
t.Fatal("Expected panic did not occur")
144+
}
145+
if err := tt.verify(typ); err != nil {
146+
t.Fatal(err)
147+
}
148+
})
149+
}
150+
}

0 commit comments

Comments
 (0)