Skip to content

Commit c87e8be

Browse files
authored
feat: Long-running operation improvements for mongodbatlas_stream_processor resource (#3571)
* implement timeout on create * delete_on_create_timeout * delete_on_create_timeout test * docs * testing
1 parent 08c76e5 commit c87e8be

File tree

9 files changed

+113
-25
lines changed

9 files changed

+113
-25
lines changed

.changelog/3571.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
```release-note:enhancement
2+
resource/mongodbatlas_stream_processor: Adds `delete_on_create_timeout` attribute to indicate whether to delete the resource if its creation times out
3+
```
4+
5+
```release-note:enhancement
6+
resource/mongodbatlas_stream_processor: Adds `timeouts` attribute for create operation
7+
```

docs/guides/2.0.0-upgrade-guide.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ The Terraform MongoDB Atlas Provider version 2.0.0 has the following new feature
2121
- `mongodbatlas_online_archive`
2222
- `mongodbatlas_privatelink_endpoint`
2323
- `mongodbatlas_privatelink_endpoint_service`
24+
- `mongodbatlas_push_based_log_export`
2425
- `mongodbatlas_search_deployment`
26+
- `mongodbatlas_stream_processor`
2527

2628
## Breaking Changes
2729

docs/resources/stream_processor.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,12 @@ output "stream_processors_results" {
132132

133133
### Optional
134134

135+
- `delete_on_create_timeout` (Boolean) Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to `true` and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to `false`, the timeout will not trigger resource deletion. If you suspect a transient error when the value is `true`, wait before retrying to allow resource deletion to finish. Default is `true`.
135136
- `options` (Attributes) Optional configuration for the stream processor. (see [below for nested schema](#nestedatt--options))
136137
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.
137138

138139
**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
140+
- `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts))
139141

140142
### Read-Only
141143

@@ -158,6 +160,15 @@ Required:
158160
- `connection_name` (String) Name of the connection to write DLQ messages to. Must be an Atlas connection.
159161
- `db` (String) Name of the database to use for the DLQ.
160162

163+
164+
165+
<a id="nestedatt--timeouts"></a>
166+
### Nested Schema for `timeouts`
167+
168+
Optional:
169+
170+
- `create` (String) A string that can be [parsed as a duration](https://pkg.go.dev/time#ParseDuration) consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
171+
161172
## Import
162173
Stream Processor resource can be imported using the Project ID, Stream Instance name and Stream Processor name, in the format `INSTANCE_NAME-PROJECT_ID-PROCESSOR_NAME`, e.g.
163174
```

internal/service/streamprocessor/model.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66

77
"github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes"
8+
"github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts"
89
"github.com/hashicorp/terraform-plugin-framework/diag"
910
"github.com/hashicorp/terraform-plugin-framework/types"
1011
"github.com/hashicorp/terraform-plugin-framework/types/basetypes"
@@ -79,7 +80,7 @@ func NewStreamProcessorUpdateReq(ctx context.Context, plan *TFStreamProcessorRSM
7980
return streamProcessorAPIParams, nil
8081
}
8182

82-
func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName string, apiResp *admin.StreamsProcessorWithStats) (*TFStreamProcessorRSModel, diag.Diagnostics) {
83+
func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName string, apiResp *admin.StreamsProcessorWithStats, timeout *timeouts.Value, deleteOnCreateTimeout *types.Bool) (*TFStreamProcessorRSModel, diag.Diagnostics) {
8384
if apiResp == nil {
8485
return nil, diag.Diagnostics{diag.NewErrorDiagnostic("streamProcessor API response is nil", "")}
8586
}
@@ -105,6 +106,12 @@ func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName st
105106
State: types.StringPointerValue(&apiResp.State),
106107
Stats: statsTF,
107108
}
109+
if timeout != nil {
110+
tfModel.Timeouts = *timeout
111+
}
112+
if deleteOnCreateTimeout != nil {
113+
tfModel.DeleteOnCreateTimeout = *deleteOnCreateTimeout
114+
}
108115
return tfModel, nil
109116
}
110117

internal/service/streamprocessor/model_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func TestSDKToTFModel(t *testing.T) {
230230
for _, tc := range testCases {
231231
t.Run(tc.name, func(t *testing.T) {
232232
sdkModel := tc.sdkModel
233-
resultModel, diags := streamprocessor.NewStreamProcessorWithStats(t.Context(), projectID, instanceName, sdkModel)
233+
resultModel, diags := streamprocessor.NewStreamProcessorWithStats(t.Context(), projectID, instanceName, sdkModel, nil, nil)
234234
if diags.HasError() {
235235
t.Fatalf("unexpected errors found: %s", diags.Errors()[0].Summary())
236236
}

internal/service/streamprocessor/resource.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/hashicorp/terraform-plugin-framework/path"
1111
"github.com/hashicorp/terraform-plugin-framework/resource"
1212

13+
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/cleanup"
1314
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion"
1415
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/validate"
1516
"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
@@ -86,7 +87,19 @@ func (r *streamProcessorRS) Create(ctx context.Context, req resource.CreateReque
8687
ProcessorName: processorName,
8788
}
8889

89-
streamProcessorResp, err := WaitStateTransition(ctx, streamProcessorParams, connV2.StreamsApi, []string{InitiatingState, CreatingState}, []string{CreatedState})
90+
createTimeout := cleanup.ResolveTimeout(ctx, &plan.Timeouts, cleanup.OperationCreate, &resp.Diagnostics)
91+
if resp.Diagnostics.HasError() {
92+
return
93+
}
94+
95+
streamProcessorResp, err := WaitStateTransitionWithTimeout(ctx, streamProcessorParams, connV2.StreamsApi, []string{InitiatingState, CreatingState}, []string{CreatedState}, createTimeout)
96+
err = cleanup.HandleCreateTimeout(cleanup.ResolveDeleteOnCreateTimeout(plan.DeleteOnCreateTimeout), err, func(ctxCleanup context.Context) error {
97+
_, err := connV2.StreamsApi.DeleteStreamProcessor(ctxCleanup, projectID, instanceName, processorName).Execute()
98+
if err != nil {
99+
return err
100+
}
101+
return nil
102+
})
90103
if err != nil {
91104
resp.Diagnostics.AddError("Error creating stream processor", err.Error())
92105
return
@@ -111,7 +124,7 @@ func (r *streamProcessorRS) Create(ctx context.Context, req resource.CreateReque
111124
}
112125
}
113126

114-
newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessorResp)
127+
newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessorResp, &plan.Timeouts, &plan.DeleteOnCreateTimeout)
115128
if diags.HasError() {
116129
resp.Diagnostics.Append(diags...)
117130
return
@@ -140,7 +153,7 @@ func (r *streamProcessorRS) Read(ctx context.Context, req resource.ReadRequest,
140153
return
141154
}
142155

143-
newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessor)
156+
newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessor, &state.Timeouts, &state.DeleteOnCreateTimeout)
144157
if diags.HasError() {
145158
resp.Diagnostics.Append(diags...)
146159
return
@@ -238,7 +251,7 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque
238251
}
239252
}
240253

241-
newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessorResp)
254+
newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessorResp, &plan.Timeouts, &plan.DeleteOnCreateTimeout)
242255
if diags.HasError() {
243256
resp.Diagnostics.Append(diags...)
244257
return

internal/service/streamprocessor/resource_schema.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"context"
55

66
"github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes"
7+
"github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts"
78
"github.com/hashicorp/terraform-plugin-framework/attr"
89
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
910
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
1011
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
1112
"github.com/hashicorp/terraform-plugin-framework/types"
13+
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/customplanmodifier"
1214
)
1315

1416
func ResourceSchema(ctx context.Context) schema.Schema {
@@ -73,19 +75,31 @@ func ResourceSchema(ctx context.Context) schema.Schema {
7375
Computed: true,
7476
MarkdownDescription: "The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.",
7577
},
78+
"timeouts": timeouts.Attributes(ctx, timeouts.Opts{
79+
Create: true,
80+
}),
81+
"delete_on_create_timeout": schema.BoolAttribute{
82+
Optional: true,
83+
PlanModifiers: []planmodifier.Bool{
84+
customplanmodifier.CreateOnlyBoolPlanModifier(),
85+
},
86+
MarkdownDescription: "Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to `true` and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to `false`, the timeout will not trigger resource deletion. If you suspect a transient error when the value is `true`, wait before retrying to allow resource deletion to finish. Default is `true`.",
87+
},
7688
},
7789
}
7890
}
7991

8092
type TFStreamProcessorRSModel struct {
81-
InstanceName types.String `tfsdk:"instance_name"`
82-
Options types.Object `tfsdk:"options"`
83-
Pipeline jsontypes.Normalized `tfsdk:"pipeline"`
84-
ProcessorID types.String `tfsdk:"id"`
85-
ProcessorName types.String `tfsdk:"processor_name"`
86-
ProjectID types.String `tfsdk:"project_id"`
87-
State types.String `tfsdk:"state"`
88-
Stats types.String `tfsdk:"stats"`
93+
InstanceName types.String `tfsdk:"instance_name"`
94+
Options types.Object `tfsdk:"options"`
95+
Pipeline jsontypes.Normalized `tfsdk:"pipeline"`
96+
ProcessorID types.String `tfsdk:"id"`
97+
ProcessorName types.String `tfsdk:"processor_name"`
98+
ProjectID types.String `tfsdk:"project_id"`
99+
State types.String `tfsdk:"state"`
100+
Stats types.String `tfsdk:"stats"`
101+
Timeouts timeouts.Value `tfsdk:"timeouts"`
102+
DeleteOnCreateTimeout types.Bool `tfsdk:"delete_on_create_timeout"`
89103
}
90104

91105
type TFOptionsModel struct {

internal/service/streamprocessor/resource_test.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ func basicTestCase(t *testing.T) *resource.TestCase {
5757
CheckDestroy: checkDestroyStreamProcessor,
5858
Steps: []resource.TestStep{
5959
{
60-
Config: config(t, projectID, instanceName, processorName, "", randomSuffix, sampleSrcConfig, testLogDestConfig),
60+
Config: config(t, projectID, instanceName, processorName, "", randomSuffix, sampleSrcConfig, testLogDestConfig, "", nil),
6161
Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.CreatedState, false, false),
6262
ConfigStateChecks: pluralConfigStateChecks(processorName, streamprocessor.CreatedState, instanceName, false, false),
6363
},
6464
{
65-
Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, randomSuffix, sampleSrcConfig, testLogDestConfig),
65+
Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, randomSuffix, sampleSrcConfig, testLogDestConfig, "", nil),
6666
Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.StartedState, true, false),
6767
ConfigStateChecks: pluralConfigStateChecks(processorName, streamprocessor.StartedState, instanceName, true, false),
6868
},
@@ -89,7 +89,7 @@ func TestAccStreamProcessor_JSONWhiteSpaceFormat(t *testing.T) {
8989
CheckDestroy: checkDestroyStreamProcessor,
9090
Steps: []resource.TestStep{
9191
{
92-
Config: config(t, projectID, instanceName, processorName, streamprocessor.CreatedState, randomSuffix, sampleSrcConfigExtraSpaces, testLogDestConfig),
92+
Config: config(t, projectID, instanceName, processorName, streamprocessor.CreatedState, randomSuffix, sampleSrcConfigExtraSpaces, testLogDestConfig, "", nil),
9393
Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.CreatedState, false, false),
9494
ConfigStateChecks: pluralConfigStateChecks(processorName, streamprocessor.CreatedState, instanceName, false, false),
9595
},
@@ -112,7 +112,7 @@ func TestAccStreamProcessor_withOptions(t *testing.T) {
112112
CheckDestroy: checkDestroyStreamProcessor,
113113
Steps: []resource.TestStep{
114114
{
115-
Config: config(t, projectID, instanceName, processorName, streamprocessor.CreatedState, randomSuffix, src, dest),
115+
Config: config(t, projectID, instanceName, processorName, streamprocessor.CreatedState, randomSuffix, src, dest, "", nil),
116116
Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.CreatedState, false, true),
117117
ConfigStateChecks: pluralConfigStateChecks(processorName, streamprocessor.CreatedState, instanceName, false, true),
118118
},
@@ -276,7 +276,7 @@ func TestAccStreamProcessor_clusterType(t *testing.T) {
276276
CheckDestroy: checkDestroyStreamProcessor,
277277
Steps: []resource.TestStep{
278278
{
279-
Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, randomSuffix, srcConfig, testLogDestConfig),
279+
Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, randomSuffix, srcConfig, testLogDestConfig, "", nil),
280280
Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.StartedState, true, false),
281281
ConfigStateChecks: pluralConfigStateChecks(processorName, streamprocessor.StartedState, instanceName, true, false),
282282
},
@@ -297,16 +297,38 @@ func TestAccStreamProcessor_createErrors(t *testing.T) {
297297
CheckDestroy: checkDestroyStreamProcessor,
298298
Steps: []resource.TestStep{
299299
{
300-
Config: config(t, projectID, instanceName, processorName, streamprocessor.StoppedState, randomSuffix, invalidJSONConfig, testLogDestConfig),
300+
Config: config(t, projectID, instanceName, processorName, streamprocessor.StoppedState, randomSuffix, invalidJSONConfig, testLogDestConfig, "", nil),
301301
ExpectError: regexp.MustCompile("Invalid JSON String Value"),
302302
},
303303
{
304-
Config: config(t, projectID, instanceName, processorName, streamprocessor.StoppedState, randomSuffix, sampleSrcConfig, testLogDestConfig),
304+
Config: config(t, projectID, instanceName, processorName, streamprocessor.StoppedState, randomSuffix, sampleSrcConfig, testLogDestConfig, "", nil),
305305
ExpectError: regexp.MustCompile("When creating a stream processor, the only valid states are CREATED and STARTED"),
306306
},
307307
}})
308308
}
309309

310+
func TestAccStreamProcessor_createTimeoutWithDeleteOnCreate(t *testing.T) {
311+
acc.SkipTestForCI(t) // Creation of stream processor for testing is too fast to force the creation timeout
312+
var (
313+
projectID, instanceName = acc.ProjectIDExecutionWithStreamInstance(t)
314+
processorName = "new-processor"
315+
randomSuffix = acctest.RandString(5)
316+
createTimeout = "1s"
317+
deleteOnCreateTimeout = true
318+
)
319+
320+
resource.ParallelTest(t, resource.TestCase{
321+
PreCheck: func() { acc.PreCheckBasic(t) },
322+
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
323+
CheckDestroy: checkDestroyStreamProcessor,
324+
Steps: []resource.TestStep{
325+
{
326+
Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, randomSuffix, sampleSrcConfig, testLogDestConfig, acc.TimeoutConfig(&createTimeout, nil, nil, true), &deleteOnCreateTimeout),
327+
ExpectError: regexp.MustCompile("will run cleanup because delete_on_create_timeout is true"),
328+
},
329+
}})
330+
}
331+
310332
func checkExists(resourceName string) resource.TestCheckFunc {
311333
return func(s *terraform.State) error {
312334
rs, ok := s.RootModule().Resources[resourceName]
@@ -526,12 +548,16 @@ func composeStreamProcessorChecks(projectID, instanceName, processorName, state
526548
return resource.ComposeAggregateTestCheckFunc(checks...)
527549
}
528550

529-
func config(t *testing.T, projectID, instanceName, processorName, state, nameSuffix string, src, dest connectionConfig) string {
551+
func config(t *testing.T, projectID, instanceName, processorName, state, nameSuffix string, src, dest connectionConfig, timeoutConfig string, deleteOnCreateTimeout *bool) string {
530552
t.Helper()
531553
stateConfig := ""
532554
if state != "" {
533555
stateConfig = fmt.Sprintf(`state = %[1]q`, state)
534556
}
557+
deleteOnCreateTimeoutConfig := ""
558+
if deleteOnCreateTimeout != nil {
559+
deleteOnCreateTimeoutConfig = fmt.Sprintf(`delete_on_create_timeout = %[1]t`, *deleteOnCreateTimeout)
560+
}
535561

536562
connectionConfigSrc, connectionIDSrc, pipelineStepSrc := configConnection(t, projectID, instanceName, src, nameSuffix)
537563
connectionConfigDest, connectionIDDest, pipelineStepDest := configConnection(t, projectID, instanceName, dest, nameSuffix)
@@ -581,9 +607,11 @@ func config(t *testing.T, projectID, instanceName, processorName, state, nameSuf
581607
%[5]s
582608
%[6]s
583609
depends_on = [%[7]s]
610+
%[8]s
611+
%[9]s
584612
}
585613
586-
`, projectID, instanceName, processorName, pipeline, stateConfig, optionsStr, dependsOnStr) + otherConfig
614+
`, projectID, instanceName, processorName, pipeline, stateConfig, optionsStr, dependsOnStr, timeoutConfig, deleteOnCreateTimeoutConfig) + otherConfig
587615
}
588616

589617
func configConnection(t *testing.T, projectID, instanceName string, config connectionConfig, nameSuffix string) (connectionConfig, resourceID, pipelineStep string) {

internal/service/streamprocessor/state_transition.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,21 @@ const (
2424
const (
2525
ErrorUpdateStateTransition = "Stream Processor must be in %s state to transition to %s state"
2626
ErrorUpdateToCreatedState = "Stream Processor cannot transition from %s to CREATED"
27+
defaultTimeout = 5 * time.Minute // big pipelines can take a while to stop due to checkpointing. By default, we prefer the API to raise the error (~ 3min) than having to expose custom timeouts.
28+
minTimeout = 3 * time.Second
2729
)
2830

2931
func WaitStateTransition(ctx context.Context, requestParams *admin.GetStreamProcessorApiParams, client admin.StreamsApi, pendingStates, desiredStates []string) (*admin.StreamsProcessorWithStats, error) {
32+
return WaitStateTransitionWithTimeout(ctx, requestParams, client, pendingStates, desiredStates, defaultTimeout)
33+
}
34+
35+
func WaitStateTransitionWithTimeout(ctx context.Context, requestParams *admin.GetStreamProcessorApiParams, client admin.StreamsApi, pendingStates, desiredStates []string, timeout time.Duration) (*admin.StreamsProcessorWithStats, error) {
3036
stateConf := &retry.StateChangeConf{
3137
Pending: pendingStates,
3238
Target: desiredStates,
3339
Refresh: refreshFunc(ctx, requestParams, client),
34-
Timeout: 5 * time.Minute, // big pipelines can take a while to stop due to checkpointing. We prefer the API to raise the error (~ 3min) than having to expose custom timeouts.
35-
MinTimeout: 3 * time.Second,
40+
Timeout: timeout,
41+
MinTimeout: minTimeout,
3642
Delay: 0,
3743
}
3844

0 commit comments

Comments
 (0)