Skip to content

Commit 8f78102

Browse files
authored
astra_cdc: handle changing order of regions/tables (#461)
Change the CDC regions and tables from list to set to avoid issues with order returned from rest API. Change the data topic output to a map of maps for easier access to specific topics.
1 parent d12cae0 commit 8f78102

File tree

3 files changed

+65
-43
lines changed

3 files changed

+65
-43
lines changed

docs/resources/cdc_v3.md

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ resource "astra_streaming_tenant" "streaming_tenant2" {
3939
# Create a new database
4040
resource "astra_database" "db_database" {
4141
name = "mydb"
42-
keyspace = "click_data" # 48 characters max
42+
keyspace = "ks1" # 48 characters max
4343
cloud_provider = "gcp" # this must match the cloud_provider of the streaming tenants
4444
regions = ["us-central1", "us-east1"] # this must match the regions of the streaming tenants
4545
deletion_protection = false
@@ -50,7 +50,7 @@ resource "astra_table" "db_table1" {
5050
keyspace = astra_database.db_database.keyspace
5151
database_id = astra_database.db_database.id
5252
region = astra_database.db_database.regions[0]
53-
table = "all_product_clicks"
53+
table = "table1"
5454
clustering_columns = "click_timestamp"
5555
partition_keys = "visitor_id:click_url"
5656
column_definitions = [
@@ -84,19 +84,23 @@ resource "astra_cdc_v3" "db_cdc" {
8484
]
8585
regions = [
8686
{
87-
region = "us-central-1"
87+
region = "us-central1"
8888
datacenter_id = "${astra_database.db_database.id}-1"
8989
streaming_cluster = astra_streaming_tenant.streaming_tenant1.cluster_name
9090
streaming_tenant = astra_streaming_tenant.streaming_tenant1.tenant_name
9191
},
9292
{
93-
region = "us-east-1"
93+
region = "us-east1"
9494
datacenter_id = "${astra_database.db_database.id}-2"
9595
streaming_cluster = astra_streaming_tenant.streaming_tenant2.cluster_name
9696
streaming_tenant = astra_streaming_tenant.streaming_tenant2.tenant_name
9797
},
9898
]
9999
}
100+
101+
output "streaming_data_topic1" {
102+
value = astra_cdc_v3.example1.data_topics["us-east1"]["ks1.table1"]
103+
}
100104
```
101105

102106
<!-- schema generated by tfplugindocs -->
@@ -106,8 +110,12 @@ resource "astra_cdc_v3" "db_cdc" {
106110

107111
- `database_id` (String) Astra database to create the keyspace.
108112
- `database_name` (String) Astra database name.
109-
- `regions` (Attributes List) Mapping between datacenter regions and streaming tenants. (see [below for nested schema](#nestedatt--regions))
110-
- `tables` (Attributes List) List of tables to enable CDC. Must include at least 1. (see [below for nested schema](#nestedatt--tables))
113+
- `regions` (Attributes Set) Mapping between datacenter regions and streaming tenants. (see [below for nested schema](#nestedatt--regions))
114+
- `tables` (Attributes Set) List of tables to enable CDC. Must include at least 1. (see [below for nested schema](#nestedatt--tables))
115+
116+
### Read-Only
117+
118+
- `data_topics` (Map of Map of String) Map of CDC data topics for each table in each region. Key is the region in the format `<region>`,
111119

112120
<a id="nestedatt--regions"></a>
113121
### Nested Schema for `regions`
@@ -128,10 +136,6 @@ Required:
128136
- `keyspace` (String)
129137
- `table` (String)
130138

131-
Read-Only:
132-
133-
- `data_topics` (List of String) List of Pulsar topics to which CDC data is published. One data topic per region, in the same order of regions.
134-
135139
## Import
136140

137141
Import is supported using the following syntax:

examples/resources/astra_cdc_v3/resource.tf

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ resource "astra_streaming_tenant" "streaming_tenant2" {
2424
# Create a new database
2525
resource "astra_database" "db_database" {
2626
name = "mydb"
27-
keyspace = "click_data" # 48 characters max
27+
keyspace = "ks1" # 48 characters max
2828
cloud_provider = "gcp" # this must match the cloud_provider of the streaming tenants
2929
regions = ["us-central1", "us-east1"] # this must match the regions of the streaming tenants
3030
deletion_protection = false
@@ -35,7 +35,7 @@ resource "astra_table" "db_table1" {
3535
keyspace = astra_database.db_database.keyspace
3636
database_id = astra_database.db_database.id
3737
region = astra_database.db_database.regions[0]
38-
table = "all_product_clicks"
38+
table = "table1"
3939
clustering_columns = "click_timestamp"
4040
partition_keys = "visitor_id:click_url"
4141
column_definitions = [
@@ -69,16 +69,20 @@ resource "astra_cdc_v3" "db_cdc" {
6969
]
7070
regions = [
7171
{
72-
region = "us-central-1"
72+
region = "us-central1"
7373
datacenter_id = "${astra_database.db_database.id}-1"
7474
streaming_cluster = astra_streaming_tenant.streaming_tenant1.cluster_name
7575
streaming_tenant = astra_streaming_tenant.streaming_tenant1.tenant_name
7676
},
7777
{
78-
region = "us-east-1"
78+
region = "us-east1"
7979
datacenter_id = "${astra_database.db_database.id}-2"
8080
streaming_cluster = astra_streaming_tenant.streaming_tenant2.cluster_name
8181
streaming_tenant = astra_streaming_tenant.streaming_tenant2.tenant_name
8282
},
8383
]
8484
}
85+
86+
output "streaming_data_topic1" {
87+
value = astra_cdc_v3.example1.data_topics["us-east1"]["ks1.table1"]
88+
}

internal/provider/resource_cdc_v3.go

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,18 @@ func (r *CDCResource) Configure(_ context.Context, req resource.ConfigureRequest
3939
r.clients = req.ProviderData.(*astraClients2)
4040
}
4141

42-
// CDCResourceModel represents the resource data model
42+
// CDCResourceModel represents data used to configure CDC
4343
type CDCResourceModel struct {
4444
DatabaseID types.String `tfsdk:"database_id"`
4545
DatabaseName types.String `tfsdk:"database_name"`
4646
Tables []KeyspaceTable `tfsdk:"tables"`
4747
Regions []DatacenterToStreamingMap `tfsdk:"regions"`
48+
DataTopics types.Map `tfsdk:"data_topics"`
4849
}
4950

5051
type KeyspaceTable struct {
51-
Keyspace types.String `tfsdk:"keyspace"`
52-
Table types.String `tfsdk:"table"`
53-
DataTopics types.List `tfsdk:"data_topics"`
52+
Keyspace types.String `tfsdk:"keyspace"`
53+
Table types.String `tfsdk:"table"`
5454
}
5555

5656
type DatacenterToStreamingMap struct {
@@ -60,20 +60,6 @@ type DatacenterToStreamingMap struct {
6060
StreamingTenant types.String `tfsdk:"streaming_tenant"`
6161
}
6262

63-
// setDataTopics updates the data topics field for each table based on caculateCDCDataTopicName.
64-
func (m *CDCResourceModel) setDataTopics() {
65-
66-
for i := range m.Tables {
67-
dataTopics := []attr.Value{}
68-
69-
for _, region := range m.Regions {
70-
topicFQDN := calculateCDCDataTopicName(region.StreamingTenant.ValueString(), m.DatabaseID.ValueString(), m.Tables[i].Keyspace.ValueString(), m.Tables[i].Table.ValueString())
71-
dataTopics = append(dataTopics, types.StringValue(topicFQDN))
72-
}
73-
m.Tables[i].DataTopics, _ = types.ListValue(types.StringType, dataTopics)
74-
}
75-
}
76-
7763
func (r *CDCResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) {
7864
resp.TypeName = "astra_cdc_v3"
7965
}
@@ -93,7 +79,7 @@ func (r *CDCResource) Schema(_ context.Context, req resource.SchemaRequest, resp
9379
Description: "Astra database name.",
9480
Required: true,
9581
},
96-
"tables": schema.ListNestedAttribute{
82+
"tables": schema.SetNestedAttribute{
9783
Description: "List of tables to enable CDC. Must include at least 1.",
9884
Required: true,
9985
NestedObject: schema.NestedAttributeObject{
@@ -104,17 +90,11 @@ func (r *CDCResource) Schema(_ context.Context, req resource.SchemaRequest, resp
10490
"table": schema.StringAttribute{
10591
Required: true,
10692
},
107-
"data_topics": schema.ListAttribute{
108-
Description: "List of Pulsar topics to which CDC data is published. " +
109-
"One data topic per region, in the same order of regions.",
110-
Computed: true,
111-
ElementType: types.StringType,
112-
},
11393
},
11494
},
11595
},
11696

117-
"regions": schema.ListNestedAttribute{
97+
"regions": schema.SetNestedAttribute{
11898
Description: "Mapping between datacenter regions and streaming tenants.",
11999
Required: true,
120100
NestedObject: schema.NestedAttributeObject{
@@ -138,6 +118,14 @@ func (r *CDCResource) Schema(_ context.Context, req resource.SchemaRequest, resp
138118
},
139119
},
140120
},
121+
"data_topics": schema.MapAttribute{
122+
Description: "Map of CDC data topics for each table in each region. " +
123+
"Key is the region in the format `<region>`, ",
124+
Computed: true,
125+
ElementType: types.MapType{
126+
ElemType: types.StringType,
127+
},
128+
},
141129
},
142130
}
143131
}
@@ -167,6 +155,7 @@ func (r *CDCResource) Create(ctx context.Context, req resource.CreateRequest, re
167155
}
168156

169157
plan.setDataTopics()
158+
170159
diags = resp.State.Set(ctx, &plan)
171160
resp.Diagnostics.Append(diags...)
172161
}
@@ -194,7 +183,9 @@ func (r *CDCResource) Read(ctx context.Context, req resource.ReadRequest, resp *
194183
return
195184
}
196185

197-
updateStateForCDCReadRequest(&state, cdcResponse.JSON200)
186+
copyResponseDataToResourceState(&state, cdcResponse.JSON200)
187+
state.setDataTopics()
188+
198189
diags = resp.State.Set(ctx, &state)
199190
resp.Diagnostics.Append(diags...)
200191
}
@@ -287,7 +278,8 @@ func createEnableCDCRequestBody(tfData *CDCResourceModel) astra.EnableCDCJSONReq
287278
return reqData
288279
}
289280

290-
func updateStateForCDCReadRequest(tfData *CDCResourceModel, respData *astra.ListCDCResponse) {
281+
// copyResponseDataToResourceState copies the data from the REST endpoing response to the Terraform resource state model.
282+
func copyResponseDataToResourceState(tfData *CDCResourceModel, respData *astra.ListCDCResponse) {
291283
tfData.DatabaseID = types.StringValue(respData.DatabaseID)
292284
tfData.DatabaseName = types.StringValue(respData.DatabaseName)
293285
var tables []KeyspaceTable
@@ -309,7 +301,6 @@ func updateStateForCDCReadRequest(tfData *CDCResourceModel, respData *astra.List
309301
})
310302
}
311303
tfData.Regions = regions
312-
tfData.setDataTopics()
313304
}
314305

315306
func createDeleteCDCRequestBody(tfData *CDCResourceModel) astra.DeleteCDCJSONRequestBody {
@@ -333,3 +324,26 @@ const AstraCDCPulsarNamespace = "astracdc"
333324
func calculateCDCDataTopicName(streamingTenant, databaseID, keyspace, tableName string) string {
334325
return fmt.Sprintf("persistent://%s/%s/data-%s-%s.%s", streamingTenant, AstraCDCPulsarNamespace, databaseID, keyspace, tableName)
335326
}
327+
328+
// getDataTopicsList uses the region and table config to create the two dimensional (region and table) map of data topics.
329+
func (m *CDCResourceModel) setDataTopics() {
330+
331+
dataTopicsMap := map[string]attr.Value{}
332+
333+
for _, region := range m.Regions {
334+
regionName := region.Region.ValueString()
335+
regionDataTopics := make(map[string]attr.Value)
336+
337+
for _, table := range m.Tables {
338+
keyspaceTable := fmt.Sprintf("%s.%s", table.Keyspace.ValueString(), table.Table.ValueString())
339+
topicFQDN := calculateCDCDataTopicName(region.StreamingTenant.ValueString(), m.DatabaseID.ValueString(), table.Keyspace.ValueString(), table.Table.ValueString())
340+
regionDataTopics[keyspaceTable] = types.StringValue(topicFQDN)
341+
}
342+
dataTopicsMap[regionName] = types.MapValueMust(
343+
types.StringType,
344+
regionDataTopics,
345+
)
346+
}
347+
348+
m.DataTopics = types.MapValueMust(types.MapType{ElemType: types.StringType}, dataTopicsMap)
349+
}

0 commit comments

Comments
 (0)