Skip to content

Commit a38cdd1

Browse files
Support replica materialized view in resource_bigquery_table in the beta provider (#9773) (#6865)
* support replica materialized view in resource_bigquery_table * fix typo * add unit tests * fix white spaces * fix the condition check for table_replication_info in Update * create source materialized view in unit tests using DDL instead of table API * add missing google-beta provider declarations in unit test * create source table using DDL in unit test * Revert "create source table using DDL in unit test" This reverts commit 232a7ac431114cd405f0cec858bb0496d48fd07c. * reorder input args in unit tests * remove wrong project declaration in unit tests * remove update check, make the new fields forcenew, and add default to replication_interval_ms * add ForceNew to table_replication_info [upstream:256f7f56789ee965c2e7582735140de383270400] Signed-off-by: Modular Magician <[email protected]>
1 parent e11d829 commit a38cdd1

File tree

3 files changed

+426
-48
lines changed

3 files changed

+426
-48
lines changed

.changelog/9773.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:enhancement
2+
bigquery: support replica materialized view in resource_bigquery_table (beta)
3+
```

google-beta/services/bigquery/resource_bigquery_table.go

Lines changed: 157 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ func ResourceBigQueryTable() *schema.Resource {
530530
Default: "NONE",
531531
Description: `The compression type of the data source. Valid values are "NONE" or "GZIP".`,
532532
},
533-
// Schema: Optional] The schema for the data.
533+
// Schema: [Optional] The schema for the data.
534534
// Schema is required for CSV and JSON formats if autodetect is not on.
535535
// Schema is disallowed for Google Cloud Bigtable, Cloud Datastore backups, Avro, Iceberg, ORC, and Parquet formats.
536536
"schema": {
@@ -889,7 +889,7 @@ func ResourceBigQueryTable() *schema.Resource {
889889
Type: schema.TypeInt,
890890
Default: 1800000,
891891
Optional: true,
892-
Description: `Specifies maximum frequency at which this materialized view will be refreshed. The default is 1800000`,
892+
Description: `Specifies maximum frequency at which this materialized view will be refreshed. The default is 1800000.`,
893893
},
894894

895895
"allow_non_incremental_definition": {
@@ -1252,6 +1252,43 @@ func ResourceBigQueryTable() *schema.Resource {
12521252
},
12531253
},
12541254
},
1255+
// TableReplicationInfo: [Optional] Replication info of a table created using `AS REPLICA` DDL like: `CREATE MATERIALIZED VIEW mv1 AS REPLICA OF src_mv`.
1256+
"table_replication_info": {
1257+
Type: schema.TypeList,
1258+
Optional: true,
1259+
ForceNew: true,
1260+
MaxItems: 1,
1261+
Description: `Replication info of a table created using "AS REPLICA" DDL like: "CREATE MATERIALIZED VIEW mv1 AS REPLICA OF src_mv".`,
1262+
Elem: &schema.Resource{
1263+
Schema: map[string]*schema.Schema{
1264+
"source_project_id": {
1265+
Type: schema.TypeString,
1266+
Required: true,
1267+
ForceNew: true,
1268+
Description: `The ID of the source project.`,
1269+
},
1270+
"source_dataset_id": {
1271+
Type: schema.TypeString,
1272+
Required: true,
1273+
ForceNew: true,
1274+
Description: `The ID of the source dataset.`,
1275+
},
1276+
"source_table_id": {
1277+
Type: schema.TypeString,
1278+
Required: true,
1279+
ForceNew: true,
1280+
Description: `The ID of the source materialized view.`,
1281+
},
1282+
"replication_interval_ms": {
1283+
Type: schema.TypeInt,
1284+
Default: 300000,
1285+
Optional: true,
1286+
ForceNew: true,
1287+
Description: `The interval at which the source materialized view is polled for updates. The default is 300000.`,
1288+
},
1289+
},
1290+
},
1291+
},
12551292
},
12561293
UseJSONNumber: true,
12571294
}
@@ -1388,9 +1425,49 @@ func resourceBigQueryTableCreate(d *schema.ResourceData, meta interface{}) error
13881425

13891426
datasetID := d.Get("dataset_id").(string)
13901427

1428+
if v, ok := d.GetOk("table_replication_info"); ok {
1429+
if table.Schema != nil || table.View != nil || table.MaterializedView != nil {
1430+
return errors.New("Schema, view, or materialized view cannot be specified when table replication info is present")
1431+
}
1432+
1433+
replicationDDL := fmt.Sprintf("CREATE MATERIALIZED VIEW %s.%s.%s", d.Get("project").(string), d.Get("dataset_id").(string), d.Get("table_id").(string))
1434+
1435+
tableReplicationInfo := expandTableReplicationInfo(v)
1436+
replicationIntervalMs := tableReplicationInfo["replication_interval_ms"].(int64)
1437+
if replicationIntervalMs > 0 {
1438+
replicationIntervalSeconds := replicationIntervalMs / 1000
1439+
replicationDDL = fmt.Sprintf("%s OPTIONS(replication_interval_seconds=%d)", replicationDDL, replicationIntervalSeconds)
1440+
}
1441+
1442+
replicationDDL = fmt.Sprintf("%s AS REPLICA OF %s.%s.%s", replicationDDL, tableReplicationInfo["source_project_id"], tableReplicationInfo["source_dataset_id"], tableReplicationInfo["source_table_id"])
1443+
useLegacySQL := false
1444+
1445+
req := &bigquery.QueryRequest{
1446+
Query: replicationDDL,
1447+
UseLegacySql: &useLegacySQL,
1448+
}
1449+
1450+
log.Printf("[INFO] Creating a replica materialized view with DDL: '%s'", replicationDDL)
1451+
1452+
_, err := config.NewBigQueryClient(userAgent).Jobs.Query(project, req).Do()
1453+
1454+
id := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, datasetID, d.Get("table_id").(string))
1455+
if err != nil {
1456+
if deleteErr := resourceBigQueryTableDelete(d, meta); deleteErr != nil {
1457+
log.Printf("[INFO] Unable to clean up table %s: %s", id, deleteErr)
1458+
}
1459+
return err
1460+
}
1461+
1462+
log.Printf("[INFO] BigQuery table %s has been created", id)
1463+
d.SetId(id)
1464+
1465+
return resourceBigQueryTableRead(d, meta)
1466+
}
1467+
13911468
if table.View != nil && table.Schema != nil {
13921469

1393-
log.Printf("[INFO] Removing schema from table definition because big query does not support setting schema on view creation")
1470+
log.Printf("[INFO] Removing schema from table definition because BigQuery does not support setting schema on view creation")
13941471
schemaBack := table.Schema
13951472
table.Schema = nil
13961473

@@ -1410,7 +1487,7 @@ func resourceBigQueryTableCreate(d *schema.ResourceData, meta interface{}) error
14101487
return err
14111488
}
14121489

1413-
log.Printf("[INFO] BigQuery table %s has been update with schema", res.Id)
1490+
log.Printf("[INFO] BigQuery table %s has been updated with schema", res.Id)
14141491
} else {
14151492
log.Printf("[INFO] Creating BigQuery table: %s", table.TableReference.TableId)
14161493

@@ -1599,6 +1676,32 @@ func resourceBigQueryTableRead(d *schema.ResourceData, meta interface{}) error {
15991676
}
16001677
}
16011678

1679+
// TODO: Update when the Get API fields for TableReplicationInfo are available in the client library.
1680+
url, err := tpgresource.ReplaceVars(d, config, "{{BigQueryBasePath}}projects/{{project}}/datasets/{{dataset_id}}/tables/{{table_id}}")
1681+
if err != nil {
1682+
return err
1683+
}
1684+
1685+
log.Printf("[INFO] Reading BigQuery table through API: %s", url)
1686+
1687+
getRes, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
1688+
Config: config,
1689+
Method: "GET",
1690+
RawURL: url,
1691+
UserAgent: userAgent,
1692+
})
1693+
if err != nil {
1694+
return err
1695+
}
1696+
1697+
if v, ok := getRes["tableReplicationInfo"]; ok {
1698+
tableReplicationInfo := flattenTableReplicationInfo(v.(map[string]interface{}))
1699+
1700+
if err := d.Set("table_replication_info", tableReplicationInfo); err != nil {
1701+
return fmt.Errorf("Error setting table replication info: %s", err)
1702+
}
1703+
}
1704+
16021705
return nil
16031706
}
16041707

@@ -2389,6 +2492,56 @@ func flattenTableConstraints(edc *bigquery.TableConstraints) []map[string]interf
23892492
return []map[string]interface{}{result}
23902493
}
23912494

2495+
// expandTableReplicationInfo converts the Terraform configuration value of the
// table_replication_info block (a single-element list of a nested object) into
// a flat map keyed by the schema field names. String fields are copied as-is;
// replication_interval_ms is widened from int to int64 for use by callers that
// compute the DDL replication interval.
func expandTableReplicationInfo(cfg interface{}) map[string]interface{} {
	// MaxItems: 1 on the schema guarantees exactly one element here.
	block := cfg.([]interface{})[0].(map[string]interface{})

	result := map[string]interface{}{}

	// The three source identifiers are plain strings in the config.
	for _, key := range []string{"source_project_id", "source_dataset_id", "source_table_id"} {
		if v, ok := block[key]; ok {
			result[key] = v.(string)
		}
	}

	// Terraform stores TypeInt values as int; normalize to int64.
	if v, ok := block["replication_interval_ms"]; ok {
		result["replication_interval_ms"] = int64(v.(int))
	}

	return result
}
2518+
2519+
// flattenTableReplicationInfo converts the API's tableReplicationInfo JSON
// object into the single-element list form expected by the
// table_replication_info schema attribute. The API nests the source view
// under "sourceTable" and returns replicationIntervalMs as a decimal string;
// an unparsable interval is silently omitted (best effort).
func flattenTableReplicationInfo(tableReplicationInfo map[string]interface{}) []map[string]interface{} {
	flattened := map[string]interface{}{}

	if raw, ok := tableReplicationInfo["sourceTable"]; ok {
		source := raw.(map[string]interface{})

		// Map each API sourceTable field onto its schema counterpart.
		copyField := func(apiKey, schemaKey string) {
			if v, ok := source[apiKey]; ok {
				flattened[schemaKey] = v.(string)
			}
		}
		copyField("projectId", "source_project_id")
		copyField("datasetId", "source_dataset_id")
		copyField("tableId", "source_table_id")
	}

	if raw, ok := tableReplicationInfo["replicationIntervalMs"]; ok {
		// Int64 values arrive over JSON as strings; parse, and drop the
		// field if the value is malformed rather than failing the read.
		if ms, err := strconv.Atoi(raw.(string)); err == nil {
			flattened["replication_interval_ms"] = int64(ms)
		}
	}

	return []map[string]interface{}{flattened}
}
2544+
23922545
func resourceBigQueryTableImport(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
23932546
config := meta.(*transport_tpg.Config)
23942547
if err := tpgresource.ParseImportId([]string{

0 commit comments

Comments
 (0)