Skip to content

Commit 189bcdb

Browse files
authored
improvements to streaming sink archive (#467)
* improve docs and error handling * fix deletion protection * improve archive config
1 parent c0f787a commit 189bcdb

File tree

3 files changed

+36
-23
lines changed

3 files changed

+36
-23
lines changed

docs/resources/streaming_sink.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,16 @@ resource "astra_streaming_topic" "streaming_topic" {
4646
# https://docs.datastax.com/en/streaming/streaming-learning/pulsar-io/connectors/index.html
4747
resource "astra_streaming_sink" "streaming_sink" {
4848
# Required
49+
cluster = astra_streaming_tenant.streaming_tenant.cluster_name
4950
tenant_name = astra_streaming_tenant.streaming_tenant.tenant_name
50-
region = "us-central1"
51-
cloud_provider = astra_streaming_tenant.streaming_tenant.cloud_provider
5251
namespace = astra_streaming_namespace.streaming_namespace.namespace
52+
sink_name = "sink1"
53+
archive = "builtin://jdbc-clickhouse"
5354
topic = astra_streaming_topic.streaming_topic.topic_fqn
5455
auto_ack = true
5556
parallelism = 1
5657
retain_ordering = false
5758
processing_guarantees = "ATLEAST_ONCE"
58-
sink_name = "jdbc-clickhouse"
5959
sink_configs = jsonencode({
6060
"userName" : "clickhouse",
6161
"password" : "password",
@@ -88,7 +88,7 @@ resource "astra_streaming_sink" "streaming_sink" {
8888

8989
### Optional
9090

91-
- `archive` (String) Name of the sink archive type to use. Defaults to the value of sink_name. Must be formatted as a URL, e.g. 'builtin://jdbc-clickhouse'
91+
- `archive` (String) Name of the sink archive type to use, e.g. 'builtin://kafka'. It is recommended to set this field even though it is marked optional. Defaults to the value of sink_name. Must be formatted as a URL, e.g. 'builtin://jdbc-clickhouse'
9292
- `cloud_provider` (String, Deprecated) Cloud provider (deprecated, use `cluster` instead)
9393
- `cluster` (String) Name of the pulsar cluster in which to create the sink. If left blank, the name will be inferred from the cloud provider and region.
9494
- `deletion_protection` (Boolean) Whether or not to allow Terraform to destroy this streaming sink. Unless this field is set to false in Terraform state, a `terraform destroy` or `terraform apply` command that deletes the instance will fail. Defaults to `true`.

examples/resources/astra_streaming_sink/resource.tf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ resource "astra_streaming_topic" "streaming_topic" {
3131
# https://docs.datastax.com/en/streaming/streaming-learning/pulsar-io/connectors/index.html
3232
resource "astra_streaming_sink" "streaming_sink" {
3333
# Required
34+
cluster = astra_streaming_tenant.streaming_tenant.cluster_name
3435
tenant_name = astra_streaming_tenant.streaming_tenant.tenant_name
35-
region = "us-central1"
36-
cloud_provider = astra_streaming_tenant.streaming_tenant.cloud_provider
3736
namespace = astra_streaming_namespace.streaming_namespace.namespace
37+
sink_name = "sink1"
38+
archive = "builtin://jdbc-clickhouse"
3839
topic = astra_streaming_topic.streaming_topic.topic_fqn
3940
auto_ack = true
4041
parallelism = 1
4142
retain_ordering = false
4243
processing_guarantees = "ATLEAST_ONCE"
43-
sink_name = "jdbc-clickhouse"
4444
sink_configs = jsonencode({
4545
"userName" : "clickhouse",
4646
"password" : "password",

internal/provider/resource_streaming_sink.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,12 @@ func (r *StreamingSinkResource) Schema(_ context.Context, _ resource.SchemaReque
141141
},
142142
},
143143
"archive": schema.StringAttribute{
144-
Description: "Name of the sink archive type to use. Defaults to the value of sink_name. Must be formatted as a URL, e.g. 'builtin://jdbc-clickhouse'",
145-
Optional: true,
144+
Description: "Name of the sink archive type to use, e.g. 'builtin://kafka'. It is recommended to set this field even though it is marked optional. " +
145+
"Defaults to the value of sink_name. Must be formatted as a URL, e.g. 'builtin://jdbc-clickhouse'",
146+
Optional: true,
147+
Computed: true,
146148
PlanModifiers: []planmodifier.String{
147-
stringplanmodifier.RequiresReplace(),
149+
stringplanmodifier.RequiresReplaceIfConfigured(),
148150
},
149151
},
150152
"topic": schema.StringAttribute{
@@ -244,12 +246,16 @@ func (r *StreamingSinkResource) Create(ctx context.Context, req resource.CreateR
244246
tenantName := plan.TenantName.ValueString()
245247
namespace := plan.Namespace.ValueString()
246248
sinkName := plan.SinkName.ValueString()
247-
archive := plan.Archive.ValueString()
248-
retainOrdering := plan.RetainOrdering.ValueBool()
249-
processingGuarantees := plan.ProcessingGuarantees.ValueString()
250-
parallelism := plan.Parallelism.ValueInt32()
251-
topic := plan.Topic.ValueString()
252-
autoAck := plan.AutoAck.ValueBool()
249+
if plan.Archive.ValueString() == "" {
250+
// Use sink_name as archive value if not set
251+
archive := fmt.Sprintf("builtin://%s", sinkName)
252+
plan.Archive = types.StringValue(archive)
253+
resp.Diagnostics.AddWarning(
254+
"archive is not set",
255+
fmt.Sprintf("Defaulting sink_name as archive value '%s'. It is recommended to set the 'archive'"+
256+
" field explicitly to avoid issues with sink creation.", archive),
257+
)
258+
}
253259

254260
var configs map[string]interface{}
255261
if err := json.Unmarshal([]byte(plan.SinkConfigs.ValueString()), &configs); err != nil {
@@ -265,10 +271,10 @@ func (r *StreamingSinkResource) Create(ctx context.Context, req resource.CreateR
265271
Authorization: r.clients.token,
266272
}
267273

268-
sinkInputs := []string{topic}
274+
sinkInputs := []string{plan.Topic.ValueString()}
269275
createSinkBody := astrastreaming.CreateSinkJSONJSONRequestBody{
270-
Archive: &archive,
271-
AutoAck: &autoAck,
276+
Archive: plan.Archive.ValueStringPointer(),
277+
AutoAck: plan.AutoAck.ValueBoolPointer(),
272278
ClassName: nil,
273279
CleanupSubscription: nil,
274280
Configs: &configs,
@@ -280,11 +286,11 @@ func (r *StreamingSinkResource) Create(ctx context.Context, req resource.CreateR
280286
Name: &sinkName,
281287
Namespace: &namespace,
282288
NegativeAckRedeliveryDelayMs: nil,
283-
Parallelism: &parallelism,
284-
ProcessingGuarantees: (*astrastreaming.SinkConfigProcessingGuarantees)(&processingGuarantees),
289+
Parallelism: plan.Parallelism.ValueInt32Pointer(),
290+
ProcessingGuarantees: (*astrastreaming.SinkConfigProcessingGuarantees)(plan.ProcessingGuarantees.ValueStringPointer()),
285291
Resources: nil,
286292
RetainKeyOrdering: nil,
287-
RetainOrdering: &retainOrdering,
293+
RetainOrdering: plan.RetainOrdering.ValueBoolPointer(),
288294
RuntimeFlags: nil,
289295
Secrets: nil,
290296
SinkType: nil,
@@ -398,6 +404,13 @@ func (r *StreamingSinkResource) Delete(ctx context.Context, req resource.DeleteR
398404
return
399405
}
400406

407+
if state.DeletionProtection.ValueBool() {
408+
resp.Diagnostics.AddError(
409+
"failed to delete sink",
410+
"deletion protection is enabled. Set `deletion_protection` to `false` to allow deletion of the sink resource.")
411+
return
412+
}
413+
401414
astraStreamingClient := r.clients.astraStreamingClient
402415

403416
streamingClusterName := state.getClusterName()
@@ -417,7 +430,7 @@ func (r *StreamingSinkResource) Delete(ctx context.Context, req resource.DeleteR
417430
if err != nil {
418431
errMsg := fmt.Sprintf("failed to get sink, error: %v", err)
419432
resp.Diagnostics.AddError(
420-
"failed to get sink",
433+
"failed to get current sink status",
421434
errMsg)
422435
return
423436
} else if deleteSinkResponse.StatusCode() > 299 {

0 commit comments

Comments
 (0)