@@ -28,6 +28,7 @@ import (
2828 "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
2929 "github.com/cockroachdb/cockroach/pkg/jobs"
3030 "github.com/cockroachdb/cockroach/pkg/roachprod/install"
31+ "github.com/cockroachdb/cockroach/pkg/roachprod/logger"
3132 "github.com/cockroachdb/cockroach/pkg/util/randutil"
3233 "github.com/cockroachdb/cockroach/pkg/util/timeutil"
3334 "github.com/cockroachdb/cockroach/pkg/workload/histogram"
@@ -293,7 +294,7 @@ func runCDCBenchScan(
293294 // Wait for the changefeed to complete, and compute throughput.
294295 m .Go (func (ctx context.Context ) error {
295296 t .L ().Printf ("waiting for changefeed to finish" )
296- info , err := waitForChangefeed (ctx , conn , jobID , func (info changefeedInfo ) (bool , error ) {
297+ info , err := waitForChangefeed (ctx , conn , jobID , t . L (), func (info changefeedInfo ) (bool , error ) {
297298 switch jobs .Status (info .status ) {
298299 case jobs .StatusSucceeded :
299300 return true , nil
@@ -444,7 +445,7 @@ func runCDCBenchWorkload(
444445 // the changefeed wasn't lagging by more than 1-2 minutes, but with 100k
445446 // ranges it was found to sometimes lag by over 8 minutes.
446447 m .Go (func (ctx context.Context ) error {
447- info , err := waitForChangefeed (ctx , conn , jobID , func (info changefeedInfo ) (bool , error ) {
448+ info , err := waitForChangefeed (ctx , conn , jobID , t . L (), func (info changefeedInfo ) (bool , error ) {
448449 switch jobs .Status (info .status ) {
449450 case jobs .StatusPending , jobs .StatusRunning :
450451 doneValue := done .Load ()
@@ -465,7 +466,7 @@ func runCDCBenchWorkload(
465466 now := timeutil .Now ()
466467 t .L ().Printf ("waiting for changefeed watermark to reach current time (%s)" ,
467468 now .Format (time .RFC3339 ))
468- info , err := waitForChangefeed (ctx , conn , jobID , func (info changefeedInfo ) (bool , error ) {
469+ info , err := waitForChangefeed (ctx , conn , jobID , t . L (), func (info changefeedInfo ) (bool , error ) {
469470 switch jobs .Status (info .status ) {
470471 case jobs .StatusPending , jobs .StatusRunning :
471472 return info .highwaterTime .After (now ), nil
@@ -539,11 +540,15 @@ func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []strin
539540
540541// waitForChangefeed waits until the changefeed satisfies the given closure.
541542func waitForChangefeed (
542- ctx context.Context , conn * gosql.DB , jobID int , f func (changefeedInfo ) (bool , error ),
543+ ctx context.Context ,
544+ conn * gosql.DB ,
545+ jobID int ,
546+ logger * logger.Logger ,
547+ f func (changefeedInfo ) (bool , error ),
543548) (changefeedInfo , error ) {
544549 ticker := time .NewTicker (5 * time .Second )
545550 defer ticker .Stop ()
546- for {
551+ for attempt := 0 ; ; attempt ++ {
547552 select {
548553 case <- ticker .C :
549554 case <- ctx .Done ():
@@ -552,7 +557,11 @@ func waitForChangefeed(
552557
553558 info , err := getChangefeedInfo (conn , jobID )
554559 if err != nil {
555- return changefeedInfo {}, err
560+ logger .Errorf ("error getting changefeed info: %v (attempt %d)" , err , attempt + 1 )
561+ if attempt > 5 {
562+ return changefeedInfo {}, errors .Wrap (err , "failed 5 attempts to get changefeed info" )
563+ }
564+ continue
556565 } else if info .errMsg != "" {
557566 return changefeedInfo {}, errors .Errorf ("changefeed error: %s" , info .errMsg )
558567 }
0 commit comments