@@ -54,6 +54,7 @@ import (
54
54
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
55
55
"github.com/cockroachdb/cockroach/pkg/util/hlc"
56
56
"github.com/cockroachdb/cockroach/pkg/util/log"
57
+ "github.com/cockroachdb/cockroach/pkg/util/log/channel"
57
58
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
58
59
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
59
60
"github.com/cockroachdb/cockroach/pkg/util/randutil"
@@ -78,8 +79,8 @@ func init() {
78
79
sql .AddPlanHook ("changefeed" , changefeedPlanHook , changefeedTypeCheck )
79
80
jobs .RegisterConstructor (
80
81
jobspb .TypeChangefeed ,
81
- func (job * jobs.Job , _ * cluster.Settings ) jobs.Resumer {
82
- r := & changefeedResumer {job : job }
82
+ func (job * jobs.Job , s * cluster.Settings ) jobs.Resumer {
83
+ r := & changefeedResumer {job : job , sv : & s . SV }
83
84
r .mu .perNodeAggregatorStats = make (bulk.ComponentAggregatorStats )
84
85
return r
85
86
},
@@ -282,7 +283,8 @@ func changefeedPlanHook(
282
283
p .ExtendedEvalContext ().Descs .ReleaseAll (ctx )
283
284
284
285
telemetry .Count (`changefeed.create.core` )
285
- logCreateChangefeedTelemetry (ctx , jr , changefeedStmt .Select != nil , targets .Size )
286
+ shouldMigrate := log .ShouldMigrateEvent (p .ExecCfg ().SV ())
287
+ logCreateChangefeedTelemetry (ctx , jr , changefeedStmt .Select != nil , targets .Size , shouldMigrate )
286
288
if err := maybeShowCursorAgeWarning (ctx , p , opts ); err != nil {
287
289
return err
288
290
}
@@ -384,8 +386,8 @@ func changefeedPlanHook(
384
386
if err := maybeShowCursorAgeWarning (ctx , p , opts ); err != nil {
385
387
return err
386
388
}
387
-
388
- logCreateChangefeedTelemetry (ctx , jr , changefeedStmt .Select != nil , targets .Size )
389
+ shouldMigrate := log . ShouldMigrateEvent ( p . ExecCfg (). SV ())
390
+ logCreateChangefeedTelemetry (ctx , jr , changefeedStmt .Select != nil , targets .Size , shouldMigrate )
389
391
390
392
select {
391
393
case <- ctx .Done ():
@@ -400,7 +402,9 @@ func changefeedPlanHook(
400
402
rowFnLogErrors := func (ctx context.Context , resultsCh chan <- tree.Datums ) error {
401
403
err := rowFn (ctx , resultsCh )
402
404
if err != nil {
403
- logChangefeedFailedTelemetryDuringStartup (ctx , description , failureTypeForStartupError (err ))
405
+
406
+ shouldMigrate := log .ShouldMigrateEvent (p .ExecCfg ().SV ())
407
+ logChangefeedFailedTelemetryDuringStartup (ctx , description , failureTypeForStartupError (err ), shouldMigrate )
404
408
var e * kvpb.BatchTimestampBeforeGCError
405
409
if errors .As (err , & e ) && opts .HasStartCursor () {
406
410
err = errors .Wrapf (err ,
@@ -1320,6 +1324,7 @@ func validateAndNormalizeChangefeedExpression(
1320
1324
1321
1325
type changefeedResumer struct {
1322
1326
job * jobs.Job
1327
+ sv * settings.Values
1323
1328
1324
1329
mu struct {
1325
1330
syncutil.Mutex
@@ -1792,17 +1797,17 @@ func (b *changefeedResumer) OnFailOrCancel(
1792
1797
}
1793
1798
numTargets = targets .Size
1794
1799
}
1795
-
1800
+ shouldMigrate := log . ShouldMigrateEvent ( b . sv )
1796
1801
// If this job has failed (not canceled), increment the counter.
1797
1802
if jobs .HasErrJobCanceled (
1798
1803
errors .DecodeError (ctx , * b .job .Payload ().FinalResumeError ),
1799
1804
) {
1800
1805
telemetry .Count (`changefeed.enterprise.cancel` )
1801
- logChangefeedCanceledTelemetry (ctx , b .job , numTargets )
1806
+ logChangefeedCanceledTelemetry (ctx , b .job , numTargets , shouldMigrate )
1802
1807
} else {
1803
1808
telemetry .Count (`changefeed.enterprise.fail` )
1804
1809
exec .ExecCfg ().JobRegistry .MetricsStruct ().Changefeed .(* Metrics ).Failures .Inc (1 )
1805
- logChangefeedFailedTelemetry (ctx , b .job , changefeedbase .UnknownError , numTargets )
1810
+ logChangefeedFailedTelemetry (ctx , b .job , changefeedbase .UnknownError , numTargets , shouldMigrate )
1806
1811
}
1807
1812
return nil
1808
1813
}
@@ -1890,7 +1895,7 @@ func getChangefeedTargetName(
1890
1895
}
1891
1896
1892
1897
func logCreateChangefeedTelemetry (
1893
- ctx context.Context , jr * jobs.Record , isTransformation bool , numTargets uint ,
1898
+ ctx context.Context , jr * jobs.Record , isTransformation bool , numTargets uint , migrateEvent bool ,
1894
1899
) {
1895
1900
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
1896
1901
if jr != nil {
@@ -1903,11 +1908,11 @@ func logCreateChangefeedTelemetry(
1903
1908
Transformation : isTransformation ,
1904
1909
}
1905
1910
1906
- log .StructuredEvent (ctx , severity .INFO , createChangefeedEvent )
1911
+ getChangefeedEventMigrator ( migrateEvent ) .StructuredEvent (ctx , severity .INFO , createChangefeedEvent )
1907
1912
}
1908
1913
1909
1914
func logAlterChangefeedTelemetry (
1910
- ctx context.Context , job * jobs.Job , prevDescription string , numTargets uint ,
1915
+ ctx context.Context , job * jobs.Job , prevDescription string , numTargets uint , migrateEvent bool ,
1911
1916
) {
1912
1917
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
1913
1918
if job != nil {
@@ -1921,11 +1926,15 @@ func logAlterChangefeedTelemetry(
1921
1926
PreviousDescription : prevDescription ,
1922
1927
}
1923
1928
1924
- log .StructuredEvent (ctx , severity .INFO , alterChangefeedEvent )
1929
+ getChangefeedEventMigrator ( migrateEvent ) .StructuredEvent (ctx , severity .INFO , alterChangefeedEvent )
1925
1930
}
1926
1931
1927
1932
func logChangefeedFailedTelemetry (
1928
- ctx context.Context , job * jobs.Job , failureType changefeedbase.FailureType , numTargets uint ,
1933
+ ctx context.Context ,
1934
+ job * jobs.Job ,
1935
+ failureType changefeedbase.FailureType ,
1936
+ numTargets uint ,
1937
+ migrateEvent bool ,
1929
1938
) {
1930
1939
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
1931
1940
if job != nil {
@@ -1938,21 +1947,26 @@ func logChangefeedFailedTelemetry(
1938
1947
FailureType : failureType ,
1939
1948
}
1940
1949
1941
- log .StructuredEvent (ctx , severity .INFO , changefeedFailedEvent )
1950
+ getChangefeedEventMigrator ( migrateEvent ) .StructuredEvent (ctx , severity .INFO , changefeedFailedEvent )
1942
1951
}
1943
1952
1944
1953
func logChangefeedFailedTelemetryDuringStartup (
1945
- ctx context.Context , description string , failureType changefeedbase.FailureType ,
1954
+ ctx context.Context ,
1955
+ description string ,
1956
+ failureType changefeedbase.FailureType ,
1957
+ migrateEvent bool ,
1946
1958
) {
1947
1959
changefeedFailedEvent := & eventpb.ChangefeedFailed {
1948
1960
CommonChangefeedEventDetails : eventpb.CommonChangefeedEventDetails {Description : description },
1949
1961
FailureType : failureType ,
1950
1962
}
1951
1963
1952
- log .StructuredEvent (ctx , severity .INFO , changefeedFailedEvent )
1964
+ getChangefeedEventMigrator ( migrateEvent ) .StructuredEvent (ctx , severity .INFO , changefeedFailedEvent )
1953
1965
}
1954
1966
1955
- func logChangefeedCanceledTelemetry (ctx context.Context , job * jobs.Job , numTargets uint ) {
1967
+ func logChangefeedCanceledTelemetry (
1968
+ ctx context.Context , job * jobs.Job , numTargets uint , migrateEvent bool ,
1969
+ ) {
1956
1970
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
1957
1971
if job != nil {
1958
1972
changefeedDetails := job .Details ().(jobspb.ChangefeedDetails )
@@ -1962,7 +1976,7 @@ func logChangefeedCanceledTelemetry(ctx context.Context, job *jobs.Job, numTarge
1962
1976
changefeedCanceled := & eventpb.ChangefeedCanceled {
1963
1977
CommonChangefeedEventDetails : changefeedEventDetails ,
1964
1978
}
1965
- log .StructuredEvent (ctx , severity .INFO , changefeedCanceled )
1979
+ getChangefeedEventMigrator ( migrateEvent ) .StructuredEvent (ctx , severity .INFO , changefeedCanceled )
1966
1980
}
1967
1981
1968
1982
func makeCommonChangefeedEventDetails (
@@ -2101,3 +2115,12 @@ func maybeUpgradePreProductionReadyExpression(
2101
2115
"Please see CDC documentation on the use of new cdc_prev tuple." ,
2102
2116
tree .AsString (oldExpression ), tree .AsString (newExpression ))
2103
2117
}
2118
+
2119
+ func getChangefeedEventMigrator (migrateEvent bool ) log.StructuredEventMigrator {
2120
+ return log .NewStructuredEventMigrator (
2121
+ func () bool {
2122
+ return migrateEvent
2123
+ },
2124
+ channel .CHANGEFEED ,
2125
+ )
2126
+ }
0 commit comments