-
Notifications
You must be signed in to change notification settings - Fork 42
feat: Add StreamWorkspace CloudFormation resource #1517
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
a46c35f to
38ccb32
Compare
kpatel71716
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems pretty good at a high level. I have some questions and small comments for now
|
|
||
| _Type_: String | ||
|
|
||
| _Allowed Values_: <code>Kafka</code> | <code>Cluster</code> | <code>Sample</code> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] we have many other connection types. Should we add the connection docs and configs in a follow-up and remove all the stream connection code to simplify this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed connection docs (streamsconnection.md, streamskafkaauthentication.md, streamskafkasecurity.md) - will add in follow-up PR
|
|
||
| for _, tc := range testCases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| result := resource.ValidateTierComparison(tc.streamConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] since this method is in the resource package, can we just do result := ValidateTierComparison(tc.streamConfig). Also do we need to export resource.ValidateTierComparison of can we just have validateTierComparison
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported ValidateTierComparison to resolve linter issue requiring resource_test.go to be in resource_test package. Since resource_test.go uses a separate package (resource_test) instead of resource package, these method need to be exported to be accessible in test cases. This follows the same pattern used in other resources like project and search-index.
|
|
||
| // GetTierValue returns a numeric value for tier comparison | ||
| // SP2 < SP5 < SP10 < SP30 < SP50 | ||
| func GetTierValue(tier string) int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] does this method need to be exported? can we just have getTierValue? Same goes for the other methods in this package
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported GetTierValue and other methods to resolve linter issue requiring resource_test.go to be in resource_test package. Since resource_test.go uses a separate package (resource_test) instead of resource package, these functions need to be exported to be accessible in test cases. This follows the same pattern used in other resources like project and search-index.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine. I was thinking that we could have the tests be in the resource package as well instead of their own package but not a blocker from me
| cloudProvider = "AWS" | ||
| region = "VIRGINIA_USA" | ||
| tier = "SP30" | ||
| maxTierSize = "100" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] This should be a valid tier value to prevent confusion (even though this will be validated downstream in the api server code)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed - changed maxTierSize from "100" to "SP50" to use a valid tier value.
| }, nil | ||
| } | ||
|
|
||
| // Update is not supported for StreamWorkspace as all configurable properties are create-only. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just an fyi that update support wis on the roadmap to be supported soon (we support it unofficially in our apis, but it is hidden from the documentation as we test and communicate this to our blocked customers). No action needed for now but worth mentioning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kpatel71716 is there a timeline on when we want to officially support Update? I believe we do support/expose it in our Terraform provider right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would estimate this will be publicly available in around a month. Terraform is a bit different as it actually destroys and recreates the workspace to update it (which I understand is a bit confusing)
38ccb32 to
003592f
Compare
| return NewStreamsTenant(model) | ||
| } | ||
|
|
||
| // NewStreamsTenant maps CFN model to Atlas API StreamsTenant (kept for backward compatibility) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you elaborate on "(kept for backward compatibility)"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the vague backward compatibility comment; the function NewStreamWorkspaceCreateReq is the standard mapping function with no legacy dependencies
| Name: connection.Name, | ||
| Type: connection.Type, | ||
| } | ||
| if connection.GetType() == "Kafka" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make "Kafka" & "Cluster" constants & reuse here & in tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replaced string literals with the existing Kafka and Cluster constants and updated tests to use them.
| @@ -0,0 +1,190 @@ | |||
| // Copyright 2024 MongoDB Inc | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copyright year updated to 2026.
| } | ||
|
|
||
| // Validate tier comparison: MaxTierSize must not be less than Tier | ||
| if errEvent := ValidateTierComparison(currentModel.StreamConfig); errEvent != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] does the API validate this as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the tier comparison validation; the API enforces that MaxTierSize must not be less than Tier.
| return handleError(resp, constants.CREATE, err) | ||
| } | ||
|
|
||
| currentModel.Id = createdStreamWorkspace.Id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of only updating the id in currentModel, why don't we re-build it from createdStreamWorkspace completely using GetStreamWorkspaceModel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to rebuild the complete model using GetStreamWorkspaceModel instead of only updating the Id field.
| } | ||
|
|
||
| // GetStreamWorkspaceModel maps API response to CFN model, preserving primary identifier fields from currentModel | ||
| func GetStreamWorkspaceModel(streamTenant *admin.StreamsTenant, currentModel *Model) *Model { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which "primary identifier fields" are being referred to here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the vague comment
| // Update is not supported for StreamWorkspace as all configurable properties are create-only. | ||
| // All properties (WorkspaceName, ProjectId, Profile, StreamConfig, DataProcessRegion) are in createOnlyProperties, | ||
| // which means CloudFormation will require resource replacement for any changes. | ||
| // Since the resource has a custom name (WorkspaceName), CloudFormation cannot perform replacement. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Since the resource has a custom name (WorkspaceName), CloudFormation cannot perform replacement." found this a bit confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the confusing comment; the Update function is implemented and updates DataProcessRegion.Region, while other properties remain create-only per the schema.
| } | ||
|
|
||
| // Validate tier comparison: MaxTierSize must not be less than Tier | ||
| if errEvent := ValidateTierComparison(currentModel.StreamConfig); errEvent != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can Tier be updated? IIUC then no right? since the object is immutable & StreamConfig is createOnly.
Do we still need this validation here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the tier comparison validation from Update
| return handleError(resp, constants.UPDATE, err) | ||
| } | ||
|
|
||
| // Resource exists but is immutable - return current state (no-op update) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a log here that an update isn't actually supported & we're just returning the current state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update function now performs actual updates (DataProcessRegion.Region); removed the immutable comment since updates are supported.
be161ea to
8aee0e6
Compare
| "type": "object", | ||
| "description": "Configuration options for an Atlas Stream Processing Workspace.", | ||
| "properties": { | ||
| "Tier": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] is it difficult to to make the Tier property required here? Does doing so break any automated schema generation? If so, then no action needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marking Tier as required should be straightforward. The Atlas SDK defines Tier as optional (json:"tier,omitempty"), so this adds CloudFormation schema-level validation. No code changes needed.
kpatel71716
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks good at a high level from another pass. I'll tag someone on my team as well to take a look and we'll give it another pass to make sure we don't miss anything
8aee0e6 to
9aa4e50
Compare
I'll take a look EOD today or early monday morning. |
| "go.mongodb.org/atlas-sdk/v20250312010/mockadmin" | ||
| ) | ||
|
|
||
| func createTestStreamWorkspaceModel() *resource.Model { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sivaram-mongodb could have a look at the recent guidelines shared on Slack & revisit this PR once?
Do all these tests actually add any value?
Also can you update the screenshots in the description to include all CRUD operations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this resource_test.go
9aa4e50 to
018f1a6
Compare
018f1a6 to
82de04d
Compare
| Name: connection.Name, | ||
| Type: connection.Type, | ||
| } | ||
| if connection.GetType() == Kafka { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] do we only support two connection types for cloudformation (kafka and cluster?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also removed.
| } | ||
| } | ||
|
|
||
| func NewModelConnections(streamConfig *[]admin.StreamsConnection) []StreamsConnection { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] should this be called streamConnections instead of streamConfig? We already have another function above for streamConfig which can be confusing.
func newModelStreamConfig(streamConfig *admin.StreamConfig) *StreamConfig {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the stream connections references since this is a stream workspace resource
| DataProcessRegion *StreamsDataProcessRegion `json:",omitempty"` | ||
| StreamConfig *StreamConfig `json:",omitempty"` | ||
| Id *string `json:",omitempty"` | ||
| Connections []StreamsConnection `json:",omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] for terraform, we have connections as its own resource, in cloudformation, do we include other resources in the workspace model like this vs as a separate resource model?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Connections is being removed from this code.
| "type": "string", | ||
| "description": "Selected tier for the Stream Workspace. Configures Memory / VCPU allowances.", | ||
| "title": "Stream Workspace Tier", | ||
| "enum": ["SP2", "SP5", "SP10", "SP30", "SP50"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove the enum values here and below for maxTierSize and instead include those in the field description?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| - **AWS Only**: This CloudFormation resource is designed for AWS deployments. The CloudProvider parameter is constrained to "AWS" only. | ||
| - **Create-Only Properties**: `WorkspaceName`, `ProjectId`, and `Profile` are create-only properties. To change these, you must delete and recreate the stack. | ||
| - **Updateable Properties**: `StreamConfig.Tier` and `StreamConfig.MaxTierSize` can be updated after creation. | ||
| - **Read-Only Properties**: `Id`, `Hostnames`, and `Connections` are read-only and returned by CloudFormation but cannot be set. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is connections still applicable?
| - `MaxTierSize`: SP50 | ||
| - `Profile`: default | ||
|
|
||
| ## Notes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can remove this section. It may be better to include in the field description itself instead
|
|
||
| - Verify Atlas API keys are correctly stored in AWS Secrets Manager | ||
| - Check CloudWatch logs for handler execution errors | ||
| - Ensure the resource type is registered in your private registry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont believe this is a requirement for users as long as they have the extension activated in their account
| - Ensure the resource type is registered in your private registry | ||
| - Verify your IP address is on the Atlas IP Access List | ||
|
|
||
| **Workspace Not Found in Atlas:** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain why do we need this? does it happen very often?
|
@sivaram-mongodb @ParthasarathyV can we also have these contract tests run in Github CI? |
389a5e5 to
7f9571d
Compare
7f9571d to
0972221
Compare
Proposed changes
Added new resource
MongoDB::Atlas::StreamWorkspacefor managing Atlas Stream Processing Workspaces.Resource Configuration
Required Properties:
WorkspaceName: Human-readable label for the workspaceProjectId: Atlas project identifierDataProcessRegion: Cloud provider and region for stream processing (AWS only for CloudFormation)Optional Properties:
StreamConfig: Configuration for stream processing capabilitiesTier: Selected tier (SP2, SP5, SP10, SP30, SP50)MaxTierSize: Maximum tier size for auto-scaling (must be >= Tier)Read-Only Properties:
Id: Unique workspace identifierConnections: List of stream connections configured for the workspaceHostnames: List of hostnames assigned to the workspaceNote: Resource is effectively immutable - all configurable properties are create-only.
Testing
CFN Contract Tests:

Stack Testing:
Jira ticket: CLOUDP-368428
Type of change:
Manual QA performed:
Required Checklist:
make fmtand formatted my codeScreenshots