Skip to content

Commit a428c45

Browse files
committed
changefeedccl: prepend file- to cloudstorage http(s) schemes
Changefeeds previously used http as their URI to align with backups. Backups no longer support http, and for changefeeds http is ambiguous, leading users to create cloudstorage sinks when they meant webhook sinks. This commit changes the scheme to file-http. Webhooks still use webhook-http. Informs: cockroachdb#98719 Release note (enterprise change): Changefeeds that create files over an http connection may now be specified via `INTO 'file-https://'` to disambiguate with `webhook-https`.
1 parent 922d6e6 commit a428c45

File tree

7 files changed

+29
-9
lines changed

7 files changed

+29
-9
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -909,8 +909,18 @@ func validateSink(
909909
return err
910910
}
911911

912-
if u.Scheme == changefeedbase.SinkSchemeCloudStorageHTTP || u.Scheme == changefeedbase.SinkSchemeCloudStorageHTTPS {
913-
p.BufferClientNotice(ctx, pgnotice.Newf(`%s sinks will emit using cloud storage semantics. For a webhook sink, prepend webhook- to the sink URI.`))
912+
ambiguousSchemes := map[string][2]string{
913+
changefeedbase.DeprecatedSinkSchemeHTTP: {changefeedbase.SinkSchemeCloudStorageHTTP, changefeedbase.SinkSchemeWebhookHTTP},
914+
changefeedbase.DeprecatedSinkSchemeHTTPS: {changefeedbase.SinkSchemeCloudStorageHTTPS, changefeedbase.SinkSchemeWebhookHTTPS},
915+
}
916+
917+
if disambiguations, isAmbiguous := ambiguousSchemes[u.Scheme]; isAmbiguous {
918+
p.BufferClientNotice(ctx, pgnotice.Newf(
919+
`Interpreting deprecated URI scheme %s as %s. For webhook semantics, use %s.`,
920+
u.Scheme,
921+
disambiguations[0],
922+
disambiguations[1],
923+
))
914924
}
915925

916926
var nilOracle timestampLowerBoundOracle

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4821,7 +4821,7 @@ func TestChangefeedErrors(t *testing.T) {
48214821
`CREATE CHANGEFEED FOR foo INTO $1`, `webhook-https://fake-host?ca_cert=Zm9v`,
48224822
)
48234823
sqlDB.ExpectErr(
4824-
t, `sink requires https`,
4824+
t, `sink requires webhook-https`,
48254825
`CREATE CHANGEFEED FOR foo INTO $1`, `webhook-http://fake-host`,
48264826
)
48274827
sqlDB.ExpectErr(

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,11 @@ const (
162162
DeprecatedSinkSchemeCloudStorageNodelocal = `experimental-nodelocal`
163163
DeprecatedSinkSchemeCloudStorageS3 = `experimental-s3`
164164

165+
// DeprecatedSinkSchemeHTTP is interpreted as cloudstorage over HTTP PUT.
166+
DeprecatedSinkSchemeHTTP = `http`
167+
// DeprecatedSinkSchemeHTTPS is interpreted as cloudstorage over HTTPS PUT.
168+
DeprecatedSinkSchemeHTTPS = `https`
169+
165170
// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
166171
OptKafkaSinkConfig = `kafka_sink_config`
167172
OptPubsubSinkConfig = `pubsub_sink_config`
@@ -186,13 +191,11 @@ const (
186191
SinkParamTopicName = `topic_name`
187192
SinkSchemeCloudStorageAzure = `azure`
188193
SinkSchemeCloudStorageGCS = `gs`
189-
SinkSchemeCloudStorageHTTP = `http`
190-
SinkSchemeCloudStorageHTTPS = `https`
194+
SinkSchemeCloudStorageHTTP = `file-http`
195+
SinkSchemeCloudStorageHTTPS = `file-https`
191196
SinkSchemeCloudStorageNodelocal = `nodelocal`
192197
SinkSchemeCloudStorageS3 = `s3`
193198
SinkSchemeExperimentalSQL = `experimental-sql`
194-
SinkSchemeHTTP = `http`
195-
SinkSchemeHTTPS = `https`
196199
SinkSchemeKafka = `kafka`
197200
SinkSchemeNull = `null`
198201
SinkSchemeWebhookHTTP = `webhook-http`

pkg/ccl/changefeedccl/sink_cloudstorage.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ func isCloudStorageSink(u *url.URL) bool {
4949
changefeedbase.SinkSchemeCloudStorageNodelocal, changefeedbase.SinkSchemeCloudStorageHTTP,
5050
changefeedbase.SinkSchemeCloudStorageHTTPS, changefeedbase.SinkSchemeCloudStorageAzure:
5151
return true
52+
// During the deprecation period, we need to keep parsing these as cloudstorage for backwards
53+
// compatibility. Afterwards we'll either remove them or move them to webhook.
54+
case changefeedbase.DeprecatedSinkSchemeHTTP, changefeedbase.DeprecatedSinkSchemeHTTPS:
55+
return true
5256
default:
5357
return false
5458
}
@@ -375,6 +379,7 @@ func makeCloudStorageSink(
375379
}
376380
}
377381
u.Scheme = strings.TrimPrefix(u.Scheme, `experimental-`)
382+
u.Scheme = strings.TrimPrefix(u.Scheme, `file-`)
378383

379384
sinkID := atomic.AddInt64(&cloudStorageSinkIDAtomic, 1)
380385
sessID, err := generateChangefeedSessionID()

pkg/ccl/changefeedccl/sink_external_connection.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ var supportedExternalConnectionTypes = map[string]connectionpb.ConnectionProvide
9292
GcpScheme: connectionpb.ConnectionProvider_gcpubsub,
9393
changefeedbase.SinkSchemeCloudStorageHTTP: connectionpb.ConnectionProvider_http,
9494
changefeedbase.SinkSchemeCloudStorageHTTPS: connectionpb.ConnectionProvider_https,
95+
changefeedbase.DeprecatedSinkSchemeHTTP: connectionpb.ConnectionProvider_http,
96+
changefeedbase.DeprecatedSinkSchemeHTTPS: connectionpb.ConnectionProvider_https,
9597
changefeedbase.SinkSchemeCloudStorageNodelocal: connectionpb.ConnectionProvider_nodelocal,
9698
changefeedbase.SinkSchemeCloudStorageS3: connectionpb.ConnectionProvider_s3,
9799
changefeedbase.SinkSchemeKafka: connectionpb.ConnectionProvider_kafka,

pkg/ccl/changefeedccl/sink_webhook.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func makeDeprecatedWebhookSink(
233233
mb metricsRecorderBuilder,
234234
) (Sink, error) {
235235
if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
236-
return nil, errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
236+
return nil, errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeWebhookHTTPS)
237237
}
238238
u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`)
239239

pkg/ccl/changefeedccl/sink_webhook_v2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func validateWebhookOpts(
229229
u sinkURL, encodingOpts changefeedbase.EncodingOptions, opts changefeedbase.WebhookSinkOptions,
230230
) error {
231231
if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
232-
return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
232+
return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeWebhookHTTPS)
233233
}
234234

235235
switch encodingOpts.Format {

0 commit comments

Comments
 (0)