Skip to content

Commit 267e0fe

Browse files
committed
changefeedccl: prep logging channel migration of changefeed events
In v26.1, changefeed events will be moved from the TELEMETRY logging channel to the CHANGEFEED logging channel. This commit gates the migration behind the cluster setting `log.channel_compatibility_mode.enabled`: when the setting is set to false, these events are logged to the CHANGEFEED channel. Users can set this setting to false in their clusters to validate, test, and identify potential downstream impacts on their logging setups and pipelines. Epic: CRDB-53410 Part of: CRDB-53412 Release note (ops change): Changefeed events will be moved to the CHANGEFEED channel in 26.1. To test the impact of this change, users can set the cluster setting `log.channel_compatibility_mode.enabled` to false. Note that this causes these events to start being logged to the CHANGEFEED channel, so it should not be tested in a production environment.
1 parent 2d73bfc commit 267e0fe

File tree

8 files changed

+218
-145
lines changed

8 files changed

+218
-145
lines changed

docs/generated/eventlog.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ Events in this category are logged to the `TELEMETRY` channel.
3232

3333
An event of type `alter_changefeed` is an event for any ALTER CHANGEFEED statements that are run.
3434

35+
Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
36+
To test compatibility before this, set the cluster setting
37+
`log.channel_compatibility_mode.enabled` to false. This will send the
38+
events to `CHANGEFEED` instead of `TELEMETRY`.
39+
3540

3641
| Field | Description | Sensitive |
3742
|--|--|--|
@@ -54,6 +59,11 @@ An event of type `alter_changefeed` is an event for any ALTER CHANGEFEED stateme
5459

5560
An event of type `changefeed_canceled` is an event for any changefeed cancellations.
5661

62+
Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
63+
To test compatibility before this, set the cluster setting
64+
`log.channel_compatibility_mode.enabled` to false. This will send the
65+
events to `CHANGEFEED` instead of `TELEMETRY`.
66+
5767

5868

5969

@@ -73,6 +83,11 @@ An event of type `changefeed_canceled` is an event for any changefeed cancellati
7383

7484
An event of type `changefeed_emitted_bytes` is an event representing the bytes emitted by a changefeed over an interval.
7585

86+
Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
87+
To test compatibility before this, set the cluster setting
88+
`log.channel_compatibility_mode.enabled` to false. This will send the
89+
events to `CHANGEFEED` instead of `TELEMETRY`.
90+
7691

7792
| Field | Description | Sensitive |
7893
|--|--|--|
@@ -99,6 +114,11 @@ An event of type `changefeed_emitted_bytes` is an event representing the bytes e
99114
An event of type `changefeed_failed` is an event for any changefeed failure since the plan hook
100115
was triggered.
101116

117+
Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
118+
To test compatibility before this, set the cluster setting
119+
`log.channel_compatibility_mode.enabled` to false. This will send the
120+
events to `CHANGEFEED` instead of `TELEMETRY`.
121+
102122

103123
| Field | Description | Sensitive |
104124
|--|--|--|
@@ -123,6 +143,11 @@ An event of type `create_changefeed` is an event for any CREATE CHANGEFEED query
123143
successfully starts running. Failed CREATE statements will show up as
124144
ChangefeedFailed events.
125145

146+
Note: in version 26.1, these events will be moved to the `CHANGEFEED` channel.
147+
To test compatibility before this, set the cluster setting
148+
`log.channel_compatibility_mode.enabled` to false. This will send the
149+
events to `CHANGEFEED` instead of `TELEMETRY`.
150+
126151

127152
| Field | Description | Sensitive |
128153
|--|--|--|

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ go_library(
144144
"//pkg/util/intsets",
145145
"//pkg/util/json",
146146
"//pkg/util/log",
147+
"//pkg/util/log/channel",
147148
"//pkg/util/log/eventpb",
148149
"//pkg/util/log/logcrash",
149150
"//pkg/util/log/severity",
@@ -280,6 +281,7 @@ go_test(
280281
"//pkg/server",
281282
"//pkg/server/serverpb",
282283
"//pkg/server/telemetry",
284+
"//pkg/settings",
283285
"//pkg/settings/cluster",
284286
"//pkg/spanconfig",
285287
"//pkg/spanconfig/spanconfigjob",
@@ -337,6 +339,8 @@ go_test(
337339
"//pkg/util/leaktest",
338340
"//pkg/util/log",
339341
"//pkg/util/log/eventpb",
342+
"//pkg/util/log/logpb",
343+
"//pkg/util/log/logtestutils",
340344
"//pkg/util/metamorphic",
341345
"//pkg/util/mon",
342346
"//pkg/util/parquet",

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3535
"github.com/cockroachdb/cockroach/pkg/sql/types"
3636
"github.com/cockroachdb/cockroach/pkg/util/hlc"
37+
"github.com/cockroachdb/cockroach/pkg/util/log"
3738
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3839
"github.com/cockroachdb/errors"
3940
)
@@ -250,8 +251,8 @@ func alterChangefeedPlanHook(
250251
}
251252

252253
telemetry.Count(telemetryPath)
253-
254-
logAlterChangefeedTelemetry(ctx, j, jobPayload.Description, targets.Size)
254+
shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV())
255+
logAlterChangefeedTelemetry(ctx, j, jobPayload.Description, targets.Size, shouldMigrate)
255256

256257
select {
257258
case <-ctx.Done():

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
5555
"github.com/cockroachdb/cockroach/pkg/util/hlc"
5656
"github.com/cockroachdb/cockroach/pkg/util/log"
57+
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
5758
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
5859
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
5960
"github.com/cockroachdb/cockroach/pkg/util/randutil"
@@ -78,8 +79,8 @@ func init() {
7879
sql.AddPlanHook("changefeed", changefeedPlanHook, changefeedTypeCheck)
7980
jobs.RegisterConstructor(
8081
jobspb.TypeChangefeed,
81-
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
82-
r := &changefeedResumer{job: job}
82+
func(job *jobs.Job, s *cluster.Settings) jobs.Resumer {
83+
r := &changefeedResumer{job: job, sv: &s.SV}
8384
r.mu.perNodeAggregatorStats = make(bulk.ComponentAggregatorStats)
8485
return r
8586
},
@@ -282,7 +283,8 @@ func changefeedPlanHook(
282283
p.ExtendedEvalContext().Descs.ReleaseAll(ctx)
283284

284285
telemetry.Count(`changefeed.create.core`)
285-
logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size)
286+
shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV())
287+
logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size, shouldMigrate)
286288
if err := maybeShowCursorAgeWarning(ctx, p, opts); err != nil {
287289
return err
288290
}
@@ -384,8 +386,8 @@ func changefeedPlanHook(
384386
if err := maybeShowCursorAgeWarning(ctx, p, opts); err != nil {
385387
return err
386388
}
387-
388-
logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size)
389+
shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV())
390+
logCreateChangefeedTelemetry(ctx, jr, changefeedStmt.Select != nil, targets.Size, shouldMigrate)
389391

390392
select {
391393
case <-ctx.Done():
@@ -400,7 +402,9 @@ func changefeedPlanHook(
400402
rowFnLogErrors := func(ctx context.Context, resultsCh chan<- tree.Datums) error {
401403
err := rowFn(ctx, resultsCh)
402404
if err != nil {
403-
logChangefeedFailedTelemetryDuringStartup(ctx, description, failureTypeForStartupError(err))
405+
406+
shouldMigrate := log.ShouldMigrateEvent(p.ExecCfg().SV())
407+
logChangefeedFailedTelemetryDuringStartup(ctx, description, failureTypeForStartupError(err), shouldMigrate)
404408
var e *kvpb.BatchTimestampBeforeGCError
405409
if errors.As(err, &e) && opts.HasStartCursor() {
406410
err = errors.Wrapf(err,
@@ -1320,6 +1324,7 @@ func validateAndNormalizeChangefeedExpression(
13201324

13211325
type changefeedResumer struct {
13221326
job *jobs.Job
1327+
sv *settings.Values
13231328

13241329
mu struct {
13251330
syncutil.Mutex
@@ -1792,17 +1797,17 @@ func (b *changefeedResumer) OnFailOrCancel(
17921797
}
17931798
numTargets = targets.Size
17941799
}
1795-
1800+
shouldMigrate := log.ShouldMigrateEvent(b.sv)
17961801
// If this job has failed (not canceled), increment the counter.
17971802
if jobs.HasErrJobCanceled(
17981803
errors.DecodeError(ctx, *b.job.Payload().FinalResumeError),
17991804
) {
18001805
telemetry.Count(`changefeed.enterprise.cancel`)
1801-
logChangefeedCanceledTelemetry(ctx, b.job, numTargets)
1806+
logChangefeedCanceledTelemetry(ctx, b.job, numTargets, shouldMigrate)
18021807
} else {
18031808
telemetry.Count(`changefeed.enterprise.fail`)
18041809
exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1)
1805-
logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError, numTargets)
1810+
logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError, numTargets, shouldMigrate)
18061811
}
18071812
return nil
18081813
}
@@ -1890,7 +1895,7 @@ func getChangefeedTargetName(
18901895
}
18911896

18921897
func logCreateChangefeedTelemetry(
1893-
ctx context.Context, jr *jobs.Record, isTransformation bool, numTargets uint,
1898+
ctx context.Context, jr *jobs.Record, isTransformation bool, numTargets uint, migrateEvent bool,
18941899
) {
18951900
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
18961901
if jr != nil {
@@ -1903,11 +1908,11 @@ func logCreateChangefeedTelemetry(
19031908
Transformation: isTransformation,
19041909
}
19051910

1906-
log.StructuredEvent(ctx, severity.INFO, createChangefeedEvent)
1911+
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, createChangefeedEvent)
19071912
}
19081913

19091914
func logAlterChangefeedTelemetry(
1910-
ctx context.Context, job *jobs.Job, prevDescription string, numTargets uint,
1915+
ctx context.Context, job *jobs.Job, prevDescription string, numTargets uint, migrateEvent bool,
19111916
) {
19121917
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
19131918
if job != nil {
@@ -1921,11 +1926,15 @@ func logAlterChangefeedTelemetry(
19211926
PreviousDescription: prevDescription,
19221927
}
19231928

1924-
log.StructuredEvent(ctx, severity.INFO, alterChangefeedEvent)
1929+
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, alterChangefeedEvent)
19251930
}
19261931

19271932
func logChangefeedFailedTelemetry(
1928-
ctx context.Context, job *jobs.Job, failureType changefeedbase.FailureType, numTargets uint,
1933+
ctx context.Context,
1934+
job *jobs.Job,
1935+
failureType changefeedbase.FailureType,
1936+
numTargets uint,
1937+
migrateEvent bool,
19291938
) {
19301939
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
19311940
if job != nil {
@@ -1938,21 +1947,26 @@ func logChangefeedFailedTelemetry(
19381947
FailureType: failureType,
19391948
}
19401949

1941-
log.StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
1950+
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
19421951
}
19431952

19441953
func logChangefeedFailedTelemetryDuringStartup(
1945-
ctx context.Context, description string, failureType changefeedbase.FailureType,
1954+
ctx context.Context,
1955+
description string,
1956+
failureType changefeedbase.FailureType,
1957+
migrateEvent bool,
19461958
) {
19471959
changefeedFailedEvent := &eventpb.ChangefeedFailed{
19481960
CommonChangefeedEventDetails: eventpb.CommonChangefeedEventDetails{Description: description},
19491961
FailureType: failureType,
19501962
}
19511963

1952-
log.StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
1964+
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
19531965
}
19541966

1955-
func logChangefeedCanceledTelemetry(ctx context.Context, job *jobs.Job, numTargets uint) {
1967+
func logChangefeedCanceledTelemetry(
1968+
ctx context.Context, job *jobs.Job, numTargets uint, migrateEvent bool,
1969+
) {
19561970
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
19571971
if job != nil {
19581972
changefeedDetails := job.Details().(jobspb.ChangefeedDetails)
@@ -1962,7 +1976,7 @@ func logChangefeedCanceledTelemetry(ctx context.Context, job *jobs.Job, numTarge
19621976
changefeedCanceled := &eventpb.ChangefeedCanceled{
19631977
CommonChangefeedEventDetails: changefeedEventDetails,
19641978
}
1965-
log.StructuredEvent(ctx, severity.INFO, changefeedCanceled)
1979+
getChangefeedEventMigrator(migrateEvent).StructuredEvent(ctx, severity.INFO, changefeedCanceled)
19661980
}
19671981

19681982
func makeCommonChangefeedEventDetails(
@@ -2101,3 +2115,12 @@ func maybeUpgradePreProductionReadyExpression(
21012115
"Please see CDC documentation on the use of new cdc_prev tuple.",
21022116
tree.AsString(oldExpression), tree.AsString(newExpression))
21032117
}
2118+
2119+
func getChangefeedEventMigrator(migrateEvent bool) log.StructuredEventMigrator {
2120+
return log.NewStructuredEventMigrator(
2121+
func() bool {
2122+
return migrateEvent
2123+
},
2124+
channel.CHANGEFEED,
2125+
)
2126+
}

0 commit comments

Comments
 (0)