Skip to content

Commit f9a96f4

Browse files
craig[bot]andyyang890
andcommitted
Merge #148481
148481: changefeedccl: split up TestChangefeedCursor into subtests r=aerfrei,rharding6373 a=andyyang890 This patch splits up `TestChangefeedCursor` into subtests. Doing so also has the side effect of closing each changefeed before starting the next, which may reduce timeouts due to resource contention. Fixes #146179 Release note: None Co-authored-by: Andy Yang <[email protected]>
2 parents 1dcc89f + af04a3f commit f9a96f4

File tree

1 file changed

+55
-47
lines changed

1 file changed

+55
-47
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,62 +1029,70 @@ func TestChangefeedCursor(t *testing.T) {
10291029
require.True(t, beforeInsert.Less(insertTimestamp) && insertTimestamp.Less(tsLogical) && tsLogical.Less(s.Server.Clock().Now()),
10301030
fmt.Sprintf("beforeInsert: %s, insertTimestamp: %s, tsLogical: %s", beforeInsert, insertTimestamp, tsLogical))
10311031

1032-
// The below function is currently used to test negative timestamp in cursor i.e of the form
1033-
// "-3us".
1034-
// Using this function we can calculate the difference with the time that was before
1035-
// the insert statement, which is set as the new cursor value inside createChangefeedJobRecord
1036-
calculateCursor := func(currentTime *hlc.Timestamp) string {
1037-
// Should convert to microseconds as that is the maximum precision we support
1038-
diff := (beforeInsert.WallTime - currentTime.WallTime) / 1000
1039-
diffStr := strconv.FormatInt(diff, 10) + "us"
1040-
return diffStr
1041-
}
1042-
1043-
knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
1044-
knobs.OverrideCursor = calculateCursor
1032+
t.Run("negative cursor", func(t *testing.T) {
1033+
// The below function is currently used to test negative timestamp in cursor i.e of the form
1034+
// "-3us".
1035+
// Using this function we can calculate the difference with the time that was before
1036+
// the insert statement, which is set as the new cursor value inside createChangefeedJobRecord
1037+
calculateCursor := func(currentTime *hlc.Timestamp) string {
1038+
// Should convert to microseconds as that is the maximum precision we support
1039+
diff := (beforeInsert.WallTime - currentTime.WallTime) / 1000
1040+
diffStr := strconv.FormatInt(diff, 10) + "us"
1041+
return diffStr
1042+
}
1043+
1044+
knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
1045+
knobs.OverrideCursor = calculateCursor
1046+
1047+
// The "-3 days" is a placeholder here - it will be replaced with actual difference
1048+
// in createChangefeedJobRecord
1049+
fooInterval := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, "-3 days")
1050+
defer closeFeed(t, fooInterval)
1051+
assertPayloads(t, fooInterval, []string{
1052+
`foo: [1]->{"after": {"a": 1, "b": "before"}}`,
1053+
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
1054+
})
10451055

1046-
// The "-3 days" is a placeholder here - it will be replaced with actual difference
1047-
// in createChangefeedJobRecord
1048-
fooInterval := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, "-3 days")
1049-
defer closeFeed(t, fooInterval)
1050-
assertPayloads(t, fooInterval, []string{
1051-
`foo: [1]->{"after": {"a": 1, "b": "before"}}`,
1052-
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
1056+
// We do not need to override for the remaining cases
1057+
knobs.OverrideCursor = nil
10531058
})
10541059

1055-
// We do not need to override for the remaining cases
1056-
knobs.OverrideCursor = nil
1060+
t.Run("decimal cursor", func(t *testing.T) {
1061+
fooLogical := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, eval.TimestampToDecimalDatum(tsLogical).String())
1062+
defer closeFeed(t, fooLogical)
1063+
assertPayloads(t, fooLogical, []string{
1064+
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
1065+
})
10571066

1058-
fooLogical := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, eval.TimestampToDecimalDatum(tsLogical).String())
1059-
defer closeFeed(t, fooLogical)
1060-
assertPayloads(t, fooLogical, []string{
1061-
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
1067+
// Check that the cursor is properly hooked up to the job statement
1068+
// time. The sinkless tests currently don't have a way to get the
1069+
// statement timestamp, so only verify this for enterprise.
1070+
if e, ok := fooLogical.(cdctest.EnterpriseTestFeed); ok {
1071+
var bytes []byte
1072+
sqlDB.QueryRow(t, jobutils.JobPayloadByIDQuery, e.JobID()).Scan(&bytes)
1073+
var payload jobspb.Payload
1074+
require.NoError(t, protoutil.Unmarshal(bytes, &payload))
1075+
require.Equal(t, tsLogical, payload.GetChangefeed().StatementTime)
1076+
}
10621077
})
10631078

1064-
nanosStr := strconv.FormatInt(tsClock.UnixNano(), 10)
1065-
fooNanosStr := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, nanosStr)
1066-
defer closeFeed(t, fooNanosStr)
1067-
assertPayloads(t, fooNanosStr, []string{
1068-
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
1079+
t.Run("nanos cursor", func(t *testing.T) {
1080+
nanosStr := strconv.FormatInt(tsClock.UnixNano(), 10)
1081+
fooNanosStr := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, nanosStr)
1082+
defer closeFeed(t, fooNanosStr)
1083+
assertPayloads(t, fooNanosStr, []string{
1084+
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
1085+
})
10691086
})
10701087

1071-
timeStr := tsClock.Format(`2006-01-02 15:04:05.999999`)
1072-
fooString := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, timeStr)
1073-
defer closeFeed(t, fooString)
1074-
assertPayloads(t, fooString, []string{
1075-
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
1088+
t.Run("datetime cursor", func(t *testing.T) {
1089+
timeStr := tsClock.Format(`2006-01-02 15:04:05.999999`)
1090+
fooString := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, timeStr)
1091+
defer closeFeed(t, fooString)
1092+
assertPayloads(t, fooString, []string{
1093+
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
1094+
})
10761095
})
1077-
1078-
// Check that the cursor is properly hooked up to the job statement
1079-
// time. The sinkless tests currently don't have a way to get the
1080-
// statement timestamp, so only verify this for enterprise.
1081-
if e, ok := fooLogical.(cdctest.EnterpriseTestFeed); ok {
1082-
var bytes []byte
1083-
sqlDB.QueryRow(t, jobutils.JobPayloadByIDQuery, e.JobID()).Scan(&bytes)
1084-
var payload jobspb.Payload
1085-
require.NoError(t, protoutil.Unmarshal(bytes, &payload))
1086-
require.Equal(t, tsLogical, payload.GetChangefeed().StatementTime)
1087-
}
10881096
}
10891097

10901098
cdcTest(t, testFn)

0 commit comments

Comments
 (0)