Skip to content

Commit 77bb5da

Browse files
craig[bot] and KeithCh committed
Merge #144161
144161: changefeedccl: deflake TestChangefeedContinuousTelemetryOnTermination r=andyyang890 a=KeithCh There was an attempt to deflake this test in #135961, but we were still seeing occasional failures. This change expands on that attempt. The main cause is a transient KV server error that occurs when we cancel a changefeed. The error causes a new changefeed aggregator to be started, which in turn creates a new telemetryMetricsRecorder instance. Since the lastEmitTime field in telemetryMetricsRecorder is initialized to 0, the new instance emits a changefeed_emitted_bytes event at the very next maybeFlushLogs() call. A counter is added to detect when this transient error occurs and to prevent the second logger from emitting any logs. Resolves #143700 Release note: None Co-authored-by: Keith Chow <[email protected]>
2 parents 6321d87 + 723718a commit 77bb5da

File tree

1 file changed

+23
-1
lines changed

1 file changed

+23
-1
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7553,6 +7553,7 @@ func TestChangefeedContinuousTelemetry(t *testing.T) {
75537553

75547554
type testTelemetryLogger struct {
75557555
telemetryLogger
7556+
id int32
75567557
afterIncEmittedCounters func(numMessages int, numBytes int)
75577558
}
75587559

@@ -7563,6 +7564,18 @@ func (t *testTelemetryLogger) incEmittedCounters(numMessages int, numBytes int)
75637564
t.afterIncEmittedCounters(numMessages, numBytes)
75647565
}
75657566

7567+
func (t *testTelemetryLogger) maybeFlushLogs() {
7568+
if t.id == 1 {
7569+
t.telemetryLogger.maybeFlushLogs()
7570+
}
7571+
}
7572+
7573+
func (t *testTelemetryLogger) close() {
7574+
if t.id == 1 {
7575+
t.telemetryLogger.close()
7576+
}
7577+
}
7578+
75667579
func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) {
75677580
defer leaktest.AfterTest(t)()
75687581
defer log.Scope(t).Close(t)
@@ -7583,6 +7596,7 @@ func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) {
75837596
}
75847597
return nil
75857598
}
7599+
var numPeriodicTelemetryLogger atomic.Int32
75867600
// Synchronization to prevent a race between the changefeed closing
75877601
// and the telemetry logger getting emitted counts after messages
75887602
// have been emitted to the sink.
@@ -7595,6 +7609,7 @@ func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) {
75957609
seen.Store(true)
75967610
}
75977611
},
7612+
id: numPeriodicTelemetryLogger.Add(1),
75987613
}
75997614
}
76007615

@@ -7623,7 +7638,14 @@ func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) {
76237638

76247639
// Close the changefeed and ensure logs were created after closing.
76257640
require.NoError(t, foo.Close())
7626-
verifyLogsWithEmittedBytesAndMessages(t, jobID, afterFirstLog.UnixNano(), interval.Nanoseconds(), true /* closing */)
7641+
7642+
if numPeriodicTelemetryLogger.Load() > 1 {
7643+
t.Log("transient error")
7644+
}
7645+
7646+
verifyLogsWithEmittedBytesAndMessages(
7647+
t, jobID, afterFirstLog.UnixNano(), interval.Nanoseconds(), true, /* closing */
7648+
)
76277649
}
76287650

76297651
cdcTest(t, testFn)

0 commit comments

Comments
 (0)