Skip to content

Commit 3267c0a

Browse files
sdks/go: panic only if type not registered
Co-authored-by: Danny McCormick <damccorm@users.noreply.github.com>"
1 parent e511c10 commit 3267c0a

File tree

2 files changed

+19
-24
lines changed

2 files changed

+19
-24
lines changed

sdks/go/pkg/beam/io/bigqueryio/bigquery.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func mustInferSchema(t reflect.Type) bigquery.Schema {
192192
panic(fmt.Sprintf("schema type must be struct: %v", t))
193193
}
194194

195-
registerTypeIfNeeded(t)
195+
checkTypeRegistered(t)
196196

197197
schema, err := bigquery.InferSchema(reflect.Zero(t).Interface())
198198
if err != nil {
@@ -201,22 +201,16 @@ func mustInferSchema(t reflect.Type) bigquery.Schema {
201201
return schema
202202
}
203203

204-
func registerTypeIfNeeded(t reflect.Type) {
204+
func checkTypeRegistered(t reflect.Type) {
205205
t = reflectx.SkipPtr(t)
206206
key, ok := runtime.TypeKey(t)
207207
if !ok {
208208
panic(fmt.Sprintf("type %v must be a named type (not anonymous) for registration", t))
209209
}
210210

211-
// Check if Beam has already been initialized.
212-
if beam.Initialized() {
213-
panic(fmt.Sprintf("Type %v must be registered before beam.Init() is called. "+
214-
"Use beam.RegisterType(%v) in your main setup.", t, t))
215-
}
216-
217-
// Register the type if not already registered.
218211
if _, registered := runtime.LookupType(key); !registered {
219-
runtime.RegisterType(t)
212+
panic(fmt.Sprintf("type %v is not registered. Ensure that beam.RegisterType(%v) "+
213+
"is called before beam.Init().", t, t))
220214
}
221215
}
222216

sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
package bigqueryio
1717

1818
import (
19-
"errors"
2019
"reflect"
2120
"testing"
2221

2322
"cloud.google.com/go/bigquery"
24-
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
23+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
2524
)
2625

2726
func TestNewQualifiedTableName(t *testing.T) {
@@ -93,28 +92,25 @@ func Test_mustInferSchema(t *testing.T) {
9392
name string
9493
input interface{}
9594
wantErr bool
95+
prep func(reflect.Type) error
9696
verify func(reflect.Type) error
9797
}{
9898
{
99-
name: "NewType_ShouldRegisterSuccessfully",
99+
name: "NotRegisteredType_ShouldPanic",
100100
input: TestSchema{},
101-
wantErr: false,
102-
verify: func(t reflect.Type) error {
103-
// Verify successful type registration in runtime registry.
104-
if key, ok := runtime.TypeKey(t); ok {
105-
if _, registered := runtime.LookupType(key); !registered {
106-
return errors.New("Type was not properly registered")
107-
}
108-
}
109-
return nil
110-
},
101+
wantErr: true,
102+
prep: func(t reflect.Type) error { return nil },
103+
verify: func(t reflect.Type) error { return nil },
111104
},
112105
{
113106
name: "AlreadyRegisteredType_ShouldNotPanic",
114107
input: TestSchema{},
115108
wantErr: false,
109+
prep: func(t reflect.Type) error {
110+
beam.RegisterType(t)
111+
return nil
112+
},
116113
verify: func(t reflect.Type) error {
117-
// Verify re-registration of existing type is handled correctly.
118114
mustInferSchema(t)
119115
return nil
120116
},
@@ -123,6 +119,7 @@ func Test_mustInferSchema(t *testing.T) {
123119
name: "AnonymousStruct_ShouldPanic",
124120
input: struct{}{},
125121
wantErr: true,
122+
prep: func(t reflect.Type) error { return nil },
126123
verify: func(t reflect.Type) error { return nil },
127124
},
128125
}
@@ -137,6 +134,10 @@ func Test_mustInferSchema(t *testing.T) {
137134
}()
138135

139136
typ := reflect.TypeOf(tt.input)
137+
if err := tt.prep(typ); err != nil {
138+
t.Fatalf("failed to prep test environment, got err: %v", err)
139+
}
140+
140141
mustInferSchema(typ)
141142
if tt.wantErr {
142143
t.Fatal("Expected panic did not occur")

0 commit comments

Comments
 (0)