Skip to content

Commit 1a9bac7

Browse files
authored
add support for CDC config udpates (#469)
1 parent 396ff84 commit 1a9bac7

File tree

3 files changed

+117
-3
lines changed

3 files changed

+117
-3
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.23.0
55
toolchain go1.24.2
66

77
require (
8-
github.com/datastax/astra-client-go/v2 v2.2.62
8+
github.com/datastax/astra-client-go/v2 v2.2.63
99
github.com/datastax/pulsar-admin-client-go v0.0.2
1010
github.com/google/uuid v1.6.0
1111
github.com/hashicorp/go-cty v1.5.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ github.com/cyphar/filepath-securejoin v0.4.1 h1:JyxxyPEaktOD+GAnqIqTf9A8tHyAG22r
107107
github.com/cyphar/filepath-securejoin v0.4.1/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGLDGQL7h7bg04C/+u9jI=
108108
github.com/datastax/astra-client-go/v2 v2.2.62 h1:HpOxYgZ7ng9ASz0R0MYh1du2VkWt6R5GM0xof83j4p0=
109109
github.com/datastax/astra-client-go/v2 v2.2.62/go.mod h1:piBMy3fnDcLN8+MDQfvjZr8ejcP2oh+CXkQYwNy8blA=
110+
github.com/datastax/astra-client-go/v2 v2.2.63-0.20250728163038-0284f8fe8db9 h1:3jWqDKmNunRvBSUeEYpBanxtkoArKtqUzDjiwauJOZQ=
111+
github.com/datastax/astra-client-go/v2 v2.2.63-0.20250728163038-0284f8fe8db9/go.mod h1:piBMy3fnDcLN8+MDQfvjZr8ejcP2oh+CXkQYwNy8blA=
112+
github.com/datastax/astra-client-go/v2 v2.2.63 h1:kIPQRUOGVGkwG76cwjiKI1MmoLJv6wTUA08URMJg6CU=
113+
github.com/datastax/astra-client-go/v2 v2.2.63/go.mod h1:piBMy3fnDcLN8+MDQfvjZr8ejcP2oh+CXkQYwNy8blA=
110114
github.com/datastax/pulsar-admin-client-go v0.0.2 h1:CValQbSLI6q1PuCzkM4Tr3tAgwSi6RYDJ7PxvcBLyhw=
111115
github.com/datastax/pulsar-admin-client-go v0.0.2/go.mod h1:GOBpX6jwznrSlECwGeOZE6sNFJJ+FtYkwlv8PgFCZng=
112116
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

internal/provider/resource_cdc_v3.go

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"time"
78

89
"github.com/datastax/astra-client-go/v2/astra"
910
"github.com/hashicorp/terraform-plugin-framework/attr"
@@ -13,6 +14,7 @@ import (
1314
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
1415
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
1516
"github.com/hashicorp/terraform-plugin-framework/types"
17+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
1618
)
1719

1820
type CDCResource struct {
@@ -155,6 +157,14 @@ func (r *CDCResource) Create(ctx context.Context, req resource.CreateRequest, re
155157
return
156158
}
157159

160+
// wait for the database to be active before after CDC
161+
if err := waitForDatabaseActive(ctx, astraClient, plan.DatabaseID.ValueString()); err != nil {
162+
resp.Diagnostics.AddError(
163+
fmt.Sprintf("failed to wait for database '%s' to be active", plan.DatabaseID.ValueString()),
164+
err.Error())
165+
return
166+
}
167+
158168
plan.setDataTopics()
159169

160170
diags = resp.State.Set(ctx, &plan)
@@ -192,8 +202,49 @@ func (r *CDCResource) Read(ctx context.Context, req resource.ReadRequest, resp *
192202
}
193203

194204
func (r *CDCResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
195-
// CDC resources are typically immutable, so this might not be implemented.
196-
resp.Diagnostics.AddError("Not Implemented", "Update is not supported for CDC resources.")
205+
var plan CDCResourceModel
206+
diags := req.Plan.Get(ctx, &plan)
207+
resp.Diagnostics.Append(diags...)
208+
if resp.Diagnostics.HasError() {
209+
return
210+
}
211+
212+
astraClient := r.clients.astraClient
213+
214+
// wait for the database to be active before updating CDC
215+
if err := waitForDatabaseActive(ctx, astraClient, plan.DatabaseID.ValueString()); err != nil {
216+
resp.Diagnostics.AddError(
217+
fmt.Sprintf("failed to wait for database '%s' to be active", plan.DatabaseID.ValueString()),
218+
err.Error())
219+
return
220+
}
221+
222+
cdcRequestBody := createUpdateCDCRequestBody(&plan)
223+
cdcResponse, err := astraClient.UpdateCDCWithResponse(ctx, cdcRequestBody.DatabaseID, cdcRequestBody)
224+
if err != nil {
225+
resp.Diagnostics.AddError(
226+
"failed to enable CDC",
227+
err.Error())
228+
return
229+
} else if cdcResponse.StatusCode() != http.StatusNoContent {
230+
errString := fmt.Sprintf("failed to update CDC for DB '%s' with status code '%v', message: '%s'",
231+
plan.DatabaseID.ValueString(), cdcResponse.StatusCode(), string(cdcResponse.Body))
232+
resp.Diagnostics.AddError("failed to update CDC", errString)
233+
return
234+
}
235+
236+
// wait for the database to be active before after CDC
237+
if err := waitForDatabaseActive(ctx, astraClient, plan.DatabaseID.ValueString()); err != nil {
238+
resp.Diagnostics.AddError(
239+
fmt.Sprintf("failed to wait for database '%s' to be active", plan.DatabaseID.ValueString()),
240+
err.Error())
241+
return
242+
}
243+
244+
plan.setDataTopics()
245+
246+
diags = resp.State.Set(ctx, &plan)
247+
resp.Diagnostics.Append(diags...)
197248
}
198249

199250
func (r *CDCResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) {
@@ -279,6 +330,30 @@ func createEnableCDCRequestBody(tfData *CDCResourceModel) astra.EnableCDCJSONReq
279330
return reqData
280331
}
281332

333+
func createUpdateCDCRequestBody(tfData *CDCResourceModel) astra.UpdateCDCJSONRequestBody {
334+
reqData := astra.UpdateCDCJSONRequestBody{
335+
DatabaseID: tfData.DatabaseID.ValueString(),
336+
DatabaseName: tfData.DatabaseName.ValueString(),
337+
}
338+
for _, table := range tfData.Tables {
339+
nextTable := TableJSON{
340+
KeyspaceName: table.Keyspace.ValueString(),
341+
TableName: table.Table.ValueString(),
342+
}
343+
reqData.Tables = append(reqData.Tables, nextTable)
344+
}
345+
for _, region := range tfData.Regions {
346+
nextRegion := RegionJSON{
347+
DatacenterRegion: region.Region.ValueString(),
348+
DatacenterID: region.DatacenterID.ValueString(),
349+
StreamingClusterName: region.StreamingCluster.ValueString(),
350+
StreamingTenantName: region.StreamingTenant.ValueString(),
351+
}
352+
reqData.Regions = append(reqData.Regions, nextRegion)
353+
}
354+
return reqData
355+
}
356+
282357
// copyResponseDataToResourceState copies the data from the REST endpoing response to the Terraform resource state model.
283358
func copyResponseDataToResourceState(tfData *CDCResourceModel, respData *astra.ListCDCResponse) {
284359
tfData.DatabaseID = types.StringValue(respData.DatabaseID)
@@ -344,3 +419,38 @@ func (m *CDCResourceModel) setDataTopics() {
344419

345420
m.DataTopics = types.MapValueMust(types.MapType{ElemType: types.StringType}, dataTopicsMap)
346421
}
422+
423+
var (
424+
cdcUpdateTimeout = time.Duration(2 * time.Minute)
425+
)
426+
427+
// waitForDatabaseActive waits for the database to reach the ACTIVE state. Will return an error if the database reaches a terminal state (ERROR, TERMINATED, or TERMINATING),
428+
// or if the request returns an unexpected HTTP status code, or if the request times out.
429+
func waitForDatabaseActive(ctx context.Context, client *astra.ClientWithResponses, databaseID string) error {
430+
return retry.RetryContext(ctx, cdcUpdateTimeout, func() *retry.RetryError {
431+
res, err := client.GetDatabaseWithResponse(ctx, astra.DatabaseIdParam(databaseID))
432+
// Errors sending request should be retried and are assumed to be transient
433+
if err != nil || res.StatusCode() >= http.StatusInternalServerError {
434+
return retry.RetryableError(fmt.Errorf("error getting database status: %s", string(res.Body)))
435+
}
436+
437+
// don't retry on unexpected HTTP errors
438+
if res.StatusCode() == http.StatusUnauthorized {
439+
return retry.NonRetryableError(fmt.Errorf("user not authorized. Effective role must have 'View DB' permission on the database (or on all DBs in the current org)"))
440+
} else if res.StatusCode() > http.StatusOK || res.JSON200 == nil {
441+
return retry.NonRetryableError(fmt.Errorf("unexpected response fetching database, status code: %d, message %s", res.StatusCode(), string(res.Body)))
442+
}
443+
444+
// Success fetching database
445+
dbStatus := res.JSON200.Status
446+
switch dbStatus {
447+
case astra.ERROR, astra.TERMINATED, astra.TERMINATING:
448+
// If the database reached a terminal state it will never become active
449+
return retry.NonRetryableError(fmt.Errorf("database failed to reach active status: status='%s'", dbStatus))
450+
case astra.ACTIVE:
451+
return nil
452+
default:
453+
return retry.RetryableError(fmt.Errorf("waiting database to be active but is '%s'", dbStatus))
454+
}
455+
})
456+
}

0 commit comments

Comments
 (0)