Skip to content

Commit f2982d2

Browse files
[CLI-3654] Tableflow Catalog Sync/Integration Status (#3183)
1 parent d3c52c8 commit f2982d2

File tree

8 files changed

+169
-25
lines changed

8 files changed

+169
-25
lines changed

internal/tableflow/command_topic.go

Lines changed: 76 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,25 @@ const (
1919
)
2020

2121
type topicOut struct {
22-
KafkaCluster string `human:"Kafka Cluster" serialized:"kafka_cluster"`
23-
TopicName string `human:"Topic Name" serialized:"topic_name"`
24-
EnableCompaction bool `human:"Enable Compaction" serialized:"enable_compaction"`
25-
EnablePartitioning bool `human:"Enable Partitioning" serialized:"enable_partitioning"`
26-
Environment string `human:"Environment" serialized:"environment"`
27-
RecordFailureStrategy string `human:"Record Failure Strategy" serialized:"record_failure_strategy"`
28-
RetentionMs string `human:"Retention Ms" serialized:"retention_ms"`
29-
StorageType string `human:"Storage Type" serialized:"storage_type"`
30-
ProviderIntegrationId string `human:"Provider Integration ID,omitempty" serialized:"provider_integration_id,omitempty"`
31-
BucketName string `human:"Bucket Name,omitempty" serialized:"bucket_name,omitempty"`
32-
BucketRegion string `human:"Bucket Region,omitempty" serialized:"bucket_region,omitempty"`
33-
Suspended bool `human:"Suspended" serialized:"suspended"`
34-
TableFormats string `human:"Table Formats" serialized:"table_formats"`
35-
TablePath string `human:"Table Path" serialized:"table_path"`
36-
Phase string `human:"Phase" serialized:"phase"`
37-
ErrorMessage string `human:"Error Message,omitempty" serialized:"error_message,omitempty"`
38-
WriteMode string `human:"Write Mode,omitempty" serialized:"write_mode,omitempty"`
22+
KafkaCluster string `human:"Kafka Cluster" serialized:"kafka_cluster"`
23+
TopicName string `human:"Topic Name" serialized:"topic_name"`
24+
EnableCompaction bool `human:"Enable Compaction" serialized:"enable_compaction"`
25+
EnablePartitioning bool `human:"Enable Partitioning" serialized:"enable_partitioning"`
26+
Environment string `human:"Environment" serialized:"environment"`
27+
RecordFailureStrategy string `human:"Record Failure Strategy" serialized:"record_failure_strategy"`
28+
RetentionMs string `human:"Retention Ms" serialized:"retention_ms"`
29+
StorageType string `human:"Storage Type" serialized:"storage_type"`
30+
ProviderIntegrationId string `human:"Provider Integration ID,omitempty" serialized:"provider_integration_id,omitempty"`
31+
BucketName string `human:"Bucket Name,omitempty" serialized:"bucket_name,omitempty"`
32+
BucketRegion string `human:"Bucket Region,omitempty" serialized:"bucket_region,omitempty"`
33+
Suspended bool `human:"Suspended" serialized:"suspended"`
34+
TableFormats string `human:"Table Formats" serialized:"table_formats"`
35+
TablePath string `human:"Table Path" serialized:"table_path"`
36+
Phase string `human:"Phase" serialized:"phase"`
37+
CatalogSyncStatus map[string]string `human:"Catalog Sync Status,omitempty" serialized:"catalog_sync_status,omitempty"`
38+
FailingTableFormat map[string]string `human:"Failing Table Format,omitempty" serialized:"failing_table_format,omitempty"`
39+
ErrorMessage string `human:"Error Message,omitempty" serialized:"error_message,omitempty"`
40+
WriteMode string `human:"Write Mode,omitempty" serialized:"write_mode,omitempty"`
3941
}
4042

4143
func (c *command) newTopicCommand() *cobra.Command {
@@ -106,6 +108,56 @@ func getStorageType(topic tableflowv1.TableflowV1TableflowTopic) (string, error)
106108
return "", fmt.Errorf(errors.CorruptedNetworkResponseErrorMsg, "config")
107109
}
108110

111+
// include error message in output if Sync Status is FAILED
112+
func getDescribeCatalogSyncStatuses(statuses []tableflowv1.TableflowV1CatalogSyncStatus) map[string]string {
113+
result := make(map[string]string)
114+
for _, s := range statuses {
115+
catalogIntegrationId := "id-unknown"
116+
if s.CatalogIntegrationId != nil {
117+
catalogIntegrationId = *s.CatalogIntegrationId
118+
}
119+
syncStatus := "status-unknown"
120+
if s.SyncStatus != nil {
121+
syncStatus = *s.SyncStatus
122+
}
123+
124+
if syncStatus == "FAILED" && s.ErrorMessage.IsSet() {
125+
if v := s.ErrorMessage.Get(); v != nil && *v != "" {
126+
syncStatus = fmt.Sprintf("%s-%s", syncStatus, *v)
127+
}
128+
}
129+
130+
result[catalogIntegrationId] = syncStatus
131+
}
132+
return result
133+
}
134+
135+
// does not include error message in output if Sync Status is FAILED, to maintain readability
136+
func getListCatalogSyncStatuses(statuses []tableflowv1.TableflowV1CatalogSyncStatus) map[string]string {
137+
result := make(map[string]string)
138+
for _, s := range statuses {
139+
catalogIntegrationId := "id-unknown"
140+
if s.CatalogIntegrationId != nil {
141+
catalogIntegrationId = *s.CatalogIntegrationId
142+
}
143+
syncStatus := "status-unknown"
144+
if s.SyncStatus != nil {
145+
syncStatus = *s.SyncStatus
146+
}
147+
148+
result[catalogIntegrationId] = syncStatus
149+
}
150+
return result
151+
}
152+
153+
func getFailingTableFormats(formats []tableflowv1.TableflowV1TableflowTopicStatusFailingTableFormats) map[string]string {
154+
result := make(map[string]string)
155+
for _, f := range formats {
156+
result[f.Format] = f.ErrorMessage
157+
}
158+
return result
159+
}
160+
109161
func printTopicTable(cmd *cobra.Command, topic tableflowv1.TableflowV1TableflowTopic) error {
110162
storageType, err := getStorageType(topic)
111163
if err != nil {
@@ -119,18 +171,23 @@ func printTopicTable(cmd *cobra.Command, topic tableflowv1.TableflowV1TableflowT
119171
return fmt.Errorf(errors.CorruptedNetworkResponseErrorMsg, "status not found")
120172
}
121173

174+
strStatus := getDescribeCatalogSyncStatuses(topic.Status.GetCatalogSyncStatuses())
175+
strFormats := getFailingTableFormats(topic.Status.GetFailingTableFormats())
176+
122177
out := &topicOut{
123178
KafkaCluster: topic.GetSpec().KafkaCluster.GetId(),
124179
TopicName: topic.Spec.GetDisplayName(),
125180
EnableCompaction: topic.GetSpec().Config.GetEnableCompaction(), // should be read-only & true
126181
EnablePartitioning: topic.GetSpec().Config.GetEnablePartitioning(), // should be read-only & true
127-
TableFormats: strings.Join(topic.Spec.GetTableFormats(), ""),
182+
TableFormats: strings.Join(topic.Spec.GetTableFormats(), ", "),
128183
Environment: topic.GetSpec().Environment.GetId(),
129184
RetentionMs: topic.GetSpec().Config.GetRetentionMs(),
130185
RecordFailureStrategy: topic.GetSpec().Config.GetRecordFailureStrategy(),
131186
StorageType: storageType,
132187
Suspended: topic.Spec.GetSuspended(),
133188
Phase: topic.Status.GetPhase(),
189+
CatalogSyncStatus: strStatus,
190+
FailingTableFormat: strFormats,
134191
ErrorMessage: topic.Status.GetErrorMessage(),
135192
WriteMode: topic.Status.GetWriteMode(),
136193
}
@@ -146,5 +203,5 @@ func printTopicTable(cmd *cobra.Command, topic tableflowv1.TableflowV1TableflowT
146203

147204
table := output.NewTable(cmd)
148205
table.Add(out)
149-
return table.Print()
206+
return table.PrintWithAutoWrap(false)
150207
}

internal/tableflow/command_topic_list.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,23 @@ func (c *command) list(cmd *cobra.Command, _ []string) error {
5050
return err
5151
}
5252

53+
strStatus := getListCatalogSyncStatuses(topic.Status.GetCatalogSyncStatuses())
54+
strFormats := getFailingTableFormats(topic.Status.GetFailingTableFormats())
55+
5356
out := &topicOut{
5457
KafkaCluster: topic.GetSpec().KafkaCluster.GetId(),
5558
TopicName: topic.Spec.GetDisplayName(),
5659
EnableCompaction: topic.GetSpec().Config.GetEnableCompaction(), // should be read-only & true
5760
EnablePartitioning: topic.GetSpec().Config.GetEnablePartitioning(), // should be read-only & true
58-
TableFormats: strings.Join(topic.Spec.GetTableFormats(), ""),
61+
TableFormats: strings.Join(topic.Spec.GetTableFormats(), ", "),
5962
Environment: topic.GetSpec().Environment.GetId(),
6063
RetentionMs: topic.GetSpec().Config.GetRetentionMs(),
6164
RecordFailureStrategy: topic.GetSpec().Config.GetRecordFailureStrategy(),
6265
StorageType: storageType,
6366
Suspended: topic.Spec.GetSuspended(),
6467
Phase: topic.Status.GetPhase(),
68+
CatalogSyncStatus: strStatus,
69+
FailingTableFormat: strFormats,
6570
ErrorMessage: topic.Status.GetErrorMessage(),
6671
WriteMode: topic.Status.GetWriteMode(),
6772
}
@@ -78,5 +83,5 @@ func (c *command) list(cmd *cobra.Command, _ []string) error {
7883
list.Add(out)
7984
}
8085

81-
return list.Print()
86+
return list.PrintWithAutoWrap(false)
8287
}

test/fixtures/output/tableflow/topic/describe-topic-json.golden

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,13 @@
1414
"table_formats": "ICEBERG",
1515
"table_path": "s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId",
1616
"phase": "RUNNING",
17+
"catalog_sync_status": {
18+
"cat-id-123": "SUCCESS",
19+
"cat-id-456": "FAILED-Connection timeout"
20+
},
21+
"failing_table_format": {
22+
"DELTA": "Connection timeout ",
23+
"ICEBERG": "Schema validation failed"
24+
},
1725
"write_mode": "UPSERT"
1826
}

test/fixtures/output/tableflow/topic/describe-topic.golden

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,9 @@
1414
| Table Formats | ICEBERG |
1515
| Table Path | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId |
1616
| Phase | RUNNING |
17+
| Catalog Sync Status | cat-id-123=SUCCESS |
18+
| | cat-id-456=FAILED-Connection timeout |
19+
| Failing Table Format | DELTA=Connection timeout |
20+
| | ICEBERG=Schema validation failed |
1721
| Write Mode | UPSERT |
1822
+-------------------------+--------------------------------------------------------------------------+

test/fixtures/output/tableflow/topic/list-topic-json.golden

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
"table_formats": "ICEBERG",
1616
"table_path": "s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId",
1717
"phase": "RUNNING",
18+
"catalog_sync_status": {
19+
"cat-id-123": "SUCCESS",
20+
"cat-id-456": "FAILED"
21+
},
22+
"failing_table_format": {
23+
"DELTA": "Connection timeout ",
24+
"ICEBERG": "Schema validation failed"
25+
},
1826
"write_mode": "UPSERT"
1927
},
2028
{
@@ -30,6 +38,14 @@
3038
"table_formats": "DELTA",
3139
"table_path": "s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId",
3240
"phase": "RUNNING",
41+
"catalog_sync_status": {
42+
"cat-id-123": "SUCCESS",
43+
"cat-id-456": "FAILED"
44+
},
45+
"failing_table_format": {
46+
"DELTA": "Connection timeout ",
47+
"ICEBERG": "Schema validation failed"
48+
},
3349
"write_mode": "APPEND"
3450
}
3551
]
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
Kafka Cluster | Topic Name | Enable Compaction | Enable Partitioning | Environment | Record Failure Strategy | Retention Ms | Storage Type | Provider Integration ID | Bucket Name | Bucket Region | Suspended | Table Formats | Table Path | Phase | Error Message | Write Mode
2-
----------------+---------------+-------------------+---------------------+-------------+-------------------------+--------------+--------------+-------------------------+-------------+---------------+-----------+---------------+--------------------------------------------------------------------------+---------+---------------+-------------
3-
lkc-123456 | topic-byob | true | true | env-596 | SKIP | 604800000 | BYOS | cspi-stgce89r7 | bucket_1 | us-east-1 | false | ICEBERG | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId | RUNNING | | UPSERT
4-
lkc-123456 | topic-managed | true | true | env-596 | SUSPEND | 604800000 | MANAGED | | | | false | DELTA | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId | RUNNING | | APPEND
1+
Kafka Cluster | Topic Name | Enable Compaction | Enable Partitioning | Environment | Record Failure Strategy | Retention Ms | Storage Type | Provider Integration ID | Bucket Name | Bucket Region | Suspended | Table Formats | Table Path | Phase | Catalog Sync Status | Failing Table Format | Error Message | Write Mode
2+
----------------+---------------+-------------------+---------------------+-------------+-------------------------+--------------+--------------+-------------------------+-------------+---------------+-----------+---------------+--------------------------------------------------------------------------+---------+---------------------+----------------------------------+---------------+-------------
3+
lkc-123456 | topic-byob | true | true | env-596 | SKIP | 604800000 | BYOS | cspi-stgce89r7 | bucket_1 | us-east-1 | false | ICEBERG | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId | RUNNING | cat-id-123=SUCCESS | DELTA=Connection timeout | | UPSERT
4+
| | | | | | | | | | | | | | | cat-id-456=FAILED | ICEBERG=Schema validation failed | |
5+
lkc-123456 | topic-managed | true | true | env-596 | SUSPEND | 604800000 | MANAGED | | | | false | DELTA | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId | RUNNING | cat-id-123=SUCCESS | DELTA=Connection timeout | | APPEND
6+
| | | | | | | | | | | | | | | cat-id-456=FAILED | ICEBERG=Schema validation failed | |

test/fixtures/output/tableflow/topic/update-topic.golden

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,9 @@
1414
| Table Formats | ICEBERG |
1515
| Table Path | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId |
1616
| Phase | RUNNING |
17+
| Catalog Sync Status | cat-id-123=SUCCESS |
18+
| | cat-id-456=FAILED-Connection timeout |
19+
| Failing Table Format | DELTA=Connection timeout |
20+
| | ICEBERG=Schema validation failed |
1721
| Write Mode | UPSERT |
1822
+-------------------------+--------------------------------------------------------------------------+

test/test-server/tableflow_handlers.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,30 @@ func getTopicByob(display_name, environmentId, clusterId string) tableflowv1.Tab
191191
Phase: tableflowv1.PtrString("RUNNING"),
192192
//ErrorMessage: tableflowv1.PtrString(""),
193193
WriteMode: "UPSERT",
194+
CatalogSyncStatuses: &[]tableflowv1.TableflowV1CatalogSyncStatus{
195+
{
196+
CatalogIntegrationId: tableflowv1.PtrString("cat-id-123"),
197+
CatalogType: tableflowv1.PtrString("TYPE-1"),
198+
SyncStatus: tableflowv1.PtrString("SUCCESS"),
199+
ErrorMessage: tableflowv1.NullableString{},
200+
},
201+
{
202+
CatalogIntegrationId: tableflowv1.PtrString("cat-id-456"),
203+
CatalogType: tableflowv1.PtrString("TYPE-2"),
204+
SyncStatus: tableflowv1.PtrString("FAILED"),
205+
ErrorMessage: *tableflowv1.NewNullableString(tableflowv1.PtrString("Connection timeout")),
206+
},
207+
},
208+
FailingTableFormats: &[]tableflowv1.TableflowV1TableflowTopicStatusFailingTableFormats{
209+
{
210+
Format: "ICEBERG",
211+
ErrorMessage: "Schema validation failed",
212+
},
213+
{
214+
Format: "DELTA",
215+
ErrorMessage: "Connection timeout ",
216+
},
217+
},
194218
},
195219
}
196220
}
@@ -220,6 +244,30 @@ func getTopicManaged(display_name, environmentId, clusterId string) tableflowv1.
220244
Phase: tableflowv1.PtrString("RUNNING"),
221245
//ErrorMessage: tableflowv1.PtrString(""),
222246
WriteMode: "APPEND",
247+
CatalogSyncStatuses: &[]tableflowv1.TableflowV1CatalogSyncStatus{
248+
{
249+
CatalogIntegrationId: tableflowv1.PtrString("cat-id-123"),
250+
CatalogType: tableflowv1.PtrString("TYPE-1"),
251+
SyncStatus: tableflowv1.PtrString("SUCCESS"),
252+
ErrorMessage: tableflowv1.NullableString{},
253+
},
254+
{
255+
CatalogIntegrationId: tableflowv1.PtrString("cat-id-456"),
256+
CatalogType: tableflowv1.PtrString("TYPE-2"),
257+
SyncStatus: tableflowv1.PtrString("FAILED"),
258+
ErrorMessage: *tableflowv1.NewNullableString(tableflowv1.PtrString("Connection timeout")),
259+
},
260+
},
261+
FailingTableFormats: &[]tableflowv1.TableflowV1TableflowTopicStatusFailingTableFormats{
262+
{
263+
Format: "ICEBERG",
264+
ErrorMessage: "Schema validation failed",
265+
},
266+
{
267+
Format: "DELTA",
268+
ErrorMessage: "Connection timeout ",
269+
},
270+
},
223271
},
224272
}
225273
}

0 commit comments

Comments
 (0)