Skip to content

Commit e755165

Browse files
committed
fix concurrent issue
1 parent 20fde41 commit e755165

File tree

3 files changed

+16
-3
lines changed

3 files changed

+16
-3
lines changed

coordinator/internal/controller/proxy/auth.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package proxy
22

33
import (
4+
"context"
45
"fmt"
6+
"sync"
7+
"time"
58

69
jwt "github.com/appleboy/gin-jwt/v2"
710
"github.com/gin-gonic/gin"
@@ -21,6 +24,7 @@ type AuthController struct {
2124
proverMgr *ProverManager
2225
}
2326

27+
const upstreamConnTimeout = time.Second * 5
2428
const LoginParamCache = "login_param"
2529
const ProverTypesKey = "prover_types"
2630
const SignatureKey = "prover_signature"
@@ -59,17 +63,25 @@ func (a *AuthController) Login(c *gin.Context) (interface{}, error) {
5963
session := a.proverMgr.GetOrCreate(loginParam.PublicKey)
6064
log.Debug("start handling login", "cli", loginParam.Message.ProverName)
6165

66+
loginCtx, cf := context.WithTimeout(context.Background(), upstreamConnTimeout)
67+
var wg sync.WaitGroup
6268
for _, cli := range a.clients {
63-
69+
wg.Add(1)
6470
go func(cli Client) {
65-
if err := session.ProxyLogin(c, cli, &loginParam.LoginParameter); err != nil {
71+
defer wg.Done()
72+
if err := session.ProxyLogin(loginCtx, cli, &loginParam.LoginParameter); err != nil {
6673
log.Error("proxy login failed during token cache update",
6774
"userKey", loginParam.PublicKey,
6875
"upstream", cli.Name(),
6976
"error", err)
7077
}
7178
}(cli)
7279
}
80+
go func(cliName string) {
81+
wg.Wait()
82+
cf()
83+
log.Debug("first login attempt has completed", "cli", cliName)
84+
}(loginParam.Message.ProverName)
7385

7486
return loginParam.LoginParameter, nil
7587
}

coordinator/internal/controller/proxy/client_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,10 @@ func (cliMgr *ClientManager) doLogin(ctx context.Context, loginCli *upClient) {
123123
func (cliMgr *ClientManager) Reset(cli *upClient) {
124124
cliMgr.cachedCli.Lock()
125125
if cliMgr.cachedCli.cli == cli {
126+
log.Info("cached client cleared", "name", cliMgr.name)
126127
cliMgr.cachedCli.cli = nil
127128
}
128129
cliMgr.cachedCli.Unlock()
129-
log.Info("cached client cleared", "name", cliMgr.name)
130130
}
131131

132132
func (cliMgr *ClientManager) Name() string {

coordinator/internal/controller/proxy/prover_session.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func (c *proverSession) maintainLogin(ctx context.Context, cliMgr Client, up str
162162
}
163163

164164
if resp.ErrCode == ctypes.ErrJWTTokenExpired {
165+
log.Info("up stream has expired, renew upstream connection", "up", up)
165166
cliMgr.Reset(cli)
166167
cli = cliMgr.Client(ctx)
167168
if cli == nil {

0 commit comments

Comments
 (0)