Skip to content

Commit d36b659

Browse files
committed
changefeedccl: deflake TestShowChangefeedJobsRedacted
This test was failing when all of the subtests were passing, which points to that the test was not cleaned up correctly. This commit uses the cdctest helper, and has run succesfully over 4000 time without a flake. Fixes: #151280 Release note: none
1 parent 7d65e38 commit d36b659

File tree

1 file changed

+53
-60
lines changed

1 file changed

+53
-60
lines changed

pkg/ccl/changefeedccl/show_changefeed_jobs_test.go

Lines changed: 53 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
2525
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
2626
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
27-
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2827
"github.com/cockroachdb/cockroach/pkg/testutils"
2928
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
3029
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -166,69 +165,63 @@ func TestShowChangefeedJobsRedacted(t *testing.T) {
166165
defer leaktest.AfterTest(t)()
167166
defer log.Scope(t).Close(t)
168167

169-
s, stopServer := makeServer(t)
170-
defer stopServer()
168+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
169+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
170+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
171171

172-
knobs := s.TestingKnobs.
173-
DistSQL.(*execinfra.TestingKnobs).
174-
Changefeed.(*TestingKnobs)
175-
knobs.WrapSink = func(s Sink, _ jobspb.JobID) Sink {
176-
if _, ok := s.(*externalConnectionKafkaSink); ok {
177-
return s
172+
const apiSecret = "bar"
173+
const certSecret = "Zm9v"
174+
for _, tc := range []struct {
175+
name string
176+
uri string
177+
expectedSinkURI string
178+
expectedDescription string
179+
}{
180+
{
181+
name: "api_secret",
182+
uri: fmt.Sprintf("confluent-cloud://nope?api_key=fee&api_secret=%s", apiSecret),
183+
},
184+
{
185+
name: "sasl_password",
186+
uri: fmt.Sprintf("kafka://nope/?sasl_enabled=true&sasl_handshake=false&sasl_password=%s&sasl_user=aa", apiSecret),
187+
},
188+
{
189+
name: "ca_cert",
190+
uri: fmt.Sprintf("kafka://nope?ca_cert=%s&tls_enabled=true", certSecret),
191+
},
192+
{
193+
name: "shared_access_key",
194+
uri: fmt.Sprintf("azure-event-hub://nope?shared_access_key=%s&shared_access_key_name=plain", apiSecret),
195+
},
196+
} {
197+
t.Run(tc.name, func(t *testing.T) {
198+
foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri),
199+
optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"})
200+
defer closeFeed(t, foo)
201+
202+
efoo, ok := foo.(cdctest.EnterpriseTestFeed)
203+
require.True(t, ok)
204+
jobID := efoo.JobID()
205+
206+
var sinkURI, description string
207+
sqlDB.QueryRow(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]", jobID).Scan(&sinkURI, &description)
208+
replacer := strings.NewReplacer(apiSecret, "redacted", certSecret, "redacted")
209+
expectedSinkURI := replacer.Replace(tc.uri)
210+
expectedDescription := replacer.Replace(fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri))
211+
require.Equal(t, expectedSinkURI, sinkURI)
212+
require.Equal(t, expectedDescription, description)
213+
})
178214
}
179-
return &externalConnectionKafkaSink{sink: s, ignoreDialError: true}
180-
}
181-
182-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
183-
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
184-
185-
const apiSecret = "bar"
186-
const certSecret = "Zm9v"
187-
for _, tc := range []struct {
188-
name string
189-
uri string
190-
expectedSinkURI string
191-
expectedDescription string
192-
}{
193-
{
194-
name: "api_secret",
195-
uri: fmt.Sprintf("confluent-cloud://nope?api_key=fee&api_secret=%s", apiSecret),
196-
},
197-
{
198-
name: "sasl_password",
199-
uri: fmt.Sprintf("kafka://nope/?sasl_enabled=true&sasl_handshake=false&sasl_password=%s&sasl_user=aa", apiSecret),
200-
},
201-
{
202-
name: "ca_cert",
203-
uri: fmt.Sprintf("kafka://nope?ca_cert=%s&tls_enabled=true", certSecret),
204-
},
205-
{
206-
name: "shared_access_key",
207-
uri: fmt.Sprintf("azure-event-hub://nope?shared_access_key=%s&shared_access_key_name=plain", apiSecret),
208-
},
209-
} {
210-
t.Run(tc.name, func(t *testing.T) {
211-
createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri)
212-
var jobID jobspb.JobID
213-
sqlDB.QueryRow(t, createStmt).Scan(&jobID)
214-
var sinkURI, description string
215-
sqlDB.QueryRow(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]", jobID).Scan(&sinkURI, &description)
216-
replacer := strings.NewReplacer(apiSecret, "redacted", certSecret, "redacted")
217-
expectedSinkURI := replacer.Replace(tc.uri)
218-
expectedDescription := replacer.Replace(createStmt)
219-
require.Equal(t, expectedSinkURI, sinkURI)
220-
require.Equal(t, expectedDescription, description)
215+
t.Run("jobs", func(t *testing.T) {
216+
queryStr := sqlDB.QueryStr(t, "SELECT description from [SHOW JOBS]")
217+
require.NotContains(t, queryStr, apiSecret)
218+
require.NotContains(t, queryStr, certSecret)
219+
queryStr = sqlDB.QueryStr(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOBS]")
220+
require.NotContains(t, queryStr, apiSecret)
221+
require.NotContains(t, queryStr, certSecret)
221222
})
222223
}
223-
224-
t.Run("jobs", func(t *testing.T) {
225-
queryStr := sqlDB.QueryStr(t, "SELECT description from [SHOW JOBS]")
226-
require.NotContains(t, queryStr, apiSecret)
227-
require.NotContains(t, queryStr, certSecret)
228-
queryStr = sqlDB.QueryStr(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOBS]")
229-
require.NotContains(t, queryStr, apiSecret)
230-
require.NotContains(t, queryStr, certSecret)
231-
})
224+
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestNoExternalConnection)
232225
}
233226

234227
func TestShowChangefeedJobs(t *testing.T) {

0 commit comments

Comments
 (0)