Skip to content

Commit 78dbe6c

Browse files
committed
controller WIP
1 parent 9df6429 commit 78dbe6c

File tree

5 files changed

+216
-390
lines changed

5 files changed

+216
-390
lines changed

coordinator/internal/controller/proxy/auth.go

Lines changed: 16 additions & 227 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package proxy
22

33
import (
44
"fmt"
5-
"sync"
65

7-
"context"
86
"time"
97

108
jwt "github.com/appleboy/gin-jwt/v2"
@@ -20,192 +18,30 @@ import (
2018

2119
// AuthController is login API
2220
type AuthController struct {
23-
apiLogin *api.AuthController
24-
clients Clients
25-
userTokenCache *UserTokenCache
26-
}
27-
28-
type TokenUpdate struct {
29-
PublicKey string
30-
Upstream string
31-
Phase uint
32-
LoginParam types.LoginParameter
33-
CompleteNotify chan<- *types.LoginSchema
34-
}
35-
36-
type UpstreamTokens struct {
37-
LoginData map[string]*types.LoginSchema
38-
LoginPhase uint
39-
NextLoginPhase uint
40-
}
41-
42-
type UserTokenCache struct {
43-
sync.RWMutex
44-
data map[string]UpstreamTokens
45-
tokenCacheUpdate chan<- *TokenUpdate
46-
}
47-
48-
func newUserTokens() UpstreamTokens {
49-
return UpstreamTokens{
50-
LoginData: make(map[string]*types.LoginSchema),
51-
}
52-
}
53-
54-
func newUserCache(tokenCacheUpdate chan<- *TokenUpdate) *UserTokenCache {
55-
return &UserTokenCache{
56-
data: make(map[string]UpstreamTokens),
57-
tokenCacheUpdate: tokenCacheUpdate,
58-
}
59-
}
60-
61-
// get retrieves UpstreamTokens for a given user key, returns empty if still not exists
62-
func (c *UserTokenCache) Get(userKey string) *UpstreamTokens {
63-
c.RLock()
64-
defer c.RUnlock()
65-
66-
tokens, exists := c.data[userKey]
67-
if !exists {
68-
return nil
69-
}
70-
71-
return &tokens
72-
}
73-
74-
// prepare for a total update via Login request
75-
func (c *UserTokenCache) updatePrepare(userKey string) UpstreamTokens {
76-
c.Lock()
77-
defer c.Unlock()
78-
79-
if _, exists := c.data[userKey]; !exists {
80-
log.Info("initializing user token cache", "userKey", userKey)
81-
c.data[userKey] = newUserTokens()
82-
}
83-
updated := c.data[userKey]
84-
updated.NextLoginPhase = updated.LoginPhase + 1
85-
c.data[userKey] = updated
86-
return updated
87-
}
88-
89-
// partialSet updates a single entry in upstreamTokens for a given user
90-
func (c *UserTokenCache) partialSet(userKey string, upstreamName string, loginSchema *types.LoginSchema, phase uint) {
91-
c.Lock()
92-
defer c.Unlock()
93-
94-
// Get existing tokens or create new map
95-
tokens, exists := c.data[userKey]
96-
if exists && tokens.NextLoginPhase == phase {
97-
// Update the specific upstream entry
98-
tokens.LoginData[upstreamName] = loginSchema
99-
}
100-
}
101-
102-
// LoginParameterWithHardForkName constructs new payload for login
103-
type LoginParameterWithUpstreamTokens struct {
104-
*types.LoginParameter
105-
Tokens UpstreamTokens
21+
apiLogin *api.AuthController
22+
clients Clients
23+
proverMgr *ProverManager
10624
}
10725

10826
const upstreamConnTimeout = time.Second * 2
109-
const expireTolerant = 10 * time.Minute
11027
const LoginParamCache = "login_param"
11128
const ProverTypesKey = "prover_types"
11229
const SignatureKey = "prover_signature"
11330

11431
// NewAuthController returns an LoginController instance
115-
func NewAuthController(cfg *config.ProxyConfig, clients Clients, vf *verifier.Verifier) *AuthController {
32+
func NewAuthController(cfg *config.ProxyConfig, clients Clients, vf *verifier.Verifier, proverMgr *ProverManager) *AuthController {
11633

11734
loginLogic := auth.NewLoginLogicWithSimpleDEduplicator(cfg.ProxyManager.Verifier, vf)
11835

119-
// Create the token cache update channel
120-
tokenCacheUpdateChan := make(chan *TokenUpdate)
121-
12236
authController := &AuthController{
123-
apiLogin: api.NewAuthControllerWithLogic(loginLogic),
124-
clients: clients,
125-
userTokenCache: newUserCache(tokenCacheUpdateChan),
37+
apiLogin: api.NewAuthControllerWithLogic(loginLogic),
38+
clients: clients,
39+
proverMgr: proverMgr,
12640
}
12741

128-
// Launch token cache manager in a separate goroutine
129-
go authController.toeknCacheManager(tokenCacheUpdateChan)
130-
13142
return authController
13243
}
13344

134-
func (a *AuthController) TokenCache() *UserTokenCache { return a.userTokenCache }
135-
136-
func (a *AuthController) doUpdateRequest(ctx context.Context, req *TokenUpdate) (ret *types.LoginSchema) {
137-
if req.CompleteNotify != nil {
138-
defer func(ctx context.Context) {
139-
select {
140-
case <-ctx.Done():
141-
case req.CompleteNotify <- ret:
142-
}
143-
144-
}(ctx)
145-
}
146-
147-
cli := a.clients[req.Upstream]
148-
if cli := cli.Client(ctx); cli != nil {
149-
var err error
150-
if ret, err = cli.ProxyLogin(ctx, &req.LoginParam); err == nil {
151-
a.userTokenCache.partialSet(req.PublicKey, req.Upstream, ret, req.Phase)
152-
} else {
153-
log.Error("proxy login failed during token cache update",
154-
"userKey", req.PublicKey,
155-
"upstream", req.Upstream,
156-
"phase", req.Phase,
157-
"error", err)
158-
}
159-
}
160-
return
161-
162-
}
163-
164-
func (a *AuthController) toeknCacheManager(request <-chan *TokenUpdate) {
165-
166-
ctx := context.TODO()
167-
var managerStatusLock sync.Mutex
168-
managerStatus := make(map[string]map[string]uint)
169-
170-
for {
171-
req, ok := <-request
172-
if !ok {
173-
return
174-
}
175-
176-
// ensure the manager request is not outdated
177-
tokens := a.userTokenCache.Get(req.PublicKey)
178-
if tokens == nil {
179-
// Highly not possible, if raise, the reason is unknown, just log the Error
180-
continue
181-
}
182-
phase := tokens.NextLoginPhase
183-
if req.Phase < phase {
184-
// drop the out-dated request
185-
continue
186-
}
187-
188-
// ensure only one login request is launched for the same phase
189-
managerStatusLock.Lock()
190-
stat, ok := managerStatus[req.Upstream]
191-
if !ok {
192-
managerStatus[req.Upstream] = make(map[string]uint)
193-
stat = managerStatus[req.Upstream]
194-
}
195-
if phase, running := stat[req.PublicKey]; running && phase >= req.Phase {
196-
managerStatusLock.Unlock()
197-
continue
198-
} else {
199-
stat[req.PublicKey] = req.Phase
200-
}
201-
managerStatusLock.Unlock()
202-
203-
go a.doUpdateRequest(ctx, req)
204-
205-
}
206-
207-
}
208-
20945
// Login extended the Login hander in api controller
21046
func (a *AuthController) Login(c *gin.Context) (interface{}, error) {
21147

@@ -219,54 +55,24 @@ func (a *AuthController) Login(c *gin.Context) (interface{}, error) {
21955
return nil, fmt.Errorf("proxy do not support recursive login")
22056
}
22157

222-
tokens := a.userTokenCache.updatePrepare(loginParam.PublicKey)
223-
notifies := make([]chan *types.LoginSchema, len(a.clients))
224-
225-
for n := range a.clients {
226-
227-
// Check if we have a valid cached token that hasn't expired
228-
if knownEntry, existed := tokens.LoginData[n]; existed {
229-
timeRemaining := time.Until(knownEntry.Time)
230-
if timeRemaining > expireTolerant {
231-
// Token is still valid enouth, continue to next client
232-
continue
233-
}
234-
}
235-
236-
notify := make(chan *types.LoginSchema)
237-
notifies = append(notifies, notify)
238-
request := TokenUpdate{
239-
PublicKey: loginParam.PublicKey,
240-
Upstream: n,
241-
Phase: tokens.NextLoginPhase,
242-
LoginParam: loginParam.LoginParameter,
243-
CompleteNotify: notify,
244-
}
245-
defer close(notify)
246-
select {
247-
case <-c.Done():
248-
case a.userTokenCache.tokenCacheUpdate <- &request:
249-
}
58+
session := a.proverMgr.GetOrCreate(loginParam.PublicKey)
25059

251-
}
60+
for n, cli := range a.clients {
25261

253-
// collect all request's compeletions
254-
for _, chn := range notifies {
255-
select {
256-
case <-c.Done():
257-
case <-chn:
62+
if err := session.ProxyLogin(c, cli, n, &loginParam.LoginParameter); err != nil {
63+
log.Error("proxy login failed during token cache update",
64+
"userKey", loginParam.PublicKey,
65+
"upstream", n,
66+
"error", err)
25867
}
25968
}
26069

261-
return LoginParameterWithUpstreamTokens{
262-
LoginParameter: &loginParam.LoginParameter,
263-
Tokens: tokens,
264-
}, nil
70+
return loginParam.LoginParameter, nil
26571
}
26672

26773
// PayloadFunc returns jwt.MapClaims with {public key, prover name}.
26874
func (a *AuthController) PayloadFunc(data interface{}) jwt.MapClaims {
269-
v, ok := data.(LoginParameterWithUpstreamTokens)
75+
v, ok := data.(types.LoginParameter)
27076
if !ok {
27177
return jwt.MapClaims{}
27278
}
@@ -316,23 +122,6 @@ func (a *AuthController) IdentityHandler(c *gin.Context) interface{} {
316122
}
317123

318124
if loginParam.PublicKey != "" {
319-
// ensure tokenCache
320-
a.userTokenCache.RLock()
321-
_, exists := a.userTokenCache.data[loginParam.PublicKey]
322-
if !exists {
323-
a.userTokenCache.RUnlock()
324-
a.userTokenCache.Lock()
325-
if _, exists := a.userTokenCache.data[loginParam.PublicKey]; !exists {
326-
log.Info("creating token cache for user after proxy restart",
327-
"publicKey", loginParam.PublicKey,
328-
"proverName", loginParam.Message.ProverName,
329-
"reason", "prover using JWT token from before proxy restart")
330-
a.userTokenCache.data[loginParam.PublicKey] = newUserTokens()
331-
}
332-
a.userTokenCache.Unlock()
333-
} else {
334-
a.userTokenCache.RUnlock()
335-
}
336125

337126
c.Set(LoginParamCache, loginParam)
338127
return loginParam.PublicKey

coordinator/internal/controller/proxy/controller.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package proxy
22

33
import (
4+
"github.com/prometheus/client_golang/prometheus"
45
"github.com/scroll-tech/go-ethereum/log"
56

67
"scroll-tech/coordinator/internal/config"
@@ -21,7 +22,7 @@ var (
2122
type Clients map[string]Client
2223

2324
// InitController inits Controller with database
24-
func InitController(cfg *config.ProxyConfig) {
25+
func InitController(cfg *config.ProxyConfig, reg prometheus.Registerer) {
2526
// normalize cfg
2627
cfg.ProxyManager.Normalize()
2728

@@ -42,7 +43,9 @@ func InitController(cfg *config.ProxyConfig) {
4243
clients[nm] = cli
4344
}
4445

45-
Auth = NewAuthController(cfg, clients, vf)
46-
// GetTask = NewGetTaskController(cfg, chainCfg, db, vf, reg)
47-
// SubmitProof = NewSubmitProofController(cfg, chainCfg, db, vf, reg)
46+
proverManager := NewProverManager()
47+
48+
Auth = NewAuthController(cfg, clients, vf, proverManager)
49+
GetTask = NewGetTaskController(cfg, clients, proverManager, reg)
50+
SubmitProof = NewSubmitProofController(cfg, clients, proverManager, reg)
4851
}

0 commit comments

Comments
 (0)