Skip to content

Commit 18e0053

Browse files
[CLI-3394] Configurable Max-eCKU (#3151)
1 parent 6eb69fd commit 18e0053

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+839
-31
lines changed

cmd/lint/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ var vocabWords = []string{
246246
"deserializer",
247247
"deserializers",
248248
"dns",
249+
"ecku",
249250
"elastic",
250251
"env",
251252
"eu",

internal/kafka/command_cluster_create.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ func (c *clusterCommand) newCreateCommand() *cobra.Command {
6363
pcmd.AddAvailabilityFlag(cmd)
6464
pcmd.AddTypeFlag(cmd)
6565
cmd.Flags().Int("cku", 0, `Number of Confluent Kafka Units (non-negative). Required for Kafka clusters of type "dedicated".`)
66+
cmd.Flags().Int("max-ecku", 0, `Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. `+
67+
`Kafka clusters with "HIGH" availability must have at least two eCKUs.`)
6668
pcmd.AddByokKeyFlag(cmd, c.AuthenticatedCLICommand)
6769
pcmd.AddNetworkFlag(cmd, c.AuthenticatedCLICommand)
6870
pcmd.AddContextFlag(cmd, c.CLICommand)
@@ -135,6 +137,26 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error {
135137
Byok: keyGlobalObjectReference,
136138
}}
137139

140+
if cmd.Flags().Changed("max-ecku") {
141+
maxEcku, err := cmd.Flags().GetInt("max-ecku")
142+
if err != nil {
143+
return err
144+
}
145+
if clusterType == skuDedicated {
146+
return errors.NewErrorWithSuggestions("the `--max-ecku` flag can only be used when creating a Basic, Standard, Enterprise, or Freight Kafka cluster", "Specify a different cluster with `--type` flag.")
147+
}
148+
149+
if clusterType == skuBasic {
150+
createCluster.Spec.Config.CmkV2Basic.MaxEcku = cmkv2.PtrInt32(int32(maxEcku))
151+
} else if clusterType == skuStandard {
152+
createCluster.Spec.Config.CmkV2Standard.MaxEcku = cmkv2.PtrInt32(int32(maxEcku))
153+
} else if clusterType == skuEnterprise {
154+
createCluster.Spec.Config.CmkV2Enterprise.MaxEcku = cmkv2.PtrInt32(int32(maxEcku))
155+
} else if clusterType == skuFreight {
156+
createCluster.Spec.Config.CmkV2Freight.MaxEcku = cmkv2.PtrInt32(int32(maxEcku))
157+
}
158+
}
159+
138160
if cmd.Flags().Changed("cku") {
139161
cku, err := cmd.Flags().GetInt("cku")
140162
if err != nil {

internal/kafka/command_cluster_describe.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type describeStruct struct {
3939
ByokKeyId string `human:"BYOK Key ID" serialized:"byok_key_id"`
4040
EncryptionKeyId string `human:"Encryption Key ID" serialized:"encryption_key_id"`
4141
RestEndpoint string `human:"REST Endpoint" serialized:"rest_endpoint"`
42+
MaxEcku int32 `human:"Max eCKU,omitempty" serialized:"max_ecku,omitempty"`
4243
TopicCount int `human:"Topic Count,omitempty" serialized:"topic_count,omitempty"`
4344
}
4445

@@ -149,6 +150,7 @@ func convertClusterToDescribeStruct(cluster *cmkv2.CmkV2Cluster, usageLimits *ka
149150
ByokKeyId: getCmkByokId(cluster),
150151
EncryptionKeyId: getCmkEncryptionKey(cluster),
151152
RestEndpoint: cluster.Spec.GetHttpEndpoint(),
153+
MaxEcku: getCmkMaxEcku(cluster),
152154
}
153155

154156
// Only set limits field if usage limits are available
@@ -191,6 +193,13 @@ func getKafkaClusterDescribeFields(cluster *cmkv2.CmkV2Cluster, basicFields []st
191193
if cluster.Spec.Byok != nil {
192194
describeFields = append(describeFields, "ByokId")
193195
}
196+
} else {
197+
// Max eCKU field is available only for Basic, Standard, Enterprise, and Freight clusters
198+
// Only show it if the value is positive (non-positive values like -1 indicate it's not set)
199+
maxEcku := getCmkMaxEcku(cluster)
200+
if maxEcku > 0 {
201+
describeFields = append(describeFields, "MaxEcku")
202+
}
194203
}
195204

196205
if getTopicCount {

internal/kafka/command_cluster_update.go

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,24 @@ func (c *clusterCommand) newUpdateCommand() *cobra.Command {
3232
Text: `Update the type of a Kafka cluster from "Basic" to "Standard":`,
3333
Code: `confluent kafka cluster update lkc-123456 --type "standard"`,
3434
},
35+
examples.Example{
36+
Text: `Update the Max eCKU count of a Kafka cluster:`,
37+
Code: `confluent kafka cluster update lkc-123456 --max-ecku 5`,
38+
},
3539
),
3640
}
3741

3842
cmd.Flags().String("name", "", "Name of the Kafka cluster.")
3943
cmd.Flags().Uint32("cku", 0, `Number of Confluent Kafka Units. For Kafka clusters of type "dedicated" only. When shrinking a cluster, you must reduce capacity one CKU at a time.`)
4044
cmd.Flags().String("type", "", `Type of the Kafka cluster. Only supports upgrading from "Basic" to "Standard".`)
45+
cmd.Flags().Int("max-ecku", 0, `Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. `+
46+
`Kafka clusters with "HIGH" availability must have at least two eCKUs.`)
4147
pcmd.AddContextFlag(cmd, c.CLICommand)
4248
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
4349
pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
4450
pcmd.AddOutputFlag(cmd)
4551

46-
cmd.MarkFlagsOneRequired("name", "cku", "type")
52+
cmd.MarkFlagsOneRequired("name", "cku", "type", "max-ecku")
4753

4854
return cmd
4955
}
@@ -91,7 +97,36 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
9197
update.Spec.Config = &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{CmkV2Dedicated: &cmkv2.CmkV2Dedicated{Kind: "Dedicated", Cku: updatedCku}}
9298
}
9399

94-
if cmd.Flags().Changed("type") {
100+
if cmd.Flags().Changed("max-ecku") {
101+
maxEcku, err := cmd.Flags().GetInt("max-ecku")
102+
if err != nil {
103+
return err
104+
}
105+
currentConfig := currentCluster.GetSpec().Config
106+
if currentConfig.CmkV2Dedicated != nil {
107+
return errors.NewErrorWithSuggestions("the `--max-ecku` flag can only be used when creating or updating a Basic, Standard, Enterprise, or Freight Kafka cluster", "Specify another cluster or use the `--cku` flag instead.")
108+
}
109+
if maxEcku < 1 {
110+
return fmt.Errorf("`--max-ecku` value must be at least 1")
111+
}
112+
113+
targetType := c.getCurrentClusterType(currentConfig)
114+
if cmd.Flags().Changed("type") {
115+
newType, err := cmd.Flags().GetString("type")
116+
if err != nil {
117+
return err
118+
}
119+
if newType == "" {
120+
return fmt.Errorf("`--type` flag value must not be empty")
121+
}
122+
if currentConfig.CmkV2Basic == nil || strings.ToLower(newType) != "standard" {
123+
return fmt.Errorf(`clusters can only be upgraded from "Basic" to "Standard"`)
124+
}
125+
targetType = "Standard"
126+
}
127+
128+
update.Spec.Config = c.createClusterConfig(targetType, int32(maxEcku))
129+
} else if cmd.Flags().Changed("type") {
95130
newType, err := cmd.Flags().GetString("type")
96131
if err != nil {
97132
return err
@@ -100,12 +135,12 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
100135
return fmt.Errorf("`--type` flag value must not be empty")
101136
}
102137

103-
// Validate cluster type upgrade
104138
currentConfig := currentCluster.GetSpec().Config
105139
if currentConfig.CmkV2Basic == nil || strings.ToLower(newType) != "standard" {
106140
return fmt.Errorf(`clusters can only be upgraded from "Basic" to "Standard"`)
107141
}
108142

143+
// When upgrading type without specifying max-ecku, use default max-ecku value for Standard cluster returned by API
109144
// Set the new cluster type
110145
update.Spec.Config = &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
111146
CmkV2Standard: &cmkv2.CmkV2Standard{
@@ -134,6 +169,59 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
134169
return c.outputKafkaClusterDescription(cmd, &updatedCluster, true, usageLimits)
135170
}
136171

172+
func (c *clusterCommand) getCurrentClusterType(config *cmkv2.CmkV2ClusterSpecConfigOneOf) string {
173+
if config.CmkV2Basic != nil {
174+
return "Basic"
175+
} else if config.CmkV2Standard != nil {
176+
return "Standard"
177+
} else if config.CmkV2Enterprise != nil {
178+
return "Enterprise"
179+
} else if config.CmkV2Freight != nil {
180+
return "Freight"
181+
}
182+
return "Basic"
183+
}
184+
185+
func (c *clusterCommand) createClusterConfig(clusterType string, maxEcku int32) *cmkv2.CmkV2ClusterSpecUpdateConfigOneOf {
186+
switch clusterType {
187+
case "Basic":
188+
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
189+
CmkV2Basic: &cmkv2.CmkV2Basic{
190+
Kind: "Basic",
191+
MaxEcku: cmkv2.PtrInt32(maxEcku),
192+
},
193+
}
194+
case "Standard":
195+
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
196+
CmkV2Standard: &cmkv2.CmkV2Standard{
197+
Kind: "Standard",
198+
MaxEcku: cmkv2.PtrInt32(maxEcku),
199+
},
200+
}
201+
case "Enterprise":
202+
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
203+
CmkV2Enterprise: &cmkv2.CmkV2Enterprise{
204+
Kind: "Enterprise",
205+
MaxEcku: cmkv2.PtrInt32(maxEcku),
206+
},
207+
}
208+
case "Freight":
209+
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
210+
CmkV2Freight: &cmkv2.CmkV2Freight{
211+
Kind: "Freight",
212+
MaxEcku: cmkv2.PtrInt32(maxEcku),
213+
},
214+
}
215+
default:
216+
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
217+
CmkV2Basic: &cmkv2.CmkV2Basic{
218+
Kind: "Basic",
219+
MaxEcku: cmkv2.PtrInt32(maxEcku),
220+
},
221+
}
222+
}
223+
}
224+
137225
func (c *clusterCommand) validateResize(cku int32, currentCluster *cmkv2.CmkV2Cluster) (int32, error) {
138226
// Ensure the cluster is a Dedicated Cluster
139227
if currentCluster.GetSpec().Config.CmkV2Dedicated == nil {

internal/kafka/utils.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -116,23 +116,6 @@ func getCmkClusterType(cluster *cmkv2.CmkV2Cluster) string {
116116
return ccstructs.Sku_name[0] // UNKNOWN
117117
}
118118

119-
func getCmkMaxEcku(cluster *cmkv2.CmkV2Cluster) int32 {
120-
if isBasic(cluster) {
121-
return cluster.Spec.Config.CmkV2Basic.GetMaxEcku()
122-
}
123-
if isStandard(cluster) {
124-
return cluster.Spec.Config.CmkV2Standard.GetMaxEcku()
125-
}
126-
if isEnterprise(cluster) {
127-
return cluster.Spec.Config.CmkV2Enterprise.GetMaxEcku()
128-
}
129-
if isFreight(cluster) {
130-
return cluster.Spec.Config.CmkV2Freight.GetMaxEcku()
131-
}
132-
133-
return -1
134-
}
135-
136119
func getCmkClusterSize(cluster *cmkv2.CmkV2Cluster) int32 {
137120
if isDedicated(cluster) {
138121
return *cluster.Status.Cku
@@ -147,6 +130,27 @@ func getCmkClusterPendingSize(cluster *cmkv2.CmkV2Cluster) int32 {
147130
return -1
148131
}
149132

133+
func getCmkMaxEcku(cluster *cmkv2.CmkV2Cluster) int32 {
134+
if isBasic(cluster) {
135+
if cluster.GetSpec().Config.CmkV2Basic.MaxEcku != nil {
136+
return cluster.GetSpec().Config.CmkV2Basic.GetMaxEcku()
137+
}
138+
} else if isStandard(cluster) {
139+
if cluster.GetSpec().Config.CmkV2Standard.MaxEcku != nil {
140+
return cluster.GetSpec().Config.CmkV2Standard.GetMaxEcku()
141+
}
142+
} else if isEnterprise(cluster) {
143+
if cluster.GetSpec().Config.CmkV2Enterprise.MaxEcku != nil {
144+
return cluster.GetSpec().Config.CmkV2Enterprise.GetMaxEcku()
145+
}
146+
} else if isFreight(cluster) {
147+
if cluster.GetSpec().Config.CmkV2Freight.MaxEcku != nil {
148+
return cluster.GetSpec().Config.CmkV2Freight.GetMaxEcku()
149+
}
150+
}
151+
return -1
152+
}
153+
150154
func getCmkByokId(cluster *cmkv2.CmkV2Cluster) string {
151155
if isDedicated(cluster) && cluster.Spec.Byok != nil {
152156
return cluster.Spec.Byok.Id

test/fixtures/output/kafka/1.golden

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Flags:
2323
--availability string Specify the availability of the cluster as "single-zone", "multi-zone", "low", or "high". (default "single-zone")
2424
--type string Specify the type of the Kafka cluster as "basic", "standard", "enterprise", "freight", or "dedicated". (default "basic")
2525
--cku int Number of Confluent Kafka Units (non-negative). Required for Kafka clusters of type "dedicated".
26+
--max-ecku int Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. Kafka clusters with "HIGH" availability must have at least two eCKUs.
2627
--byok string Confluent Cloud Key ID of a registered encryption key (use "confluent byok create" to register a key).
2728
--network string Network ID.
2829
--context string CLI context name.

test/fixtures/output/kafka/cluster/create-basic-with-ecku-limits.golden

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ It may take up to 5 minutes for the Kafka cluster to be ready.
44
| ID | lkc-with-ecku-limits |
55
| Name | my-basic-cluster-with-ecku-limits |
66
| Type | BASIC |
7-
| Ingress Limit (MB/s) | 25 |
8-
| Egress Limit (MB/s) | 75 |
7+
| Ingress Limit (MB/s) | 250 |
8+
| Egress Limit (MB/s) | 750 |
99
| Storage | 5000 GB |
1010
| Cloud | aws |
1111
| Region | us-west-2 |
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Error: the `--max-ecku` flag can only be used when creating a Basic, Standard, Enterprise, or Freight Kafka cluster
2+
3+
Suggestions:
4+
Specify a different cluster with `--type` flag.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
It may take up to 5 minutes for the Kafka cluster to be ready.
2+
+----------------------+---------------------------+
3+
| Current | false |
4+
| ID | lkc-def963 |
5+
| Name | my-new-cluster |
6+
| Type | ENTERPRISE |
7+
| Ingress Limit (MB/s) | 240 |
8+
| Egress Limit (MB/s) | 720 |
9+
| Storage | Unlimited |
10+
| Cloud | aws |
11+
| Region | us-east-1 |
12+
| Availability | multi-zone |
13+
| Status | PROVISIONING |
14+
| Endpoint | SASL_SSL://kafka-endpoint |
15+
| REST Endpoint | https://pkc-endpoint |
16+
| Max eCKU | 4 |
17+
+----------------------+---------------------------+

test/fixtures/output/kafka/cluster/create-flag-error.golden

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Error: at least one of the flags in the group [name cku type] is required
1+
Error: at least one of the flags in the group [name cku type max-ecku] is required
22
Usage:
33
confluent kafka cluster update <id> [flags]
44

@@ -11,10 +11,15 @@ Update the type of a Kafka cluster from "Basic" to "Standard":
1111

1212
$ confluent kafka cluster update lkc-123456 --type "standard"
1313

14+
Update the Max eCKU count of a Kafka cluster:
15+
16+
$ confluent kafka cluster update lkc-123456 --max-ecku 5
17+
1418
Flags:
1519
--name string Name of the Kafka cluster.
1620
--cku uint32 Number of Confluent Kafka Units. For Kafka clusters of type "dedicated" only. When shrinking a cluster, you must reduce capacity one CKU at a time.
1721
--type string Type of the Kafka cluster. Only supports upgrading from "Basic" to "Standard".
22+
--max-ecku int Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. Kafka clusters with "HIGH" availability must have at least two eCKUs.
1823
--context string CLI context name.
1924
--environment string Environment ID.
2025
--kafka-endpoint string Endpoint to be used for this Kafka cluster.

0 commit comments

Comments
 (0)