@@ -35,7 +35,6 @@ import (
35
35
"strconv"
36
36
"strings"
37
37
"sync"
38
- "sync/atomic"
39
38
"time"
40
39
41
40
"github.com/IBM/sarama"
@@ -876,7 +875,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster, cfg cdcBank
876
875
`SET CLUSTER SETTING changefeed.span_checkpoint.lag_threshold = '1us'` ,
877
876
} {
878
877
if _ , err := db .Exec (stmt ); err != nil {
879
- t .Fatal (err )
878
+ t .Fatal (fmt . Sprintf ( "failed to execute stmt %q: %v" , stmt , err ) )
880
879
}
881
880
}
882
881
}
@@ -910,33 +909,35 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster, cfg cdcBank
910
909
defer l .Close ()
911
910
912
911
t .Status ("running workload" )
913
- workloadCtx , workloadCancel := context .WithCancel (ctx )
914
- defer workloadCancel ()
915
912
916
- m := c .NewMonitor (workloadCtx , crdbNodes )
917
- var doneAtomic int64
918
913
messageBuf := make (chan * sarama.ConsumerMessage , 4096 )
919
914
const requestedResolved = 100
920
- if cfg .kafkaChaos {
921
- m .Go (func (ctx context.Context ) error {
915
+
916
+ m := c .NewMonitor (ctx , crdbNodes )
917
+ chaosCancel := func () func () {
918
+ if ! cfg .kafkaChaos {
919
+ return func () {}
920
+ }
921
+ return m .GoWithCancel (func (ctx context.Context ) error {
922
922
period , downTime := time .Minute , 10 * time .Second
923
923
err := kafka .chaosLoop (ctx , period , downTime , nil )
924
- if atomic . LoadInt64 ( & doneAtomic ) > 0 {
924
+ if errors . Is ( err , context . Canceled ) {
925
925
return nil
926
926
}
927
927
return errors .Wrap (err , "kafka chaos loop failed" )
928
928
})
929
- }
930
- m . Go (func (ctx context.Context ) error {
929
+ }()
930
+ workloadCancel := m . GoWithCancel (func (ctx context.Context ) error {
931
931
err := c .RunE (ctx , option .WithNodes (workloadNode ), `./cockroach workload run bank {pgurl:1} --max-rate=10` )
932
- if atomic . LoadInt64 ( & doneAtomic ) > 0 {
932
+ if errors . Is ( err , context . Canceled ) {
933
933
return nil
934
934
}
935
935
return errors .Wrap (err , "workload failed" )
936
936
})
937
937
m .Go (func (ctx context.Context ) error {
938
+ defer chaosCancel ()
938
939
defer workloadCancel ()
939
- defer func () { close (messageBuf ) }( )
940
+ defer close (messageBuf )
940
941
v := cdctest .NewCountValidator (cdctest .NoOpValidator )
941
942
for {
942
943
m , err := tc .next (ctx )
@@ -961,7 +962,6 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster, cfg cdcBank
961
962
l .Printf ("%d of %d resolved timestamps received from kafka, latest is %s behind realtime, %s beind realtime when sent to kafka" ,
962
963
v .NumResolvedWithRows , requestedResolved , timeutil .Since (resolved .GoTime ()), m .Timestamp .Sub (resolved .GoTime ()))
963
964
if v .NumResolvedWithRows >= requestedResolved {
964
- atomic .StoreInt64 (& doneAtomic , 1 )
965
965
break
966
966
}
967
967
}
@@ -3471,9 +3471,17 @@ var kafkaServices = map[string][]string{
3471
3471
}
3472
3472
3473
3473
func (k kafkaManager ) restart (ctx context.Context , targetService string , envVars ... string ) {
3474
+ if err := k .restartE (ctx , targetService , envVars ... ); err != nil {
3475
+ k .t .Fatal (err )
3476
+ }
3477
+ }
3478
+
3479
+ func (k kafkaManager ) restartE (ctx context.Context , targetService string , envVars ... string ) error {
3474
3480
services := kafkaServices [targetService ]
3475
3481
3476
- k .c .Run (ctx , option .WithNodes (k .kafkaSinkNodes ), "touch" , k .serverJAASConfig ())
3482
+ if err := k .c .RunE (ctx , option .WithNodes (k .kafkaSinkNodes ), "touch" , k .serverJAASConfig ()); err != nil {
3483
+ return err
3484
+ }
3477
3485
for _ , svcName := range services {
3478
3486
// The confluent tool applies the KAFKA_OPTS to all
3479
3487
// services. Also, the kafka.logs.dir is used by each
@@ -3492,12 +3500,21 @@ func (k kafkaManager) restart(ctx context.Context, targetService string, envVars
3492
3500
startCmd += fmt .Sprintf (" %s local services %s start" , k .confluentBin (), svcName )
3493
3501
3494
3502
// Sometimes kafka wants to be difficult and not start back up first try. Give it some time.
3495
- k .c .Run (ctx , option .WithNodes (k .kafkaSinkNodes ).WithRetryOpts (retry.Options {
3496
- InitialBackoff : 5 * time .Second ,
3497
- MaxBackoff : 30 * time .Second ,
3498
- MaxRetries : 30 ,
3499
- }).WithShouldRetryFn (func (* install.RunResultDetails ) bool { return true }), startCmd )
3503
+ if err := k .c .RunE (ctx , option .
3504
+ WithNodes (k .kafkaSinkNodes ).
3505
+ WithRetryOpts (retry.Options {
3506
+ InitialBackoff : 5 * time .Second ,
3507
+ MaxBackoff : 30 * time .Second ,
3508
+ MaxRetries : 30 ,
3509
+ }).
3510
+ WithShouldRetryFn (func (* install.RunResultDetails ) bool { return true }),
3511
+ startCmd ,
3512
+ ); err != nil {
3513
+ return err
3514
+ }
3500
3515
}
3516
+
3517
+ return nil
3501
3518
}
3502
3519
3503
3520
func (k kafkaManager ) makeCommand (exe string , args ... string ) string {
@@ -3509,8 +3526,19 @@ func (k kafkaManager) makeCommand(exe string, args ...string) string {
3509
3526
}
3510
3527
3511
3528
func (k kafkaManager ) stop (ctx context.Context ) {
3512
- k .c .Run (ctx , option .WithNodes (k .kafkaSinkNodes ), fmt .Sprintf ("rm -f %s" , k .serverJAASConfig ()))
3513
- k .c .Run (ctx , option .WithNodes (k .kafkaSinkNodes ), k .makeCommand ("confluent" , "local services stop" ))
3529
+ if err := k .stopE (ctx ); err != nil {
3530
+ k .t .Fatal (err )
3531
+ }
3532
+ }
3533
+
3534
+ func (k kafkaManager ) stopE (ctx context.Context ) error {
3535
+ if err := k .c .RunE (ctx , option .WithNodes (k .kafkaSinkNodes ), fmt .Sprintf ("rm -f %s" , k .serverJAASConfig ())); err != nil {
3536
+ return err
3537
+ }
3538
+ if err := k .c .RunE (ctx , option .WithNodes (k .kafkaSinkNodes ), k .makeCommand ("confluent" , "local services stop" )); err != nil {
3539
+ return err
3540
+ }
3541
+ return nil
3514
3542
}
3515
3543
3516
3544
func (k kafkaManager ) chaosLoop (
@@ -3528,7 +3556,9 @@ func (k kafkaManager) chaosLoop(
3528
3556
}
3529
3557
3530
3558
k .t .L ().Printf ("kafka chaos loop iteration %d: stopping" , i )
3531
- k .stop (ctx )
3559
+ if err := k .stopE (ctx ); err != nil {
3560
+ return err
3561
+ }
3532
3562
3533
3563
select {
3534
3564
case <- stopper :
@@ -3539,7 +3569,9 @@ func (k kafkaManager) chaosLoop(
3539
3569
}
3540
3570
3541
3571
k .t .L ().Printf ("kafka chaos loop iteration %d: restarting" , i )
3542
- k .restart (ctx , "kafka" )
3572
+ if err := k .restartE (ctx , "kafka" ); err != nil {
3573
+ return err
3574
+ }
3543
3575
}
3544
3576
}
3545
3577
0 commit comments