Skip to content

Commit 14de7e9

Browse files
[APIE-483] Support for Azure Tableflow (#3190)
1 parent 58ef5b6 commit 14de7e9

16 files changed

+190
-17
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ require (
5050
github.com/confluentinc/ccloud-sdk-go-v2/service-quota v0.2.0
5151
github.com/confluentinc/ccloud-sdk-go-v2/srcm v0.7.3
5252
github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1
53-
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.4.0
53+
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0
5454
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0
5555
github.com/confluentinc/cmf-sdk-go v0.0.4
5656
github.com/confluentinc/confluent-kafka-go/v2 v2.8.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/srcm v0.7.3 h1:ozdDSJHruQIgtxS5hwz8Rp8p
258258
github.com/confluentinc/ccloud-sdk-go-v2/srcm v0.7.3/go.mod h1:cD0AeCMBAWBesmWxWCMgVYNABYgHJ/ahCj7b4HP2R2I=
259259
github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1 h1:WZJYfgXJrvTIYQpCFps/qHF7T8ekgPlX/SFqx4EY2zQ=
260260
github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1/go.mod h1:kB+MXWYYg9ohrTCb27LlfpTbuexAzyYAmum105ow0ho=
261-
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.4.0 h1:QFp1J5P4o3I28c4eXU12gZFC2YauG8XozxvHBXltzFo=
262-
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.4.0/go.mod h1:WRCZS91/w+RlmsBwHsxcTGW8zIquRAJNnzkJtuWk2yY=
261+
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zCJwBY6/FiJfyN9Q/XO8e6sMRI=
262+
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI=
263263
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI=
264264
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q=
265265
github.com/confluentinc/cmf-sdk-go v0.0.4 h1:IAzACCIgcp0OAah9pvr6xtqaLUoQNoorxynNzIH5dQQ=

internal/tableflow/command_topic.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
const (
1717
byos = "BYOS"
1818
managed = "MANAGED"
19+
azure = "AzureDataLakeStorageGen2"
1920

2021
suspend = "SUSPEND"
2122
skip = "SKIP"
@@ -36,6 +37,9 @@ type topicOut struct {
3637
ProviderIntegrationId string `human:"Provider Integration ID,omitempty" serialized:"provider_integration_id,omitempty"`
3738
BucketName string `human:"Bucket Name,omitempty" serialized:"bucket_name,omitempty"`
3839
BucketRegion string `human:"Bucket Region,omitempty" serialized:"bucket_region,omitempty"`
40+
ContainerName string `human:"Container Name,omitempty" serialized:"container_name,omitempty"`
41+
StorageAccountName string `human:"Storage Account Name,omitempty" serialized:"storage_account_name,omitempty"`
42+
StorageRegion string `human:"Storage Region,omitempty" serialized:"storage_region ,omitempty"`
3943
Suspended bool `human:"Suspended" serialized:"suspended"`
4044
TableFormats string `human:"Table Formats" serialized:"table_formats"`
4145
TablePath string `human:"Table Path" serialized:"table_path"`
@@ -111,6 +115,10 @@ func getStorageType(topic tableflowv1.TableflowV1TableflowTopic) (string, error)
111115
return managed, nil
112116
}
113117

118+
if config.TableflowV1AzureAdlsSpec != nil {
119+
return azure, nil
120+
}
121+
114122
return "", fmt.Errorf(errors.CorruptedNetworkResponseErrorMsg, "config")
115123
}
116124

@@ -221,6 +229,12 @@ func printTopicTable(cmd *cobra.Command, topic tableflowv1.TableflowV1TableflowT
221229
out.TablePath = topic.Spec.Storage.TableflowV1ByobAwsSpec.GetTablePath()
222230
} else if storageType == managed {
223231
out.TablePath = topic.Spec.Storage.TableflowV1ManagedStorageSpec.GetTablePath()
232+
} else if storageType == azure {
233+
out.ProviderIntegrationId = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetProviderIntegrationId()
234+
out.ContainerName = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetContainerName()
235+
out.StorageAccountName = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetStorageAccountName()
236+
out.StorageRegion = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetStorageRegion()
237+
out.TablePath = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetTablePath()
224238
}
225239

226240
table := output.NewTable(cmd)

internal/tableflow/command_topic_enable.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ func (c *command) newTopicEnableCommand() *cobra.Command {
3535
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
3636

3737
cmd.Flags().String("retention-ms", "604800000", "Specify the max age of snapshots (Iceberg) or versions (Delta) (snapshot/version expiration) to keep on the table in milliseconds for the Tableflow enabled topic.")
38-
cmd.Flags().String("storage-type", "MANAGED", "Specify the storage type of the Kafka cluster, one of MANAGED or BYOS.")
38+
cmd.Flags().String("storage-type", "MANAGED", "Specify the storage type of the Kafka cluster, one of MANAGED, BYOS or AzureDataLakeStorageGen2.")
3939
cmd.Flags().String("provider-integration", "", "Specify the provider integration id.")
4040
cmd.Flags().String("bucket-name", "", "Specify the name of the AWS S3 bucket.")
4141
cmd.Flags().String("table-formats", "ICEBERG", "Specify the table formats, one of DELTA or ICEBERG.")
42+
cmd.Flags().String("storage-account-name", "", "Specify the storage account name for Azure Data Lake.")
43+
cmd.Flags().String("container-name", "", "Specify the container name for Azure Data Lake.")
4244
addErrorHandlingFlags(cmd)
4345

4446
pcmd.AddContextFlag(cmd, c.CLICommand)
@@ -105,6 +107,16 @@ func (c *command) enable(cmd *cobra.Command, args []string) error {
105107
}
106108
tableFormatsSlice := []string{tableFormats}
107109

110+
storageAccountName, err := cmd.Flags().GetString("storage-account-name")
111+
if err != nil {
112+
return err
113+
}
114+
115+
containerName, err := cmd.Flags().GetString("container-name")
116+
if err != nil {
117+
return err
118+
}
119+
108120
createTopic := tableflowv1.TableflowV1TableflowTopic{
109121

110122
Spec: &tableflowv1.TableflowV1TableflowTopicSpec{
@@ -156,8 +168,8 @@ func (c *command) enable(cmd *cobra.Command, args []string) error {
156168
createTopic.Spec.Storage = &tableflowv1.TableflowV1TableflowTopicSpecStorageOneOf{
157169
TableflowV1ByobAwsSpec: &tableflowv1.TableflowV1ByobAwsSpec{
158170
Kind: "ByobAws",
159-
BucketName: *tableflowv1.PtrString(bucketName),
160-
ProviderIntegrationId: *tableflowv1.PtrString(providerIntegration),
171+
BucketName: bucketName,
172+
ProviderIntegrationId: providerIntegration,
161173
},
162174
}
163175
} else if strings.ToUpper(storageType) == "MANAGED" {
@@ -166,6 +178,18 @@ func (c *command) enable(cmd *cobra.Command, args []string) error {
166178
Kind: "Managed",
167179
},
168180
}
181+
} else if strings.ToUpper(storageType) == "AZUREDATALAKESTORAGEGEN2" {
182+
if !cmd.Flags().Changed("provider-integration") || !cmd.Flags().Changed("storage-account-name") || !cmd.Flags().Changed("container-name") {
183+
return fmt.Errorf("provider-integration, storage-account-name and container-name flags are required when storage-type is AzureDataLakeStorageGen2.")
184+
}
185+
createTopic.Spec.Storage = &tableflowv1.TableflowV1TableflowTopicSpecStorageOneOf{
186+
TableflowV1AzureAdlsSpec: &tableflowv1.TableflowV1AzureAdlsSpec{
187+
Kind: "AzureDataLakeStorageGen2",
188+
StorageAccountName: storageAccountName,
189+
ContainerName: containerName,
190+
ProviderIntegrationId: providerIntegration,
191+
},
192+
}
169193
} else {
170194
return fmt.Errorf("Unrecognized Storage Type: %s", storageType)
171195
}

internal/tableflow/command_topic_list.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ func (c *command) list(cmd *cobra.Command, _ []string) error {
8080
out.TablePath = topic.Spec.Storage.TableflowV1ByobAwsSpec.GetTablePath()
8181
} else if storageType == managed {
8282
out.TablePath = topic.Spec.Storage.TableflowV1ManagedStorageSpec.GetTablePath()
83+
} else if storageType == azure {
84+
out.ProviderIntegrationId = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetProviderIntegrationId()
85+
out.ContainerName = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetContainerName()
86+
out.StorageAccountName = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetStorageAccountName()
87+
out.StorageRegion = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetStorageRegion()
88+
out.TablePath = topic.Spec.Storage.TableflowV1AzureAdlsSpec.GetTablePath()
8389
}
8490

8591
list.Add(out)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Are you sure you want to delete topic "topic-azure"? (y/n): Requested to delete topic "topic-azure".
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
topic-byob
22
topic-managed
3+
topic-azure
34
:4
45
Completion ended with directive: ShellCompDirectiveNoFileComp
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"kafka_cluster": "lkc-123456",
3+
"topic_name": "topic-azure",
4+
"enable_compaction": true,
5+
"enable_partitioning": true,
6+
"environment": "env-596",
7+
"record_failure_strategy": "SKIP",
8+
"retention_ms": "604800000",
9+
"storage_type": "AzureDataLakeStorageGen2",
10+
"provider_integration_id": "cspi-stgce89r7",
11+
"container_name": "Container1",
12+
"storage_account_name": "Acc1",
13+
"storage_region ": "US1",
14+
"suspended": false,
15+
"table_formats": "ICEBERG",
16+
"table_path": "s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId2",
17+
"phase": "RUNNING",
18+
"write_mode": "UPSERT"
19+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
+-------------------------+---------------------------------------------------------------------------+
2+
| Kafka Cluster | lkc-123456 |
3+
| Topic Name | topic-azure |
4+
| Enable Compaction | true |
5+
| Enable Partitioning | true |
6+
| Environment | env-596 |
7+
| Record Failure Strategy | SKIP |
8+
| Retention Ms | 604800000 |
9+
| Storage Type | AzureDataLakeStorageGen2 |
10+
| Provider Integration ID | cspi-stgce89r7 |
11+
| Container Name | Container1 |
12+
| Storage Account Name | Acc1 |
13+
| Storage Region | US1 |
14+
| Suspended | false |
15+
| Table Formats | ICEBERG |
16+
| Table Path | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId2 |
17+
| Phase | RUNNING |
18+
| Write Mode | UPSERT |
19+
+-------------------------+---------------------------------------------------------------------------+
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Are you sure you want to disable topic "topic-azure"? (y/n): Requested to disable topic "topic-azure".

0 commit comments

Comments
 (0)