
Commit 6fa90e9

craig[bot] and Jeremyyang920 committed
108514: awsdms: use custom status checker r=otan a=Jeremyyang920

This commit switches the DMS test to use a custom status checker instead of the one from the SDK. The SDK's way of checking for status changes is prone to a race condition that causes it to return an error when there is really no error. We now use a custom method that performs an explicit status check each time the DescribeReplicationTasks API is called, so we are always checking the latest result, unlike the way the SDK waiter was doing it.

Fixes: cockroachdb#108270

Release note: None

Co-authored-by: Jeremy Yang <[email protected]>
2 parents 4af8a54 + af2dc5a commit 6fa90e9
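Before the diff itself, a minimal standalone sketch of the approach the commit message describes may help: issue a fresh status query on every iteration, compare it against the desired state, and give up after an overall deadline. The helper name waitForStatus, the fetchStatus callback, and the interval/timeout values below are illustrative stand-ins only; the committed code (shown in the diff further down) uses the roachtest retry helper and the DMS DescribeReplicationTasks API directly.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForStatus polls fetchStatus until it reports wantStatus or the overall
// timeout elapses. Every iteration issues a fresh call, so the decision is
// always based on the latest response rather than a previously observed one.
func waitForStatus(
	ctx context.Context,
	fetchStatus func(context.Context) (string, error),
	wantStatus string,
	interval, timeout time.Duration,
) error {
	deadline := time.After(timeout)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		status, err := fetchStatus(ctx)
		if err != nil {
			return err
		}
		if status == wantStatus {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline:
			return fmt.Errorf("exceeded time limit waiting for status %q (last seen %q)", wantStatus, status)
		case <-ticker.C:
			// Fall through and poll again with a fresh call.
		}
	}
}

func main() {
	// Fake status source for demonstration: reports "running" on the third call.
	calls := 0
	fetch := func(ctx context.Context) (string, error) {
		calls++
		if calls < 3 {
			return "starting", nil
		}
		return "running", nil
	}
	if err := waitForStatus(context.Background(), fetch, "running", 100*time.Millisecond, 5*time.Second); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("task reached the desired status")
}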

File tree

1 file changed: +36 -6 lines


pkg/cmd/roachtest/tests/awsdms.go

Lines changed: 36 additions & 6 deletions
@@ -871,7 +871,8 @@ func setupDMSEndpointsAndTask(
 			return err
 		}
 		t.L().Printf("waiting for replication task to be ready")
-		if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
+		input := dmsDescribeTasksInput(t.BuildVersion(), task.tableName)
+		if err = dmsTaskStatusChecker(ctx, dmsCli, input, "ready"); err != nil {
 			return err
 		}
 
@@ -899,17 +900,46 @@ func setupDMSEndpointsAndTask(
 		}
 
 		t.L().Printf("waiting for replication task to be running")
-		if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
-			ctx,
-			dmsDescribeTasksInput(t.BuildVersion(), task.tableName),
-			awsdmsWaitTimeLimit,
-		); err != nil {
+		if err = dmsTaskStatusChecker(ctx, dmsCli, input, "running"); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
+func dmsTaskStatusChecker(
+	ctx context.Context, dmsCli *dms.Client, input *dms.DescribeReplicationTasksInput, status string,
+) error {
+	closer := make(chan struct{})
+	r := retry.StartWithCtx(ctx, retry.Options{
+		InitialBackoff: 10 * time.Second,
+		MaxBackoff:     30 * time.Second,
+		Closer:         closer,
+	})
+	timeout := time.After(awsdmsWaitTimeLimit)
+	for r.Next() {
+		select {
+		case <-timeout:
+			close(closer)
+			// Since we only ever have a unique task returned per filter,
+			// it should be safe to direct index for the task name.
+			return errors.Newf("exceeded time limit waiting for %s to transition to %s", input.Filters[0].Values[0], status)
+		default:
+			dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, input)
+			if err != nil {
+				return err
+			}
+			for _, task := range dmsTasks.ReplicationTasks {
+				// If we match the status we want, close the retry and exit.
+				if *task.Status == status {
+					close(closer)
+				}
+			}
+		}
+	}
+	return nil
+}
+
 func isDMSResourceNotFound(err error) bool {
 	return errors.HasType(err, &dmstypes.ResourceNotFoundFault{})
 }
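A note on the control flow of the new helper, in case it reads oddly at first: when the polled status matches, the code does not return right away; it closes closer, and, as I understand the retry helper used here (where Options.Closer ends the loop once closed), the next r.Next() call returns false, so the function exits the loop and falls through to the final return nil. A reduced, standalone sketch of that shape, using a plain channel and ticker from the standard library in place of the retry helper; all names and values here are illustrative:

package main

import (
	"fmt"
	"time"
)

func main() {
	// closer plays the role of retry.Options.Closer in the committed helper:
	// closing it is the "success" signal that ends the polling loop, after
	// which the real helper would return nil.
	closer := make(chan struct{})
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	polls := 0
	for {
		select {
		case <-closer:
			fmt.Println("desired status observed after", polls, "polls")
			return
		case <-ticker.C:
			polls++
			if polls == 3 { // stand-in for *task.Status == status
				close(closer)
			}
		}
	}
}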
