Skip to content

Commit 4158bac

Browse files
authored
feat(bigtable): Row key schema support in admin client (googleapis#11777)
* feat: Row key schema support in Bigtable admin client * Fix integration tests and nil checks in admin.
1 parent 0fc40bc commit 4158bac

File tree

5 files changed

+635
-3
lines changed

5 files changed

+635
-3
lines changed

bigtable/admin.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,8 @@ type TableConf struct {
326326
ChangeStreamRetention ChangeStreamRetention
327327
// Configure an automated backup policy for the table
328328
AutomatedBackupConfig TableAutomatedBackupConfig
329+
// Configure a row key schema for the table
330+
RowKeySchema *StructType
329331
}
330332

331333
// CreateTable creates a new table in the instance.
@@ -374,6 +376,11 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf)
374376
tbl.AutomatedBackupConfig = proto
375377
}
376378

379+
if conf.RowKeySchema != nil {
380+
proto := conf.RowKeySchema.proto()
381+
tbl.RowKeySchema = proto.GetStructType()
382+
}
383+
377384
if conf.Families != nil && conf.ColumnFamilies != nil {
378385
return errors.New("only one of Families or ColumnFamilies may be set, not both")
379386
}
@@ -458,6 +465,7 @@ const (
458465
automatedBackupPolicyFieldMask = "automated_backup_policy"
459466
retentionPeriodFieldMaskPath = "retention_period"
460467
frequencyFieldMaskPath = "frequency"
468+
rowKeySchemaMaskPath = "row_key_schema"
461469
)
462470

463471
func (ac *AdminClient) newUpdateTableRequestProto(tableID string) (*btapb.UpdateTableRequest, error) {
@@ -565,6 +573,28 @@ func (ac *AdminClient) UpdateTableWithAutomatedBackupPolicy(ctx context.Context,
565573
return ac.updateTableAndWait(ctx, req)
566574
}
567575

576+
// UpdateTableWithRowKeySchema updates a table with RowKeySchema.
577+
func (ac *AdminClient) UpdateTableWithRowKeySchema(ctx context.Context, tableID string, rowKeySchema StructType) error {
578+
req, err := ac.newUpdateTableRequestProto(tableID)
579+
if err != nil {
580+
return err
581+
}
582+
req.UpdateMask.Paths = append(req.UpdateMask.Paths, rowKeySchemaMaskPath)
583+
req.Table.RowKeySchema = rowKeySchema.proto().GetStructType()
584+
return ac.updateTableAndWait(ctx, req)
585+
}
586+
587+
// UpdateTableRemoveRowKeySchema removes a RowKeySchema from a table.
588+
func (ac *AdminClient) UpdateTableRemoveRowKeySchema(ctx context.Context, tableID string) error {
589+
req, err := ac.newUpdateTableRequestProto(tableID)
590+
if err != nil {
591+
return err
592+
}
593+
req.UpdateMask.Paths = append(req.UpdateMask.Paths, rowKeySchemaMaskPath)
594+
req.IgnoreWarnings = true
595+
return ac.updateTableAndWait(ctx, req)
596+
}
597+
568598
// DeleteTable deletes a table and all of its data.
569599
func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {
570600
ctx = mergeOutgoingMetadata(ctx, ac.md)

bigtable/admin_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,32 @@ func TestTableAdmin_CreateTableFromConf_AutomatedBackupPolicy_Valid(t *testing.T
205205
}
206206
}
207207

208+
func TestTableAdmin_CreateTableFromConf_WithRowKeySchema(t *testing.T) {
209+
mock := &mockTableAdminClock{}
210+
c := setupTableClient(t, mock)
211+
212+
err := c.CreateTableFromConf(context.Background(), &TableConf{TableID: "my-table", RowKeySchema: &StructType{
213+
Fields: []StructField{
214+
{FieldName: "key1", FieldType: Int64Type{Encoding: Int64OrderedCodeBytesEncoding{}}},
215+
{FieldName: "key2", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
216+
},
217+
Encoding: StructDelimitedBytesEncoding{Delimiter: []byte{'#', '#'}},
218+
}})
219+
if err != nil {
220+
t.Fatalf("CreateTableFromConf failed: %v", err)
221+
}
222+
createTableReq := mock.createTableReq
223+
if !cmp.Equal(createTableReq.TableId, "my-table") {
224+
t.Errorf("Unexpected tableID: %v, want: %v", createTableReq.TableId, "my-table")
225+
}
226+
if createTableReq.Table.RowKeySchema == nil {
227+
t.Errorf("Unexpected nil RowKeySchema in request")
228+
}
229+
if len(createTableReq.Table.RowKeySchema.Fields) != 2 {
230+
t.Errorf("Unexpected field length in row key schema: %v, want: %v", createTableReq.Table.RowKeySchema, 2)
231+
}
232+
}
233+
208234
func TestTableAdmin_CreateBackupWithOptions_NoExpiryTime(t *testing.T) {
209235
mock := &mockTableAdminClock{}
210236
c := setupTableClient(t, mock)
@@ -375,6 +401,59 @@ func TestTableAdmin_UpdateTableDisableChangeStream(t *testing.T) {
375401
}
376402
}
377403

404+
func TestTableAdmin_UpdateTableWithRowKeySchema(t *testing.T) {
405+
mock := &mockTableAdminClock{}
406+
c := setupTableClient(t, mock)
407+
rks := StructType{
408+
Fields: []StructField{
409+
{FieldName: "key1", FieldType: Int64Type{Encoding: Int64OrderedCodeBytesEncoding{}}},
410+
{FieldName: "key2", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
411+
},
412+
Encoding: StructDelimitedBytesEncoding{Delimiter: []byte{'#', '#'}}}
413+
err := c.UpdateTableWithRowKeySchema(context.Background(), "my-table", rks)
414+
if err != nil {
415+
t.Fatalf("UpdateTableWithRowKeySchema error: %v", err)
416+
}
417+
req := mock.updateTableReq
418+
419+
expectedTableName := "projects/my-cool-project/instances/my-cool-instance/tables/my-table"
420+
if !cmp.Equal(req.Table.Name, expectedTableName) {
421+
t.Errorf("Unexpected table name: %v, want: %v", req.Table.Name, expectedTableName)
422+
}
423+
if !cmp.Equal(len(req.UpdateMask.Paths), 1) {
424+
t.Errorf("Unexpected mask size: %v, want: %v", len(req.UpdateMask.Paths), 1)
425+
}
426+
if !cmp.Equal(req.UpdateMask.Paths[0], "row_key_schema") {
427+
t.Errorf("Unexpected mask path: %v, want: %v", req.UpdateMask.Paths[0], "row_key_schema")
428+
}
429+
}
430+
431+
func TestTableAdmin_UpdateTableWithClearRowKeySchema(t *testing.T) {
432+
mock := &mockTableAdminClock{}
433+
c := setupTableClient(t, mock)
434+
err := c.UpdateTableRemoveRowKeySchema(context.Background(), "my-table")
435+
if err != nil {
436+
t.Fatalf("UpdateTableWithRowKeySchema error: %v", err)
437+
}
438+
req := mock.updateTableReq
439+
expectedTableName := "projects/my-cool-project/instances/my-cool-instance/tables/my-table"
440+
if !cmp.Equal(req.Table.Name, expectedTableName) {
441+
t.Errorf("Unexpected table name: %v, want: %v", req.Table.Name, expectedTableName)
442+
}
443+
if !cmp.Equal(len(req.UpdateMask.Paths), 1) {
444+
t.Errorf("Unexpected mask size: %v, want: %v", len(req.UpdateMask.Paths), 1)
445+
}
446+
if !cmp.Equal(req.UpdateMask.Paths[0], "row_key_schema") {
447+
t.Errorf("Unexpected mask path: %v, want: %v", req.UpdateMask.Paths[0], "row_key_schema")
448+
}
449+
if req.Table.RowKeySchema != nil {
450+
t.Errorf("Unexpected row key schema in table during clear schema request: %v", req.Table.RowKeySchema)
451+
}
452+
if !req.IgnoreWarnings {
453+
t.Errorf("Expect IgnoreWarnings set to true when clearing row key schema")
454+
}
455+
}
456+
378457
func TestTableAdmin_SetGcPolicy(t *testing.T) {
379458
for _, test := range []struct {
380459
desc string

bigtable/integration_test.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2064,6 +2064,217 @@ func TestIntegration_AutomatedBackups(t *testing.T) {
20642064
}
20652065
}
20662066

2067+
func TestIntegration_CreateTableWithRowKeySchema(t *testing.T) {
2068+
testEnv, err := NewIntegrationEnv()
2069+
if err != nil {
2070+
t.Fatalf("IntegrationEnv: %v", err)
2071+
}
2072+
defer testEnv.Close()
2073+
2074+
if !testEnv.Config().UseProd {
2075+
t.Skip("emulator doesn't support Automated Backups")
2076+
}
2077+
2078+
timeout := 5 * time.Minute
2079+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
2080+
defer cancel()
2081+
2082+
adminClient, err := testEnv.NewAdminClient()
2083+
if err != nil {
2084+
t.Fatalf("NewAdminClient: %v", err)
2085+
}
2086+
defer adminClient.Close()
2087+
2088+
myTableName := myTableNameSpace.New()
2089+
tableConf := TableConf{
2090+
TableID: myTableName,
2091+
Families: map[string]GCPolicy{
2092+
"fam1": MaxVersionsPolicy(1),
2093+
"fam2": MaxVersionsPolicy(2),
2094+
},
2095+
}
2096+
2097+
testCases := []struct {
2098+
desc string
2099+
rks StructType
2100+
errorExpected bool
2101+
}{
2102+
{
2103+
desc: "Create fail with conflict family and row key column",
2104+
rks: StructType{
2105+
Fields: []StructField{{FieldName: "fam1", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}}},
2106+
Encoding: StructOrderedCodeBytesEncoding{},
2107+
},
2108+
errorExpected: true,
2109+
},
2110+
{
2111+
desc: "Create fail with missing encoding in struct type",
2112+
rks: StructType{
2113+
Fields: []StructField{{FieldName: "myfield", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}}},
2114+
},
2115+
errorExpected: true,
2116+
},
2117+
{
2118+
desc: "Create fail on DelimitedBytes missing delimiter",
2119+
rks: StructType{
2120+
Fields: []StructField{{FieldName: "myfield", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}}},
2121+
Encoding: StructDelimitedBytesEncoding{},
2122+
},
2123+
errorExpected: true,
2124+
},
2125+
{
2126+
desc: "Create with Singleton failed with more than 1 field",
2127+
rks: StructType{
2128+
Fields: []StructField{
2129+
{FieldName: "myfield1", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
2130+
{FieldName: "myfield2", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}},
2131+
},
2132+
Encoding: StructSingletonEncoding{},
2133+
},
2134+
errorExpected: true,
2135+
},
2136+
{
2137+
desc: "Create with Singleton ok",
2138+
rks: StructType{
2139+
Fields: []StructField{
2140+
{FieldName: "myfield1", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
2141+
},
2142+
Encoding: StructSingletonEncoding{},
2143+
},
2144+
},
2145+
{
2146+
desc: "Create with OrderedCode ok",
2147+
rks: StructType{
2148+
Fields: []StructField{
2149+
{FieldName: "myfield1", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
2150+
{FieldName: "myfield2", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}},
2151+
},
2152+
Encoding: StructOrderedCodeBytesEncoding{},
2153+
},
2154+
},
2155+
{
2156+
desc: "Create with DelimitedBytes ok",
2157+
rks: StructType{
2158+
Fields: []StructField{
2159+
{FieldName: "myfield1", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
2160+
{FieldName: "myfield2", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}},
2161+
},
2162+
Encoding: StructDelimitedBytesEncoding{
2163+
Delimiter: []byte{'#'},
2164+
},
2165+
},
2166+
},
2167+
}
2168+
2169+
for _, tc := range testCases {
2170+
tableConf.RowKeySchema = &tc.rks
2171+
err := adminClient.CreateTableFromConf(ctx, &tableConf)
2172+
2173+
if tc.errorExpected && err == nil {
2174+
t.Errorf("Want error from test: '%v', got nil", tc.desc)
2175+
}
2176+
2177+
if !tc.errorExpected && err != nil {
2178+
t.Errorf("Unexpected error: %v", err)
2179+
}
2180+
2181+
// get the table and see the new schema is updated
2182+
tbl, err := adminClient.getTable(ctx, tableConf.TableID, btapb.Table_SCHEMA_VIEW)
2183+
if !tc.errorExpected && tbl.RowKeySchema == nil {
2184+
t.Errorf("Expecting row key schema %v to be updated in table, got: %v", tc.rks, tbl)
2185+
}
2186+
2187+
if tbl != nil {
2188+
// clean up table
2189+
err = adminClient.DeleteTable(ctx, tableConf.TableID)
2190+
if err != nil {
2191+
t.Fatalf("Unexpected error trying to clean up table: %v", err)
2192+
}
2193+
}
2194+
}
2195+
}
2196+
2197+
func TestIntegration_UpdateRowKeySchemaInTable(t *testing.T) {
2198+
testEnv, err := NewIntegrationEnv()
2199+
if err != nil {
2200+
t.Fatalf("IntegrationEnv: %v", err)
2201+
}
2202+
defer testEnv.Close()
2203+
2204+
if !testEnv.Config().UseProd {
2205+
t.Skip("emulator doesn't support Automated Backups")
2206+
}
2207+
2208+
timeout := 5 * time.Minute
2209+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
2210+
defer cancel()
2211+
2212+
adminClient, err := testEnv.NewAdminClient()
2213+
if err != nil {
2214+
t.Fatalf("NewAdminClient: %v", err)
2215+
}
2216+
defer adminClient.Close()
2217+
2218+
myTableName := myTableNameSpace.New()
2219+
tableConf := TableConf{
2220+
TableID: myTableName,
2221+
Families: map[string]GCPolicy{
2222+
"fam1": MaxVersionsPolicy(1),
2223+
},
2224+
}
2225+
2226+
if err := adminClient.CreateTableFromConf(ctx, &tableConf); err != nil {
2227+
t.Fatalf("Unexpected error trying to create table: %v", err)
2228+
}
2229+
2230+
testCases := []struct {
2231+
desc string
2232+
rks StructType
2233+
errorExpected bool
2234+
}{
2235+
{desc: "Update fail with conflicting family name",
2236+
rks: StructType{
2237+
Fields: []StructField{{FieldName: "fam1", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}}},
2238+
Encoding: StructSingletonEncoding{},
2239+
},
2240+
errorExpected: true,
2241+
},
2242+
{
2243+
desc: "Update ok",
2244+
rks: StructType{
2245+
Fields: []StructField{
2246+
{FieldName: "myfield", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}},
2247+
{FieldName: "myfield2", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}}},
2248+
Encoding: StructDelimitedBytesEncoding{
2249+
Delimiter: []byte{'#'},
2250+
},
2251+
},
2252+
},
2253+
}
2254+
2255+
for _, tc := range testCases {
2256+
err := adminClient.UpdateTableWithRowKeySchema(ctx, tableConf.TableID, tc.rks)
2257+
if tc.errorExpected && err == nil {
2258+
t.Errorf("Expecting error from test '%v', got nil", tc.desc)
2259+
}
2260+
2261+
if !tc.errorExpected && err != nil {
2262+
t.Errorf("Unexpected error from test '%v': %v", tc.desc, err)
2263+
}
2264+
2265+
// Get the table to check if the schema is updated
2266+
tbl, err := adminClient.getTable(ctx, tableConf.TableID, btapb.Table_SCHEMA_VIEW)
2267+
if !tc.errorExpected && tbl.RowKeySchema == nil {
2268+
t.Errorf("Expecting row key schema %v to be updated in table, got: %v", tc.rks, tbl)
2269+
}
2270+
2271+
// Clear schema ok
2272+
if err = adminClient.UpdateTableRemoveRowKeySchema(ctx, tableConf.TableID); err != nil {
2273+
t.Errorf("Unexpected error trying to clear row key schema: %v", err)
2274+
}
2275+
}
2276+
}
2277+
20672278
func TestIntegration_Admin(t *testing.T) {
20682279
testEnv, err := NewIntegrationEnv()
20692280
if err != nil {

0 commit comments

Comments
 (0)