@@ -24,7 +24,6 @@ import (
24
24
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
25
25
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
26
26
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
27
- "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
28
27
"github.com/cockroachdb/cockroach/pkg/testutils"
29
28
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
30
29
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -166,69 +165,63 @@ func TestShowChangefeedJobsRedacted(t *testing.T) {
166
165
defer leaktest .AfterTest (t )()
167
166
defer log .Scope (t ).Close (t )
168
167
169
- s , stopServer := makeServer (t )
170
- defer stopServer ()
168
+ testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
169
+ sqlDB := sqlutils .MakeSQLRunner (s .DB )
170
+ sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)` )
171
171
172
- knobs := s .TestingKnobs .
173
- DistSQL .(* execinfra.TestingKnobs ).
174
- Changefeed .(* TestingKnobs )
175
- knobs .WrapSink = func (s Sink , _ jobspb.JobID ) Sink {
176
- if _ , ok := s .(* externalConnectionKafkaSink ); ok {
177
- return s
172
+ const apiSecret = "bar"
173
+ const certSecret = "Zm9v"
174
+ for _ , tc := range []struct {
175
+ name string
176
+ uri string
177
+ expectedSinkURI string
178
+ expectedDescription string
179
+ }{
180
+ {
181
+ name : "api_secret" ,
182
+ uri : fmt .Sprintf ("confluent-cloud://nope?api_key=fee&api_secret=%s" , apiSecret ),
183
+ },
184
+ {
185
+ name : "sasl_password" ,
186
+ uri : fmt .Sprintf ("kafka://nope/?sasl_enabled=true&sasl_handshake=false&sasl_password=%s&sasl_user=aa" , apiSecret ),
187
+ },
188
+ {
189
+ name : "ca_cert" ,
190
+ uri : fmt .Sprintf ("kafka://nope?ca_cert=%s&tls_enabled=true" , certSecret ),
191
+ },
192
+ {
193
+ name : "shared_access_key" ,
194
+ uri : fmt .Sprintf ("azure-event-hub://nope?shared_access_key=%s&shared_access_key_name=plain" , apiSecret ),
195
+ },
196
+ } {
197
+ t .Run (tc .name , func (t * testing.T ) {
198
+ foo := feed (t , f , fmt .Sprintf (`CREATE CHANGEFEED FOR TABLE foo INTO '%s'` , tc .uri ),
199
+ optOutOfMetamorphicEnrichedEnvelope {reason : "compares text of changefeed statement" })
200
+ defer closeFeed (t , foo )
201
+
202
+ efoo , ok := foo .(cdctest.EnterpriseTestFeed )
203
+ require .True (t , ok )
204
+ jobID := efoo .JobID ()
205
+
206
+ var sinkURI , description string
207
+ sqlDB .QueryRow (t , "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]" , jobID ).Scan (& sinkURI , & description )
208
+ replacer := strings .NewReplacer (apiSecret , "redacted" , certSecret , "redacted" )
209
+ expectedSinkURI := replacer .Replace (tc .uri )
210
+ expectedDescription := replacer .Replace (fmt .Sprintf (`CREATE CHANGEFEED FOR TABLE foo INTO '%s'` , tc .uri ))
211
+ require .Equal (t , expectedSinkURI , sinkURI )
212
+ require .Equal (t , expectedDescription , description )
213
+ })
178
214
}
179
- return & externalConnectionKafkaSink {sink : s , ignoreDialError : true }
180
- }
181
-
182
- sqlDB := sqlutils .MakeSQLRunner (s .DB )
183
- sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)` )
184
-
185
- const apiSecret = "bar"
186
- const certSecret = "Zm9v"
187
- for _ , tc := range []struct {
188
- name string
189
- uri string
190
- expectedSinkURI string
191
- expectedDescription string
192
- }{
193
- {
194
- name : "api_secret" ,
195
- uri : fmt .Sprintf ("confluent-cloud://nope?api_key=fee&api_secret=%s" , apiSecret ),
196
- },
197
- {
198
- name : "sasl_password" ,
199
- uri : fmt .Sprintf ("kafka://nope/?sasl_enabled=true&sasl_handshake=false&sasl_password=%s&sasl_user=aa" , apiSecret ),
200
- },
201
- {
202
- name : "ca_cert" ,
203
- uri : fmt .Sprintf ("kafka://nope?ca_cert=%s&tls_enabled=true" , certSecret ),
204
- },
205
- {
206
- name : "shared_access_key" ,
207
- uri : fmt .Sprintf ("azure-event-hub://nope?shared_access_key=%s&shared_access_key_name=plain" , apiSecret ),
208
- },
209
- } {
210
- t .Run (tc .name , func (t * testing.T ) {
211
- createStmt := fmt .Sprintf (`CREATE CHANGEFEED FOR TABLE foo INTO '%s'` , tc .uri )
212
- var jobID jobspb.JobID
213
- sqlDB .QueryRow (t , createStmt ).Scan (& jobID )
214
- var sinkURI , description string
215
- sqlDB .QueryRow (t , "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]" , jobID ).Scan (& sinkURI , & description )
216
- replacer := strings .NewReplacer (apiSecret , "redacted" , certSecret , "redacted" )
217
- expectedSinkURI := replacer .Replace (tc .uri )
218
- expectedDescription := replacer .Replace (createStmt )
219
- require .Equal (t , expectedSinkURI , sinkURI )
220
- require .Equal (t , expectedDescription , description )
215
+ t .Run ("jobs" , func (t * testing.T ) {
216
+ queryStr := sqlDB .QueryStr (t , "SELECT description from [SHOW JOBS]" )
217
+ require .NotContains (t , queryStr , apiSecret )
218
+ require .NotContains (t , queryStr , certSecret )
219
+ queryStr = sqlDB .QueryStr (t , "SELECT sink_uri, description from [SHOW CHANGEFEED JOBS]" )
220
+ require .NotContains (t , queryStr , apiSecret )
221
+ require .NotContains (t , queryStr , certSecret )
221
222
})
222
223
}
223
-
224
- t .Run ("jobs" , func (t * testing.T ) {
225
- queryStr := sqlDB .QueryStr (t , "SELECT description from [SHOW JOBS]" )
226
- require .NotContains (t , queryStr , apiSecret )
227
- require .NotContains (t , queryStr , certSecret )
228
- queryStr = sqlDB .QueryStr (t , "SELECT sink_uri, description from [SHOW CHANGEFEED JOBS]" )
229
- require .NotContains (t , queryStr , apiSecret )
230
- require .NotContains (t , queryStr , certSecret )
231
- })
224
+ cdcTest (t , testFn , feedTestForceSink ("kafka" ), feedTestNoExternalConnection )
232
225
}
233
226
234
227
func TestShowChangefeedJobs (t * testing.T ) {
0 commit comments