@@ -2,8 +2,10 @@ package test_helpers
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "reflect"
8+ "sync"
79 "time"
810
911 "github.com/tarantool/go-tarantool/v2"
@@ -206,6 +208,50 @@ func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarant
206208 return err
207209 }
208210
211+ checkRole := func (conn * tarantool.Connection , isReplica bool ) string {
212+ data , err := conn .Do (tarantool .NewCallRequest ("box.info" )).Get ()
213+ switch {
214+ case err != nil :
215+ return fmt .Sprintf ("failed to get box.info: %s" , err )
216+ case len (data ) < 1 :
217+ return "box.info is empty"
218+ }
219+
220+ boxInfo , ok := data [0 ].(map [interface {}]interface {})
221+ if ! ok {
222+ return "unexpected type in box.info response"
223+ }
224+
225+ status , statusFound := boxInfo ["status" ]
226+ readonly , readonlyFound := boxInfo ["ro" ]
227+ switch {
228+ case ! statusFound :
229+ return "box.info.status is missing"
230+ case status != "running" :
231+ return fmt .Sprintf ("box.info.status='%s' (waiting for 'running')" , status )
232+ case ! readonlyFound :
233+ return "box.info.ro is missing"
234+ case readonly != isReplica :
235+ return fmt .Sprintf ("box.info.ro='%v' (waiting for '%v')" , readonly , isReplica )
236+ default :
237+ return ""
238+ }
239+ }
240+
241+ problem := "not checked yet"
242+
243+ // Wait for the role to be applied.
244+ for len (problem ) != 0 {
245+ select {
246+ case <- time .After (10 * time .Millisecond ):
247+ case <- ctx .Done ():
248+ return fmt .Errorf ("%w: failed to apply role, the last problem: %s" ,
249+ ctx .Err (), problem )
250+ }
251+
252+ problem = checkRole (conn , isReplica )
253+ }
254+
209255 return nil
210256}
211257
@@ -215,16 +261,23 @@ func SetClusterRO(dialers []tarantool.Dialer, connOpts tarantool.Opts,
215261 return fmt .Errorf ("number of servers should be equal to number of roles" )
216262 }
217263
264+ ctx , cancel := GetConnectContext ()
265+ defer cancel ()
266+
267+ // Apply roles in parallel.
268+ errs := make ([]error , len (dialers ))
269+ var wg sync.WaitGroup
270+ wg .Add (len (dialers ))
218271 for i , dialer := range dialers {
219- ctx , cancel := GetConnectContext ()
220- err := SetInstanceRO (ctx , dialer , connOpts , roles [i ])
221- cancel ()
222- if err != nil {
223- return err
224- }
272+ // Pass loop variables to avoid its closure.
273+ go func (i int , dialer tarantool.Dialer ) {
274+ defer wg .Done ()
275+ errs [i ] = SetInstanceRO (ctx , dialer , connOpts , roles [i ])
276+ }(i , dialer )
225277 }
278+ wg .Wait ()
226279
227- return nil
280+ return errors . Join ( errs ... )
228281}
229282
230283func StartTarantoolInstances (instsOpts []StartOpts ) ([]* TarantoolInstance , error ) {
0 commit comments