Skip to content

Commit e6be62f

Browse files
committed
WIP
1 parent c72ee5d commit e6be62f

File tree

4 files changed

+230
-106
lines changed

4 files changed

+230
-106
lines changed

common/types/response.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"net/http"
55

66
"github.com/gin-gonic/gin"
7+
"github.com/mitchellh/mapstructure"
78
)
89

910
// Response the response schema
@@ -13,6 +14,10 @@ type Response struct {
1314
Data interface{} `json:"data"`
1415
}
1516

17+
func (resp *Response) DecodeData(out interface{}) error {
18+
return mapstructure.Decode(resp.Data, out)
19+
}
20+
1621
// RenderJSON renders response with json
1722
func RenderJSON(ctx *gin.Context, errCode int, err error, data interface{}) {
1823
var errMsg string

coordinator/internal/controller/proxy/client.go

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,36 +11,27 @@ import (
1111
ctypes "scroll-tech/common/types"
1212
"scroll-tech/coordinator/internal/config"
1313
"scroll-tech/coordinator/internal/types"
14-
15-
"github.com/mitchellh/mapstructure"
1614
)
1715

18-
type ClientHelper interface {
19-
GenLoginParam(string) (*types.LoginParameter, error)
20-
OnResp(*upClient, *http.Response)
21-
}
22-
2316
// Client wraps an http client with a preset host for coordinator API calls
2417
type upClient struct {
2518
httpClient *http.Client
2619
baseURL string
2720
loginToken string
28-
helper ClientHelper
2921
}
3022

3123
// NewClient creates a new Client with the specified host
32-
func newUpClient(cfg *config.UpStream, helper ClientHelper) *upClient {
24+
func newUpClient(cfg *config.UpStream) *upClient {
3325
return &upClient{
3426
httpClient: &http.Client{
3527
Timeout: time.Duration(cfg.ConnectionTimeoutSec) * time.Second,
3628
},
3729
baseURL: cfg.BaseUrl,
38-
helper: helper,
3930
}
4031
}
4132

4233
// FullLogin performs the complete login process: get challenge then login
43-
func (c *upClient) Login(ctx context.Context) (*types.LoginSchema, error) {
34+
func (c *upClient) Login(ctx context.Context, genLogin func(string) (*types.LoginParameter, error)) (*types.LoginSchema, error) {
4435
// Step 1: Get challenge
4536
url := fmt.Sprintf("%s/coordinator/v1/challenge", c.baseURL)
4637

@@ -68,7 +59,7 @@ func (c *upClient) Login(ctx context.Context) (*types.LoginSchema, error) {
6859
// Step 3: Use the token from challenge as Bearer token for login
6960
url = fmt.Sprintf("%s/coordinator/v1/login", c.baseURL)
7061

71-
param, err := c.helper.GenLoginParam(loginSchema.Token)
62+
param, err := genLogin(loginSchema.Token)
7263
if err != nil {
7364
return nil, fmt.Errorf("failed to setup login parameter: %w", err)
7465
}
@@ -91,28 +82,38 @@ func (c *upClient) Login(ctx context.Context) (*types.LoginSchema, error) {
9182
return nil, fmt.Errorf("failed to perform login request: %w", err)
9283
}
9384

94-
// Parse login response as LoginSchema and store the token
95-
if loginResp.StatusCode == http.StatusOK {
85+
parsedResp, err := handleHttpResp(loginResp)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
var loginResult types.LoginSchema
91+
err = parsedResp.DecodeData(&loginResult)
92+
if err != nil {
93+
return nil, fmt.Errorf("login parsing data fail: %v", err)
94+
}
95+
c.loginToken = loginResult.Token
96+
return &loginResult, nil
97+
98+
}
99+
100+
func handleHttpResp(resp *http.Response) (*ctypes.Response, error) {
101+
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusUnauthorized {
102+
defer resp.Body.Close()
96103
var respWithData ctypes.Response
97104
// Note: Body is consumed after decoding, caller should not read it again
98-
if err := json.NewDecoder(loginResp.Body).Decode(&respWithData); err == nil {
99-
var loginResult types.LoginSchema
100-
err = mapstructure.Decode(respWithData.Data, &loginResult)
101-
if err != nil {
102-
return nil, fmt.Errorf("login parsing data fail, get %v", respWithData.Data)
103-
}
104-
c.loginToken = loginResult.Token
105-
return &loginResult, nil
105+
if err := json.NewDecoder(resp.Body).Decode(&respWithData); err == nil {
106+
return &respWithData, nil
106107
} else {
107-
return nil, fmt.Errorf("login parsing response failed: %v", err)
108+
return nil, fmt.Errorf("login parsing expected response failed: %v", err)
108109
}
109-
}
110110

111-
return nil, fmt.Errorf("login request failed with status: %d", loginResp.StatusCode)
111+
}
112+
return nil, fmt.Errorf("login request failed with status: %d", resp.StatusCode)
112113
}
113114

114115
// ProxyLogin makes a POST request to /v1/proxy_login with LoginParameter
115-
func (c *upClient) ProxyLogin(ctx context.Context, param *types.LoginParameter) (*types.LoginSchema, error) {
116+
func (c *upClient) ProxyLogin(ctx context.Context, param *types.LoginParameter) (*ctypes.Response, error) {
116117
url := fmt.Sprintf("%s/coordinator/v1/proxy_login", c.baseURL)
117118

118119
jsonData, err := json.Marshal(param)
@@ -132,26 +133,11 @@ func (c *upClient) ProxyLogin(ctx context.Context, param *types.LoginParameter)
132133
if err != nil {
133134
return nil, fmt.Errorf("failed to perform proxy login request: %w", err)
134135
}
135-
defer proxyLoginResp.Body.Close()
136-
137-
// Call helper's OnResp method with the response
138-
c.helper.OnResp(c, proxyLoginResp)
139-
140-
// Parse proxy login response as LoginSchema
141-
if proxyLoginResp.StatusCode == http.StatusOK {
142-
var loginResult types.LoginSchema
143-
if err := json.NewDecoder(proxyLoginResp.Body).Decode(&loginResult); err == nil {
144-
return &loginResult, nil
145-
}
146-
// If parsing fails, still return success but with nil result
147-
return nil, nil
148-
}
149-
150-
return nil, fmt.Errorf("proxy login request failed with status: %d", proxyLoginResp.StatusCode)
136+
return handleHttpResp(proxyLoginResp)
151137
}
152138

153139
// GetTask makes a POST request to /v1/get_task with GetTaskParameter
154-
func (c *upClient) GetTask(ctx context.Context, param *types.GetTaskParameter, token string) (*http.Response, error) {
140+
func (c *upClient) GetTask(ctx context.Context, param *types.GetTaskParameter, token string) (*ctypes.Response, error) {
155141
url := fmt.Sprintf("%s/coordinator/v1/get_task", c.baseURL)
156142

157143
jsonData, err := json.Marshal(param)
@@ -169,11 +155,15 @@ func (c *upClient) GetTask(ctx context.Context, param *types.GetTaskParameter, t
169155
req.Header.Set("Authorization", "Bearer "+token)
170156
}
171157

172-
return c.httpClient.Do(req)
158+
resp, err := c.httpClient.Do(req)
159+
if err != nil {
160+
return nil, err
161+
}
162+
return handleHttpResp(resp)
173163
}
174164

175165
// SubmitProof makes a POST request to /v1/submit_proof with SubmitProofParameter
176-
func (c *upClient) SubmitProof(ctx context.Context, param *types.SubmitProofParameter, token string) (*http.Response, error) {
166+
func (c *upClient) SubmitProof(ctx context.Context, param *types.SubmitProofParameter, token string) (*ctypes.Response, error) {
177167
url := fmt.Sprintf("%s/coordinator/v1/submit_proof", c.baseURL)
178168

179169
jsonData, err := json.Marshal(param)
@@ -191,5 +181,9 @@ func (c *upClient) SubmitProof(ctx context.Context, param *types.SubmitProofPara
191181
req.Header.Set("Authorization", "Bearer "+token)
192182
}
193183

194-
return c.httpClient.Do(req)
184+
resp, err := c.httpClient.Do(req)
185+
if err != nil {
186+
return nil, err
187+
}
188+
return handleHttpResp(resp)
195189
}

coordinator/internal/controller/proxy/client_manager.go

Lines changed: 45 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"fmt"
7-
"net/http"
87
"sync"
98
"time"
109

@@ -18,7 +17,7 @@ import (
1817

1918
type Client interface {
2019
Client(context.Context) *upClient
21-
PeekClient() *upClient
20+
Reset(cli *upClient)
2221
}
2322

2423
type ClientManager struct {
@@ -78,7 +77,7 @@ func (cliMgr *ClientManager) doLogin(ctx context.Context, loginCli *upClient) ti
7877

7978
for {
8079
log.Info("attempting login to upstream coordinator", "name", cliMgr.name)
81-
loginResult, err := loginCli.Login(ctx)
80+
loginResult, err := loginCli.Login(ctx, cliMgr.genLoginParam)
8281
if err == nil && loginResult != nil {
8382
log.Info("login to upstream coordinator successful", "name", cliMgr.name, "time", loginResult.Time)
8483
return loginResult.Time
@@ -96,11 +95,13 @@ func (cliMgr *ClientManager) doLogin(ctx context.Context, loginCli *upClient) ti
9695
}
9796
}
9897

99-
func (cliMgr *ClientManager) PeekClient() *upClient {
100-
cliMgr.cachedCli.RLock()
101-
defer cliMgr.cachedCli.RUnlock()
102-
103-
return cliMgr.cachedCli.cli
98+
func (cliMgr *ClientManager) Reset(cli *upClient) {
99+
cliMgr.cachedCli.Lock()
100+
if cliMgr.cachedCli.cli == cli {
101+
cliMgr.cachedCli.cli = nil
102+
}
103+
cliMgr.cachedCli.Unlock()
104+
log.Info("cached client cleared", "name", cliMgr.name)
104105
}
105106

106107
func (cliMgr *ClientManager) Client(ctx context.Context) *upClient {
@@ -124,51 +125,52 @@ func (cliMgr *ClientManager) Client(ctx context.Context) *upClient {
124125
} else {
125126
// Set new completion context and launch login goroutine
126127
ctx, completionDone := context.WithCancel(context.TODO())
127-
loginCli := newUpClient(cliMgr.cfg, cliMgr)
128+
loginCli := newUpClient(cliMgr.cfg)
128129
cliMgr.cachedCli.completionCtx = context.WithValue(ctx, "cli", loginCli)
129130

130-
// Launch login goroutine
131+
// Launch keep-login goroutine
131132
go func() {
132133
defer completionDone()
133134
expiredT := cliMgr.doLogin(context.Background(), loginCli)
135+
log.Info("login compeleted", "name", cliMgr.name, "expired", expiredT)
134136

135137
cliMgr.cachedCli.Lock()
136138
cliMgr.cachedCli.cli = loginCli
137139
cliMgr.cachedCli.completionCtx = nil
138140

139141
// Launch waiting thread to clear cached client before expiration
140-
go func() {
141-
now := time.Now()
142-
clearTime := expiredT.Add(-10 * time.Second) // 10s before expiration
143-
144-
// If clear time is too soon (less than 10s from now), set it to 10s from now
145-
if clearTime.Before(now.Add(10 * time.Second)) {
146-
clearTime = now.Add(10 * time.Second)
147-
log.Error("token expiration time is too close, delaying clear time",
148-
"name", cliMgr.name,
149-
"expiredT", expiredT,
150-
"adjustedClearTime", clearTime)
151-
}
152-
153-
waitDuration := time.Until(clearTime)
154-
log.Info("token expiration monitor started",
155-
"name", cliMgr.name,
156-
"expiredT", expiredT,
157-
"clearTime", clearTime,
158-
"waitDuration", waitDuration)
159-
160-
timer := time.NewTimer(waitDuration)
161-
select {
162-
case <-ctx.Done():
163-
timer.Stop()
164-
log.Info("token expiration monitor cancelled", "name", cliMgr.name)
165-
case <-timer.C:
166-
log.Info("clearing cached client before token expiration",
167-
"name", cliMgr.name,
168-
"expiredT", expiredT)
169-
cliMgr.clearCachedCli(loginCli)
170-
}
171-
}()
142+
// go func() {
143+
// now := time.Now()
144+
// clearTime := expiredT.Add(-10 * time.Second) // 10s before expiration
145+
146+
// // If clear time is too soon (less than 10s from now), set it to 10s from now
147+
// if clearTime.Before(now.Add(10 * time.Second)) {
148+
// clearTime = now.Add(10 * time.Second)
149+
// log.Error("token expiration time is too close, delaying clear time",
150+
// "name", cliMgr.name,
151+
// "expiredT", expiredT,
152+
// "adjustedClearTime", clearTime)
153+
// }
154+
155+
// waitDuration := time.Until(clearTime)
156+
// log.Info("token expiration monitor started",
157+
// "name", cliMgr.name,
158+
// "expiredT", expiredT,
159+
// "clearTime", clearTime,
160+
// "waitDuration", waitDuration)
161+
162+
// timer := time.NewTimer(waitDuration)
163+
// select {
164+
// case <-ctx.Done():
165+
// timer.Stop()
166+
// log.Info("token expiration monitor cancelled", "name", cliMgr.name)
167+
// case <-timer.C:
168+
// log.Info("clearing cached client before token expiration",
169+
// "name", cliMgr.name,
170+
// "expiredT", expiredT)
171+
// cliMgr.clearCachedCli(loginCli)
172+
// }
173+
// }()
172174

173175
cliMgr.cachedCli.Unlock()
174176

@@ -186,24 +188,7 @@ func (cliMgr *ClientManager) Client(ctx context.Context) *upClient {
186188
}
187189
}
188190

189-
func (cliMgr *ClientManager) clearCachedCli(cli *upClient) {
190-
cliMgr.cachedCli.Lock()
191-
if cliMgr.cachedCli.cli == cli {
192-
cliMgr.cachedCli.cli = nil
193-
cliMgr.cachedCli.completionCtx = nil
194-
log.Info("cached client cleared due to forbidden response", "name", cliMgr.name)
195-
}
196-
cliMgr.cachedCli.Unlock()
197-
}
198-
199-
func (cliMgr *ClientManager) OnResp(cli *upClient, resp *http.Response) {
200-
if resp.StatusCode == http.StatusForbidden {
201-
log.Info("cached client cleared due to forbidden response", "name", cliMgr.name)
202-
cliMgr.clearCachedCli(cli)
203-
}
204-
}
205-
206-
func (cliMgr *ClientManager) GenLoginParam(challenge string) (*types.LoginParameter, error) {
191+
func (cliMgr *ClientManager) genLoginParam(challenge string) (*types.LoginParameter, error) {
207192

208193
// Generate public key string
209194
publicKeyHex := common.Bytes2Hex(crypto.CompressPubkey(&cliMgr.privKey.PublicKey))

0 commit comments

Comments
 (0)