Skip to content

Commit 14b42e0

Browse files
authored
[APIE-537] Add "Error Handling" field and flag for tableflow topic (#3180)
1 parent 5507072 commit 14b42e0

21 files changed

+328
-12
lines changed

internal/tableflow/command_topic.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ import (
1616
const (
1717
byos = "BYOS"
1818
managed = "MANAGED"
19+
20+
suspend = "SUSPEND"
21+
skip = "SKIP"
22+
log = "LOG"
1923
)
2024

2125
type topicOut struct {
@@ -25,6 +29,8 @@ type topicOut struct {
2529
EnablePartitioning bool `human:"Enable Partitioning" serialized:"enable_partitioning"`
2630
Environment string `human:"Environment" serialized:"environment"`
2731
RecordFailureStrategy string `human:"Record Failure Strategy" serialized:"record_failure_strategy"`
32+
ErrorHandling string `human:"Error Handling,omitempty" serialized:"error_handling,omitempty"`
33+
LogTarget string `human:"Log Target,omitempty" serialized:"log_target,omitempty"`
2834
RetentionMs string `human:"Retention Ms" serialized:"retention_ms"`
2935
StorageType string `human:"Storage Type" serialized:"storage_type"`
3036
ProviderIntegrationId string `human:"Provider Integration ID,omitempty" serialized:"provider_integration_id,omitempty"`
@@ -108,6 +114,20 @@ func getStorageType(topic tableflowv1.TableflowV1TableflowTopic) (string, error)
108114
return "", fmt.Errorf(errors.CorruptedNetworkResponseErrorMsg, "config")
109115
}
110116

117+
func getErrorHandlingMode(topic tableflowv1.TableflowV1TableflowTopic) string {
118+
if topic.Spec.GetConfig().ErrorHandling != nil {
119+
if topic.GetSpec().Config.GetErrorHandling().TableflowV1ErrorHandlingSuspend != nil {
120+
return suspend
121+
} else if topic.GetSpec().Config.GetErrorHandling().TableflowV1ErrorHandlingSkip != nil {
122+
return skip
123+
} else if topic.GetSpec().Config.GetErrorHandling().TableflowV1ErrorHandlingLog != nil {
124+
return log
125+
}
126+
}
127+
128+
return ""
129+
}
130+
111131
// include error message in output if Sync Status is FAILED
112132
func getDescribeCatalogSyncStatuses(statuses []tableflowv1.TableflowV1CatalogSyncStatus) map[string]string {
113133
result := make(map[string]string)
@@ -183,6 +203,8 @@ func printTopicTable(cmd *cobra.Command, topic tableflowv1.TableflowV1TableflowT
183203
Environment: topic.GetSpec().Environment.GetId(),
184204
RetentionMs: topic.GetSpec().Config.GetRetentionMs(),
185205
RecordFailureStrategy: topic.GetSpec().Config.GetRecordFailureStrategy(),
206+
ErrorHandling: getErrorHandlingMode(topic),
207+
LogTarget: topic.GetSpec().Config.GetErrorHandling().TableflowV1ErrorHandlingLog.GetTarget(), // this Get function will return empty string if the ErrorHandling is not LOG
186208
StorageType: storageType,
187209
Suspended: topic.Spec.GetSuspended(),
188210
Phase: topic.Status.GetPhase(),
@@ -205,3 +227,8 @@ func printTopicTable(cmd *cobra.Command, topic tableflowv1.TableflowV1TableflowT
205227
table.Add(out)
206228
return table.PrintWithAutoWrap(false)
207229
}
230+
231+
func addErrorHandlingFlags(cmd *cobra.Command) {
232+
cmd.Flags().String("error-handling", "", "Specify the error handling strategy, one of SUSPEND, SKIP, or LOG.")
233+
cmd.Flags().String("log-target", "", "Specify the target topic for the LOG error handling strategy.")
234+
}

internal/tableflow/command_topic_enable.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,15 @@ func (c *command) newTopicEnableCommand() *cobra.Command {
3939
cmd.Flags().String("provider-integration", "", "Specify the provider integration id.")
4040
cmd.Flags().String("bucket-name", "", "Specify the name of the AWS S3 bucket.")
4141
cmd.Flags().String("table-formats", "ICEBERG", "Specify the table formats, one of DELTA or ICEBERG.")
42-
cmd.Flags().String("record-failure-strategy", "SUSPEND", "Specify the record failure strategy, one of SUSPEND or SKIP.")
42+
addErrorHandlingFlags(cmd)
4343

4444
pcmd.AddContextFlag(cmd, c.CLICommand)
4545
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
4646
pcmd.AddOutputFlag(cmd)
4747

48+
// Deprecated
49+
cmd.Flags().String("record-failure-strategy", "", "DEPRECATED: Specify the record failure strategy, one of SUSPEND or SKIP.")
50+
4851
return cmd
4952
}
5053

@@ -71,6 +74,16 @@ func (c *command) enable(cmd *cobra.Command, args []string) error {
7174
return err
7275
}
7376

77+
errorHandling, err := cmd.Flags().GetString("error-handling")
78+
if err != nil {
79+
return err
80+
}
81+
82+
logTarget, err := cmd.Flags().GetString("log-target")
83+
if err != nil {
84+
return err
85+
}
86+
7487
storageType, err := cmd.Flags().GetString("storage-type")
7588
if err != nil {
7689
return err
@@ -103,8 +116,36 @@ func (c *command) enable(cmd *cobra.Command, args []string) error {
103116
}
104117

105118
createTopic.Spec.Config = &tableflowv1.TableflowV1TableFlowTopicConfigsSpec{
106-
RetentionMs: tableflowv1.PtrString(retentionMs),
107-
RecordFailureStrategy: tableflowv1.PtrString(recordFailureStrategy),
119+
RetentionMs: tableflowv1.PtrString(retentionMs),
120+
}
121+
122+
if cmd.Flags().Changed("record-failure-strategy") {
123+
createTopic.Spec.Config.SetRecordFailureStrategy(recordFailureStrategy)
124+
}
125+
126+
if cmd.Flags().Changed("error-handling") {
127+
if strings.ToUpper(errorHandling) == suspend {
128+
createTopic.Spec.Config.ErrorHandling = &tableflowv1.TableflowV1TableFlowTopicConfigsSpecErrorHandlingOneOf{
129+
TableflowV1ErrorHandlingSuspend: &tableflowv1.TableflowV1ErrorHandlingSuspend{
130+
Mode: suspend,
131+
},
132+
}
133+
} else if strings.ToUpper(errorHandling) == skip {
134+
createTopic.Spec.Config.ErrorHandling = &tableflowv1.TableflowV1TableFlowTopicConfigsSpecErrorHandlingOneOf{
135+
TableflowV1ErrorHandlingSkip: &tableflowv1.TableflowV1ErrorHandlingSkip{
136+
Mode: skip,
137+
},
138+
}
139+
} else if strings.ToUpper(errorHandling) == log {
140+
createTopic.Spec.Config.ErrorHandling = &tableflowv1.TableflowV1TableFlowTopicConfigsSpecErrorHandlingOneOf{
141+
TableflowV1ErrorHandlingLog: &tableflowv1.TableflowV1ErrorHandlingLog{
142+
Mode: log,
143+
},
144+
}
145+
if cmd.Flags().Changed("log-target") {
146+
createTopic.Spec.Config.ErrorHandling.TableflowV1ErrorHandlingLog.SetTarget(logTarget)
147+
}
148+
}
108149
}
109150

110151
if strings.ToUpper(storageType) == "BYOS" {

internal/tableflow/command_topic_list.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ func (c *command) list(cmd *cobra.Command, _ []string) error {
6262
Environment: topic.GetSpec().Environment.GetId(),
6363
RetentionMs: topic.GetSpec().Config.GetRetentionMs(),
6464
RecordFailureStrategy: topic.GetSpec().Config.GetRecordFailureStrategy(),
65+
ErrorHandling: getErrorHandlingMode(topic),
66+
LogTarget: topic.GetSpec().Config.GetErrorHandling().TableflowV1ErrorHandlingLog.GetTarget(), // this Get function will return empty string if the ErrorHandling is not LOG
6567
StorageType: storageType,
6668
Suspended: topic.Spec.GetSuspended(),
6769
Phase: topic.Status.GetPhase(),

internal/tableflow/command_topic_update.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package tableflow
22

33
import (
44
"fmt"
5+
"strings"
56

67
"github.com/spf13/cobra"
78

@@ -31,12 +32,15 @@ func (c *command) newTopicUpdateCommand() *cobra.Command {
3132

3233
cmd.Flags().String("retention-ms", "", "Specify the Tableflow table retention time in milliseconds.")
3334
cmd.Flags().String("table-formats", "", "Specify the table formats, one of DELTA or ICEBERG.")
34-
cmd.Flags().String("record-failure-strategy", "SUSPEND", "Specify the record failure strategy, one of SUSPEND or SKIP.")
35+
addErrorHandlingFlags(cmd)
3536

3637
pcmd.AddContextFlag(cmd, c.CLICommand)
3738
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
3839
pcmd.AddOutputFlag(cmd)
3940

41+
// Deprecated
42+
cmd.Flags().String("record-failure-strategy", "", "DEPRECATED: Specify the record failure strategy, one of SUSPEND or SKIP.")
43+
4044
return cmd
4145
}
4246

@@ -67,6 +71,16 @@ func (c *command) update(cmd *cobra.Command, args []string) error {
6771
return err
6872
}
6973

74+
errorHandling, err := cmd.Flags().GetString("error-handling")
75+
if err != nil {
76+
return err
77+
}
78+
79+
logTarget, err := cmd.Flags().GetString("log-target")
80+
if err != nil {
81+
return err
82+
}
83+
7084
topicUpdate := tableflowv1.TableflowV1TableflowTopicUpdate{
7185
Spec: &tableflowv1.TableflowV1TableflowTopicSpecUpdate{
7286
Environment: &tableflowv1.GlobalObjectReference{Id: environmentId},
@@ -87,6 +101,48 @@ func (c *command) update(cmd *cobra.Command, args []string) error {
87101
topicUpdate.Spec.Config.SetRecordFailureStrategy(recordFailureStrategy)
88102
}
89103

104+
if cmd.Flags().Changed("error-handling") {
105+
if strings.ToUpper(errorHandling) == suspend {
106+
topicUpdate.Spec.Config.ErrorHandling = &tableflowv1.TableflowV1TableFlowTopicConfigsSpecErrorHandlingOneOf{
107+
TableflowV1ErrorHandlingSuspend: &tableflowv1.TableflowV1ErrorHandlingSuspend{
108+
Mode: suspend,
109+
},
110+
}
111+
} else if strings.ToUpper(errorHandling) == skip {
112+
topicUpdate.Spec.Config.ErrorHandling = &tableflowv1.TableflowV1TableFlowTopicConfigsSpecErrorHandlingOneOf{
113+
TableflowV1ErrorHandlingSkip: &tableflowv1.TableflowV1ErrorHandlingSkip{
114+
Mode: skip,
115+
},
116+
}
117+
} else if strings.ToUpper(errorHandling) == log {
118+
topicUpdate.Spec.Config.ErrorHandling = &tableflowv1.TableflowV1TableFlowTopicConfigsSpecErrorHandlingOneOf{
119+
TableflowV1ErrorHandlingLog: &tableflowv1.TableflowV1ErrorHandlingLog{
120+
Mode: log,
121+
},
122+
}
123+
if cmd.Flags().Changed("log-target") {
124+
topicUpdate.Spec.Config.ErrorHandling.TableflowV1ErrorHandlingLog.SetTarget(logTarget)
125+
}
126+
}
127+
}
128+
129+
if cmd.Flags().Changed("log-target") && !cmd.Flags().Changed("error-handling") {
130+
// We must check for the edge case where the current error handling mode is *not* LOG, but the user is trying to update the log target anyway
131+
// We should not assume that the user wants to change the mode to LOG, so we check the current mode and do nothing if it is not LOG
132+
currentTopic, err := c.V2Client.GetTableflowTopic(environmentId, cluster.GetId(), args[0])
133+
if err != nil {
134+
return err
135+
}
136+
if strings.ToUpper(currentTopic.GetSpec().Config.GetErrorHandling().TableflowV1ErrorHandlingLog.GetMode()) == log {
137+
topicUpdate.Spec.Config.ErrorHandling = &tableflowv1.TableflowV1TableFlowTopicConfigsSpecErrorHandlingOneOf{
138+
TableflowV1ErrorHandlingLog: &tableflowv1.TableflowV1ErrorHandlingLog{
139+
Mode: log,
140+
Target: tableflowv1.PtrString(logTarget),
141+
},
142+
}
143+
}
144+
}
145+
90146
topic, err := c.V2Client.UpdateTableflowTopic(args[0], topicUpdate)
91147
if err != nil {
92148
return fmt.Errorf("Error with updating Tableflow topic: %w", err)

test/fixtures/output/tableflow/topic/describe-topic-json.golden

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"enable_partitioning": true,
66
"environment": "env-596",
77
"record_failure_strategy": "SKIP",
8+
"error_handling": "SKIP",
89
"retention_ms": "604800000",
910
"storage_type": "BYOS",
1011
"provider_integration_id": "cspi-stgce89r7",

test/fixtures/output/tableflow/topic/describe-topic.golden

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
| Enable Partitioning | true |
66
| Environment | env-596 |
77
| Record Failure Strategy | SKIP |
8+
| Error Handling | SKIP |
89
| Retention Ms | 604800000 |
910
| Storage Type | BYOS |
1011
| Provider Integration ID | cspi-stgce89r7 |

test/fixtures/output/tableflow/topic/enable-help.golden

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ Flags:
2222
--provider-integration string Specify the provider integration id.
2323
--bucket-name string Specify the name of the AWS S3 bucket.
2424
--table-formats string Specify the table formats, one of DELTA or ICEBERG. (default "ICEBERG")
25-
--record-failure-strategy string Specify the record failure strategy, one of SUSPEND or SKIP. (default "SUSPEND")
25+
--error-handling string Specify the error handling strategy, one of SUSPEND, SKIP, or LOG.
26+
--log-target string Specify the target topic for the LOG error handling strategy.
2627
--context string CLI context name.
2728
--environment string Environment ID.
2829
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")
30+
--record-failure-strategy string DEPRECATED: Specify the record failure strategy, one of SUSPEND or SKIP.
2931

3032
Global Flags:
3133
-h, --help Show help for this command.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
+-------------------------+--------------------------------------------------------------------------+
2+
| Kafka Cluster | lkc-123456 |
3+
| Topic Name | topic-managed |
4+
| Enable Compaction | true |
5+
| Enable Partitioning | false |
6+
| Environment | |
7+
| Record Failure Strategy | |
8+
| Error Handling | LOG |
9+
| Log Target | log_topic |
10+
| Retention Ms | 604800000 |
11+
| Storage Type | MANAGED |
12+
| Suspended | false |
13+
| Table Formats | ICEBERG |
14+
| Table Path | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId |
15+
| Phase | RUNNING |
16+
| Write Mode | APPEND |
17+
+-------------------------+--------------------------------------------------------------------------+
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
+-------------------------+--------------------------------------------------------------------------+
2+
| Kafka Cluster | lkc-123456 |
3+
| Topic Name | topic-managed |
4+
| Enable Compaction | true |
5+
| Enable Partitioning | false |
6+
| Environment | |
7+
| Record Failure Strategy | |
8+
| Error Handling | SKIP |
9+
| Retention Ms | 604800000 |
10+
| Storage Type | MANAGED |
11+
| Suspended | false |
12+
| Table Formats | ICEBERG |
13+
| Table Path | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId |
14+
| Phase | RUNNING |
15+
| Write Mode | APPEND |
16+
+-------------------------+--------------------------------------------------------------------------+
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
+-------------------------+--------------------------------------------------------------------------+
2+
| Kafka Cluster | lkc-123456 |
3+
| Topic Name | topic-managed |
4+
| Enable Compaction | true |
5+
| Enable Partitioning | false |
6+
| Environment | |
7+
| Record Failure Strategy | |
8+
| Error Handling | SUSPEND |
9+
| Retention Ms | 604800000 |
10+
| Storage Type | MANAGED |
11+
| Suspended | false |
12+
| Table Formats | ICEBERG |
13+
| Table Path | s3://dummy-bucket-name-1//10011010/11101100/org-1/env-2/lkc-3/v1/tableId |
14+
| Phase | RUNNING |
15+
| Write Mode | APPEND |
16+
+-------------------------+--------------------------------------------------------------------------+

0 commit comments

Comments
 (0)