Skip to content

Commit 69ab44c

Browse files
committed
changefeedccl: support camelCase parameter names for Azure Event Hub sink configuration
Added support for camelCase parameter names (e.g., "SharedAccessKeyName") in Azure Event Hub Kafka sink configuration while maintaining backward compatibility with the existing snake_case format (e.g., "shared_access_key_name"). This change makes the parameter naming more flexible and aligned with Azure's documentation conventions. Fixes: #123494 Epic: https://cockroachlabs.atlassian.net/browse/CRDB-38378 Release note (sql change): Added support for camelCase parameter names (e.g., "SharedAccessKeyName") in Azure Event Hub Kafka sink configuration
1 parent afef2d2 commit 69ab44c

File tree

6 files changed

+45
-5
lines changed

6 files changed

+45
-5
lines changed

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ func init() {
116116
changefeedbase.SinkParamClientCert,
117117
changefeedbase.SinkParamConfluentAPISecret,
118118
changefeedbase.SinkParamAzureAccessKey,
119+
changefeedbase.SinkParamAzureAccessKeyCamel,
119120
})
120121
if err != nil {
121122
return nil, err

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7046,6 +7046,16 @@ func TestChangefeedErrors(t *testing.T) {
70467046
`CREATE CHANGEFEED FOR foo INTO $1 WITH topic_in_value, format='experimental_avro'`,
70477047
`kafka://nope`,
70487048
)
7049+
sqlDB.ExpectErrWithTimeout(
7050+
t, `scheme azure-event-hub cannot specify both shared_access_key and SharedAccessKey`,
7051+
`CREATE CHANGEFEED FOR foo INTO $1`,
7052+
`azure-event-hub://nope:9999?SharedAccessKey=redacted&shared_access_key=redacted&shared_access_key_name=saspolicyhistory`,
7053+
)
7054+
sqlDB.ExpectErrWithTimeout(
7055+
t, `scheme azure-event-hub cannot specify both shared_access_key_name and SharedAccessKeyName`,
7056+
`CREATE CHANGEFEED FOR foo INTO $1`,
7057+
`azure-event-hub://nope:9999?SharedAccessKeyName=saspolicyhistory&shared_access_key_name=saspolicyhistory&SharedAccessKey=redacted`,
7058+
)
70497059

70507060
// Unordered flag required for some options, disallowed for others.
70517061
sqlDB.ExpectErrWithTimeout(t, `resolved timestamps cannot be guaranteed to be correct in unordered mode`, `CREATE CHANGEFEED FOR foo WITH resolved, unordered`)

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,11 @@ const (
251251
SinkParamConfluentAPIKey = `api_key`
252252
SinkParamConfluentAPISecret = `api_secret`
253253

254-
SinkSchemeAzureKafka = `azure-event-hub`
255-
SinkParamAzureAccessKeyName = `shared_access_key_name`
256-
SinkParamAzureAccessKey = `shared_access_key`
254+
SinkSchemeAzureKafka = `azure-event-hub`
255+
SinkParamAzureAccessKeyName = `shared_access_key_name`
256+
SinkParamAzureAccessKeyNameCamel = `SharedAccessKeyName`
257+
SinkParamAzureAccessKey = `shared_access_key`
258+
SinkParamAzureAccessKeyCamel = `SharedAccessKey`
257259

258260
RegistryParamCACert = `ca_cert`
259261
RegistryParamClientCert = `client_cert`

pkg/ccl/changefeedccl/sink_external_connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func init() {
129129
changefeedbase.SinkParamClientKey,
130130
changefeedbase.SinkParamConfluentAPISecret,
131131
changefeedbase.SinkParamAzureAccessKey,
132+
changefeedbase.SinkParamAzureAccessKeyCamel,
132133
))
133134
}
134135

pkg/ccl/changefeedccl/sink_kafka.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,12 +1065,33 @@ func buildAzureKafkaConfig(u *changefeedbase.SinkURL) (dialConfig kafkaDialConfi
10651065
hostName := u.Hostname()
10661066
// saslUser="$ConnectionString"
10671067
// saslPassword="Endpoint=sb://<NamespaceName>.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<KeyValue>;
1068-
sharedAccessKeyName := u.ConsumeParam(changefeedbase.SinkParamAzureAccessKeyName)
1068+
sharedAccessKeyNameSnake := u.ConsumeParam(changefeedbase.SinkParamAzureAccessKeyName)
1069+
sharedAccessKeyNameCamel := u.ConsumeParam(changefeedbase.SinkParamAzureAccessKeyNameCamel)
1070+
if sharedAccessKeyNameSnake != `` && sharedAccessKeyNameCamel != `` {
1071+
return kafkaDialConfig{}, errors.Newf(`scheme %s cannot specify both %s and %s`, u.Scheme, changefeedbase.SinkParamAzureAccessKeyName, changefeedbase.SinkParamAzureAccessKeyNameCamel)
1072+
}
1073+
1074+
sharedAccessKeyName := sharedAccessKeyNameSnake
1075+
if sharedAccessKeyName == `` {
1076+
sharedAccessKeyName = sharedAccessKeyNameCamel
1077+
}
1078+
10691079
if sharedAccessKeyName == `` {
10701080
return kafkaDialConfig{},
10711081
newMissingParameterError(u.Scheme /*scheme*/, changefeedbase.SinkParamAzureAccessKeyName /*param*/)
10721082
}
1073-
sharedAccessKey := u.ConsumeParam(changefeedbase.SinkParamAzureAccessKey)
1083+
1084+
sharedAccessKeySnake := u.ConsumeParam(changefeedbase.SinkParamAzureAccessKey)
1085+
sharedAccessKeyCamel := u.ConsumeParam(changefeedbase.SinkParamAzureAccessKeyCamel)
1086+
if sharedAccessKeySnake != `` && sharedAccessKeyCamel != `` {
1087+
return kafkaDialConfig{}, errors.Newf(`scheme %s cannot specify both %s and %s`, u.Scheme, changefeedbase.SinkParamAzureAccessKey, changefeedbase.SinkParamAzureAccessKeyCamel)
1088+
}
1089+
1090+
sharedAccessKey := sharedAccessKeySnake
1091+
if sharedAccessKey == `` {
1092+
sharedAccessKey = sharedAccessKeyCamel
1093+
}
1094+
10741095
if sharedAccessKey == `` {
10751096
return kafkaDialConfig{},
10761097
newMissingParameterError(u.Scheme /*scheme*/, changefeedbase.SinkParamAzureAccessKey /*param*/)

pkg/ccl/changefeedccl/sink_kafka_connection_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,11 @@ func TestAzureKafkaDefaults(t *testing.T) {
471471
uri: "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicyhistory&shared_access_key=q%2BSecretRedacted%3D",
472472
expected: makeExpectation("myeventhubs.servicebus.windows.net", "saspolicyhistory", "q+SecretRedacted="),
473473
},
474+
{
475+
name: "test camel case to snake case param names fallback",
476+
uri: "azure-event-hub://myeventhubs.servicebus.windows.net:9093?SharedAccessKeyName=saspolicyhistory&SharedAccessKey=q%2BSecretRedacted%3D",
477+
expected: makeExpectation("myeventhubs.servicebus.windows.net", "saspolicyhistory", "q+SecretRedacted="),
478+
},
474479
}
475480
t.Run("sarama", func(t *testing.T) {
476481
for _, tc := range cases {

0 commit comments

Comments
 (0)