@@ -6359,111 +6359,125 @@ func TestChangefeedMonitoring(t *testing.T) {
6359
6359
defer leaktest .AfterTest (t )()
6360
6360
defer log .Scope (t ).Close (t )
6361
6361
6362
- testFn := func (t * testing.T , s TestServerWithSystem , f cdctest.TestFeedFactory ) {
6363
- sqlDB := sqlutils .MakeSQLRunner (s .DB )
6364
- sysDB := sqlutils .MakeSQLRunner (s .SystemServer .SQLConn (t ))
6365
- sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY)` )
6366
- sqlDB .Exec (t , `INSERT INTO foo VALUES (1)` )
6367
-
6368
- if c := s .Server .MustGetSQLCounter (`changefeed.emitted_messages` ); c != 0 {
6369
- t .Errorf (`expected 0 got %d` , c )
6370
- }
6371
- if c := s .Server .MustGetSQLCounter (`changefeed.emitted_bytes` ); c != 0 {
6372
- t .Errorf (`expected 0 got %d` , c )
6373
- }
6374
- if c := s .Server .MustGetSQLCounter (`changefeed.flushed_bytes` ); c != 0 {
6375
- t .Errorf (`expected 0 got %d` , c )
6376
- }
6377
- if c := s .Server .MustGetSQLCounter (`changefeed.flushes` ); c != 0 {
6378
- t .Errorf (`expected 0 got %d` , c )
6379
- }
6380
- if c := s .Server .MustGetSQLCounter (`changefeed.max_behind_nanos` ); c != 0 {
6381
- t .Errorf (`expected %d got %d` , 0 , c )
6382
- }
6383
- if c := s .Server .MustGetSQLCounter (`changefeed.buffer_entries.in` ); c != 0 {
6384
- t .Errorf (`expected 0 got %d` , c )
6385
- }
6386
- if c := s .Server .MustGetSQLCounter (`changefeed.buffer_entries.out` ); c != 0 {
6387
- t .Errorf (`expected 0 got %d` , c )
6388
- }
6389
- if c := s .Server .MustGetSQLCounter (`changefeed.schemafeed.table_metadata_nanos` ); c != 0 {
6390
- t .Errorf (`expected 0 got %d` , c )
6391
- }
6392
- if c := s .Server .MustGetSQLCounter (`changefeed.schemafeed.table_history_scans` ); c != 0 {
6393
- t .Errorf (`expected 0 got %d` , c )
6394
- }
6395
-
6396
- foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH metrics_label='tier0'` )
6397
- _ , err := foo .Next ()
6398
- require .NoError (t , err )
6362
+ testutils .RunTrueAndFalse (t , "schema_locked" , func (t * testing.T , schemaLocked bool ) {
6363
+ testFn := func (t * testing.T , s TestServerWithSystem , f cdctest.TestFeedFactory ) {
6364
+ sqlDB := sqlutils .MakeSQLRunner (s .DB )
6365
+ sysDB := sqlutils .MakeSQLRunner (s .SystemServer .SQLConn (t ))
6366
+ sqlDB .Exec (t , fmt .Sprintf (
6367
+ `CREATE TABLE foo (a INT PRIMARY KEY) WITH (schema_locked=%t)` , schemaLocked ))
6368
+ sqlDB .Exec (t , `INSERT INTO foo VALUES (1)` )
6399
6369
6400
- testutils .SucceedsSoon (t , func () error {
6401
- if c := s .Server .MustGetSQLCounter (`changefeed.emitted_messages` ); c != 1 {
6402
- return errors .Errorf (`expected 1 got %d` , c )
6370
+ if c := s .Server .MustGetSQLCounter (`changefeed.emitted_messages` ); c != 0 {
6371
+ t .Errorf (`expected 0 got %d` , c )
6403
6372
}
6404
- if c := s .Server .MustGetSQLCounter (`changefeed.emitted_bytes` ); c != 22 {
6405
- return errors .Errorf (`expected 22 got %d` , c )
6373
+ if c := s .Server .MustGetSQLCounter (`changefeed.emitted_bytes` ); c != 0 {
6374
+ t .Errorf (`expected 0 got %d` , c )
6406
6375
}
6407
- if c := s .Server .MustGetSQLCounter (`changefeed.flushed_bytes` ); c != 22 {
6408
- return errors .Errorf (`expected 22 got %d` , c )
6376
+ if c := s .Server .MustGetSQLCounter (`changefeed.flushed_bytes` ); c != 0 {
6377
+ t .Errorf (`expected 0 got %d` , c )
6409
6378
}
6410
- if c := s .Server .MustGetSQLCounter (`changefeed.flushes` ); c < = 0 {
6411
- return errors .Errorf (`expected > 0 got %d` , c )
6379
+ if c := s .Server .MustGetSQLCounter (`changefeed.flushes` ); c ! = 0 {
6380
+ t .Errorf (`expected 0 got %d` , c )
6412
6381
}
6413
- if c := s .Server .MustGetSQLCounter (`changefeed.running ` ); c != 1 {
6414
- return errors .Errorf (`expected 1 got %d` , c )
6382
+ if c := s .Server .MustGetSQLCounter (`changefeed.max_behind_nanos ` ); c != 0 {
6383
+ t .Errorf (`expected %d got %d` , 0 , c )
6415
6384
}
6416
- if c := s .Server .MustGetSQLCounter (`changefeed.max_behind_nanos ` ); c < = 0 {
6417
- return errors .Errorf (`expected > 0 got %d` , c )
6385
+ if c := s .Server .MustGetSQLCounter (`changefeed.buffer_entries.in ` ); c ! = 0 {
6386
+ t .Errorf (`expected 0 got %d` , c )
6418
6387
}
6419
- if c := s .Server .MustGetSQLCounter (`changefeed.buffer_entries.in ` ); c < = 0 {
6420
- return errors .Errorf (`expected > 0 got %d` , c )
6388
+ if c := s .Server .MustGetSQLCounter (`changefeed.buffer_entries.out ` ); c ! = 0 {
6389
+ t .Errorf (`expected 0 got %d` , c )
6421
6390
}
6422
- if c := s .Server .MustGetSQLCounter (`changefeed.buffer_entries.out ` ); c < = 0 {
6423
- return errors .Errorf (`expected > 0 got %d` , c )
6391
+ if c := s .Server .MustGetSQLCounter (`changefeed.schemafeed.table_metadata_nanos ` ); c ! = 0 {
6392
+ t .Errorf (`expected 0 got %d` , c )
6424
6393
}
6425
- if c := s .Server .MustGetSQLCounter (`changefeed.schemafeed.table_history_scans` ); c < = 0 {
6426
- return errors .Errorf (`expected > 0 got %d` , c )
6394
+ if c := s .Server .MustGetSQLCounter (`changefeed.schemafeed.table_history_scans` ); c ! = 0 {
6395
+ t .Errorf (`expected 0 got %d` , c )
6427
6396
}
6428
- return nil
6429
- })
6430
6397
6431
- sqlDB .Exec (t , `INSERT INTO foo VALUES (2)` )
6398
+ foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH metrics_label='tier0'` )
6399
+ _ , err := foo .Next ()
6400
+ require .NoError (t , err )
6432
6401
6433
- // Check that two changefeeds add correctly.
6434
- // Set cluster settings back so we don't interfere with schema changes.
6435
- sysDB .Exec (t , `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'` )
6436
- fooCopy := feed (t , f , `CREATE CHANGEFEED FOR foo` )
6437
- _ , _ = fooCopy .Next ()
6438
- _ , _ = fooCopy .Next ()
6439
- testutils .SucceedsSoon (t , func () error {
6440
- // We can't assert exactly 4 or 88 in case we get (allowed) duplicates
6441
- // from RangeFeed.
6442
- if c := s .Server .MustGetSQLCounter (`changefeed.emitted_messages` ); c < 4 {
6443
- return errors .Errorf (`expected >= 4 got %d` , c )
6444
- }
6445
- if c := s .Server .MustGetSQLCounter (`changefeed.emitted_bytes` ); c < 88 {
6446
- return errors .Errorf (`expected >= 88 got %d` , c )
6447
- }
6448
- return nil
6449
- })
6402
+ testutils .SucceedsSoon (t , func () error {
6403
+ if c := s .Server .MustGetSQLCounter (`changefeed.emitted_messages` ); c != 1 {
6404
+ return errors .Errorf (`expected 1 got %d` , c )
6405
+ }
6406
+ if c := s .Server .MustGetSQLCounter (`changefeed.emitted_bytes` ); c != 22 {
6407
+ return errors .Errorf (`expected 22 got %d` , c )
6408
+ }
6409
+ if c := s .Server .MustGetSQLCounter (`changefeed.flushed_bytes` ); c != 22 {
6410
+ return errors .Errorf (`expected 22 got %d` , c )
6411
+ }
6412
+ if c := s .Server .MustGetSQLCounter (`changefeed.flushes` ); c <= 0 {
6413
+ return errors .Errorf (`expected > 0 got %d` , c )
6414
+ }
6415
+ if c := s .Server .MustGetSQLCounter (`changefeed.running` ); c != 1 {
6416
+ return errors .Errorf (`expected 1 got %d` , c )
6417
+ }
6418
+ if c := s .Server .MustGetSQLCounter (`changefeed.max_behind_nanos` ); c <= 0 {
6419
+ return errors .Errorf (`expected > 0 got %d` , c )
6420
+ }
6421
+ if c := s .Server .MustGetSQLCounter (`changefeed.buffer_entries.in` ); c <= 0 {
6422
+ return errors .Errorf (`expected > 0 got %d` , c )
6423
+ }
6424
+ if c := s .Server .MustGetSQLCounter (`changefeed.buffer_entries.out` ); c <= 0 {
6425
+ return errors .Errorf (`expected > 0 got %d` , c )
6426
+ }
6427
+ switch c := s .Server .MustGetSQLCounter (`changefeed.schemafeed.table_history_scans` ); {
6428
+ case schemaLocked :
6429
+ // When the table is schema-locked, we permit this metric to be zero
6430
+ // because we might not have done any table history scans before the
6431
+ // schema feed's polling is paused, which can happen if the kv feed
6432
+ // is quicker than the schema feed during startup.
6433
+ if c < 0 {
6434
+ return errors .Errorf (`expected >= 0 got %d` , c )
6435
+ }
6436
+ default :
6437
+ if c <= 0 {
6438
+ return errors .Errorf (`expected > 0 got %d` , c )
6439
+ }
6440
+ }
6441
+ return nil
6442
+ })
6450
6443
6451
- // Cancel all the changefeeds and check that max_behind_nanos returns to 0
6452
- // and the number running returns to 0.
6453
- require .NoError (t , foo .Close ())
6454
- require .NoError (t , fooCopy .Close ())
6455
- testutils .SucceedsSoon (t , func () error {
6456
- if c := s .Server .MustGetSQLCounter (`changefeed.max_behind_nanos` ); c != 0 {
6457
- return errors .Errorf (`expected 0 got %d` , c )
6458
- }
6459
- if c := s .Server .MustGetSQLCounter (`changefeed.running` ); c != 0 {
6460
- return errors .Errorf (`expected 0 got %d` , c )
6461
- }
6462
- return nil
6463
- })
6464
- }
6444
+ sqlDB .Exec (t , `INSERT INTO foo VALUES (2)` )
6465
6445
6466
- cdcTestWithSystem (t , testFn , feedTestForceSink ("sinkless" ))
6446
+ // Check that two changefeeds add correctly.
6447
+ // Set cluster settings back so we don't interfere with schema changes.
6448
+ sysDB .Exec (t , `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'` )
6449
+ fooCopy := feed (t , f , `CREATE CHANGEFEED FOR foo` )
6450
+ _ , _ = fooCopy .Next ()
6451
+ _ , _ = fooCopy .Next ()
6452
+ testutils .SucceedsSoon (t , func () error {
6453
+ // We can't assert exactly 4 or 88 in case we get (allowed) duplicates
6454
+ // from RangeFeed.
6455
+ if c := s .Server .MustGetSQLCounter (`changefeed.emitted_messages` ); c < 4 {
6456
+ return errors .Errorf (`expected >= 4 got %d` , c )
6457
+ }
6458
+ if c := s .Server .MustGetSQLCounter (`changefeed.emitted_bytes` ); c < 88 {
6459
+ return errors .Errorf (`expected >= 88 got %d` , c )
6460
+ }
6461
+ return nil
6462
+ })
6463
+
6464
+ // Cancel all the changefeeds and check that max_behind_nanos returns to 0
6465
+ // and the number running returns to 0.
6466
+ require .NoError (t , foo .Close ())
6467
+ require .NoError (t , fooCopy .Close ())
6468
+ testutils .SucceedsSoon (t , func () error {
6469
+ if c := s .Server .MustGetSQLCounter (`changefeed.max_behind_nanos` ); c != 0 {
6470
+ return errors .Errorf (`expected 0 got %d` , c )
6471
+ }
6472
+ if c := s .Server .MustGetSQLCounter (`changefeed.running` ); c != 0 {
6473
+ return errors .Errorf (`expected 0 got %d` , c )
6474
+ }
6475
+ return nil
6476
+ })
6477
+ }
6478
+
6479
+ cdcTestWithSystem (t , testFn , feedTestForceSink ("sinkless" ))
6480
+ })
6467
6481
}
6468
6482
6469
6483
func TestChangefeedRetryableError (t * testing.T ) {
0 commit comments