diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 1f3a294f7f..418670f39f 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -118,15 +118,46 @@ func (dir *RegistryDirectory) Subscribe(url *common.URL) error { logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err) } + }() + + // Get the timeout time from the registration center configuration (default time 5s) + registerUrl := dir.registry.GetURL() + + var timeoutStr string + + if registerUrl != nil { + if val := registerUrl.GetParam(constant.RegistryTimeoutKey, ""); val != "" { + timeoutStr = val + } + } + + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + logger.Warnf("Invalid timeout value %s, using default %s", timeoutStr, constant.DefaultRegTimeout) + timeout, _ = time.ParseDuration(constant.DefaultRegTimeout) + } + + done := make(chan struct{}) + + go func() { urlToReg := getConsumerUrlToRegistry(url) err := dir.registry.Register(urlToReg) if err != nil { logger.Errorf("consumer service %v register registry %v error, error message is %s", url.String(), dir.registry.GetURL().String(), err.Error()) } + + close(done) }() - return nil + select { + case <-done: + logger.Infof("register completed successfully for service: %s", url.Key()) + return nil + case <-time.After(timeout): + logger.Errorf("register timed out for service: %s", url.Key()) + return fmt.Errorf("register timed out for service: %s", url.Key()) + } } // Notify monitor changes from registry,and update the cacheServices