Skip to content

Commit 8a15836

Browse files
committed
add compatibile mode and more logs
1 parent 4365aaf commit 8a15836

File tree

10 files changed

+112
-46
lines changed

10 files changed

+112
-46
lines changed

coordinator/conf/config_proxy.json

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,7 @@
1111
},
1212
"verifier": {
1313
"min_prover_version": "v4.4.45",
14-
"verifiers": [
15-
{
16-
"assets_path": "assets",
17-
"fork_name": "euclidV2"
18-
},
19-
{
20-
"assets_path": "assets",
21-
"fork_name": "feynman"
22-
}
23-
]
14+
"verifiers": []
2415
}
2516
},
2617
"coordinators": {

coordinator/internal/config/proxy_config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type UpStream struct {
3939
RetryCount uint `json:"retry_count"`
4040
RetryWaitTime uint `json:"retry_wait_time_sec"`
4141
ConnectionTimeoutSec uint `json:"connection_timeout_sec"`
42+
CompatibileMode bool `json:"compatibile_mode,omitempty"`
4243
}
4344

4445
// Config load configuration items.

coordinator/internal/controller/proxy/auth.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func (a *AuthController) Login(c *gin.Context) (interface{}, error) {
6060
}
6161

6262
session := a.proverMgr.GetOrCreate(loginParam.PublicKey)
63+
log.Debug("start handling login", "cli", loginParam.Message.ProverName)
6364

6465
for n, cli := range a.clients {
6566

coordinator/internal/controller/proxy/client.go

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,20 @@ import (
88
"net/http"
99
"time"
1010

11+
"github.com/scroll-tech/go-ethereum/common"
12+
"github.com/scroll-tech/go-ethereum/crypto"
13+
1114
ctypes "scroll-tech/common/types"
1215
"scroll-tech/coordinator/internal/config"
1316
"scroll-tech/coordinator/internal/types"
1417
)
1518

1619
// Client wraps an http client with a preset host for coordinator API calls
1720
type upClient struct {
18-
httpClient *http.Client
19-
baseURL string
20-
loginToken string
21+
httpClient *http.Client
22+
baseURL string
23+
loginToken string
24+
compatibileMode bool
2125
}
2226

2327
// NewClient creates a new Client with the specified host
@@ -26,7 +30,8 @@ func newUpClient(cfg *config.UpStream) *upClient {
2630
httpClient: &http.Client{
2731
Timeout: time.Duration(cfg.ConnectionTimeoutSec) * time.Second,
2832
},
29-
baseURL: cfg.BaseUrl,
33+
baseURL: cfg.BaseUrl,
34+
compatibileMode: cfg.CompatibileMode,
3035
}
3136
}
3237

@@ -40,8 +45,8 @@ type loginSchema struct {
4045
Token string `json:"token"`
4146
}
4247

43-
// FullLogin performs the complete login process: get challenge then login
44-
func (c *upClient) Login(ctx context.Context, genLogin func(string) (*types.LoginParameter, error)) (*types.LoginSchema, error) {
48+
// Login performs the complete login process: get challenge then login
49+
func (c *upClient) Login(ctx context.Context, genLogin func(string) (*types.LoginParameter, error)) (*ctypes.Response, error) {
4550
// Step 1: Get challenge
4651
url := fmt.Sprintf("%s/coordinator/v1/challenge", c.baseURL)
4752

@@ -93,26 +98,7 @@ func (c *upClient) Login(ctx context.Context, genLogin func(string) (*types.Logi
9398
if err != nil {
9499
return nil, fmt.Errorf("failed to perform login request: %w", err)
95100
}
96-
97-
parsedResp, err = handleHttpResp(loginResp)
98-
if err != nil {
99-
return nil, err
100-
} else if parsedResp.ErrCode != 0 {
101-
return nil, fmt.Errorf("login failed: %d (%s)", parsedResp.ErrCode, parsedResp.ErrMsg)
102-
}
103-
104-
var loginResult loginSchema
105-
err = parsedResp.DecodeData(&loginResult)
106-
if err != nil {
107-
return nil, fmt.Errorf("login parsing data fail: %v", err)
108-
}
109-
c.loginToken = loginResult.Token
110-
111-
// TODO: we need to parse time if we start making use of it
112-
113-
return &types.LoginSchema{
114-
Token: loginResult.Token,
115-
}, nil
101+
return handleHttpResp(loginResp)
116102
}
117103

118104
func handleHttpResp(resp *http.Response) (*ctypes.Response, error) {
@@ -130,8 +116,40 @@ func handleHttpResp(resp *http.Response) (*ctypes.Response, error) {
130116
return nil, fmt.Errorf("login request failed with status: %d", resp.StatusCode)
131117
}
132118

119+
func (c *upClient) proxyLoginCompatibleMode(ctx context.Context, param *types.LoginParameter) (*ctypes.Response, error) {
120+
mimePrivK, err := buildPrivateKey([]byte(param.PublicKey))
121+
if err != nil {
122+
return nil, err
123+
}
124+
mimePkHex := common.Bytes2Hex(crypto.CompressPubkey(&mimePrivK.PublicKey))
125+
126+
genLoginParam := func(challenge string) (*types.LoginParameter, error) {
127+
128+
// Create login parameter with proxy settings
129+
loginParam := &types.LoginParameter{
130+
Message: param.Message,
131+
PublicKey: mimePkHex,
132+
}
133+
loginParam.Message.Challenge = challenge
134+
135+
// Sign the message with the private key
136+
if err := loginParam.SignWithKey(mimePrivK); err != nil {
137+
return nil, fmt.Errorf("failed to sign login parameter: %w", err)
138+
}
139+
140+
return loginParam, nil
141+
}
142+
143+
return c.Login(ctx, genLoginParam)
144+
}
145+
133146
// ProxyLogin makes a POST request to /v1/proxy_login with LoginParameter
134147
func (c *upClient) ProxyLogin(ctx context.Context, param *types.LoginParameter) (*ctypes.Response, error) {
148+
149+
if c.compatibileMode {
150+
return c.proxyLoginCompatibleMode(ctx, param)
151+
}
152+
135153
url := fmt.Sprintf("%s/coordinator/v1/proxy_login", c.baseURL)
136154

137155
jsonData, err := json.Marshal(param)

coordinator/internal/controller/proxy/client_manager.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,13 @@ func NewClientManager(name string, cliCfg *config.ProxyClient, cfg *config.UpStr
6767
}, nil
6868
}
6969

70-
func (cliMgr *ClientManager) doLogin(ctx context.Context, loginCli *upClient) time.Time {
70+
func (cliMgr *ClientManager) doLogin(ctx context.Context, loginCli *upClient) {
71+
if cliMgr.cfg.CompatibileMode {
72+
loginCli.loginToken = "dummy"
73+
log.Info("Skip login process for compatibile mode")
74+
return
75+
}
76+
7177
// Calculate wait time between 2 seconds and cfg.RetryWaitTime
7278
minWait := 2 * time.Second
7379
waitDuration := time.Duration(cliMgr.cfg.RetryWaitTime) * time.Second
@@ -77,18 +83,31 @@ func (cliMgr *ClientManager) doLogin(ctx context.Context, loginCli *upClient) ti
7783

7884
for {
7985
log.Info("attempting login to upstream coordinator", "name", cliMgr.name)
80-
loginResult, err := loginCli.Login(ctx, cliMgr.genLoginParam)
81-
if err == nil && loginResult != nil {
82-
log.Info("login to upstream coordinator successful", "name", cliMgr.name, "time", loginResult.Time)
83-
return loginResult.Time
86+
loginResp, err := loginCli.Login(ctx, cliMgr.genLoginParam)
87+
if err == nil && loginResp.ErrCode == 0 {
88+
var loginResult loginSchema
89+
err = loginResp.DecodeData(&loginResult)
90+
if err != nil {
91+
log.Error("login parsing data fail", "error", err)
92+
} else {
93+
loginCli.loginToken = loginResult.Token
94+
log.Info("login to upstream coordinator successful", "name", cliMgr.name, "time", loginResult.Time)
95+
// TODO: we need to parse time if we start making use of it
96+
return
97+
}
98+
} else if err != nil {
99+
log.Error("login process fail", "error", err)
100+
} else {
101+
log.Error("login get fail resp", "code", loginResp.ErrCode, "msg", loginResp.ErrMsg)
84102
}
103+
85104
log.Info("login to upstream coordinator failed, retrying", "name", cliMgr.name, "error", err, "waitDuration", waitDuration)
86105

87106
timer := time.NewTimer(waitDuration)
88107
select {
89108
case <-ctx.Done():
90109
timer.Stop()
91-
return time.Now()
110+
return
92111
case <-timer.C:
93112
// Continue to next retry
94113
}

coordinator/internal/controller/proxy/get_task.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,16 @@ func (ptc *GetTaskController) GetTasks(ctx *gin.Context) {
108108
session := ptc.proverMgr.Get(publicKey)
109109

110110
getTask := func(upStream string, cli Client) (tryNext bool) {
111+
log.Debug("Start get task", "up", upStream, "cli", session.CliName)
111112
resp, err := session.GetTask(ctx, &getTaskParameter, cli, upStream)
112113
if err != nil {
114+
log.Error("Upstream error for get task", "error", err, "up", upStream, "cli", session.CliName)
113115
types.RenderFailure(ctx, types.ErrCoordinatorGetTaskFailure, err)
114116
return
115117
} else if resp.ErrCode != types.ErrCoordinatorEmptyProofData {
116118

117119
if resp.ErrCode != 0 {
120+
log.Error("Upstream has error resp for get task", "code", resp.ErrCode, "msg", resp.ErrMsg, "up", upStream, "cli", session.CliName)
118121
// simply dispatch the error from upstream to prover
119122
types.RenderFailure(ctx, resp.ErrCode, fmt.Errorf("%s", resp.ErrMsg))
120123
return
@@ -124,9 +127,10 @@ func (ptc *GetTaskController) GetTasks(ctx *gin.Context) {
124127
if err = resp.DecodeData(&task); err == nil {
125128
task.TaskID = formUpstreamWithTaskName(upStream, task.TaskID)
126129
ptc.priorityUpstream.Set(publicKey, upStream)
127-
// TODO: log the new id in debug level
130+
log.Debug("Upstream get task", "up", upStream, "cli", session.CliName, "taskID", task.TaskID, "taskType", task.TaskType)
128131
types.RenderSuccess(ctx, &task)
129132
} else {
133+
log.Error("Upstream has wrong data for get task", "error", err, "up", upStream, "cli", session.CliName)
130134
types.RenderFailure(ctx, types.InternalServerError, fmt.Errorf("decode task fail: %v", err))
131135
}
132136

@@ -140,10 +144,11 @@ func (ptc *GetTaskController) GetTasks(ctx *gin.Context) {
140144
priorityUpstream, exist := ptc.priorityUpstream.Get(publicKey)
141145
if exist {
142146
cli := ptc.clients[priorityUpstream]
147+
log.Debug("Try get task from priority stream", "up", priorityUpstream)
143148
if cli != nil && !getTask(priorityUpstream, cli) {
144149
return
145150
} else if cli == nil {
146-
// TODO: log error
151+
log.Warn("A upstream is removed or lost for some reason while running", "up", priorityUpstream)
147152
}
148153
}
149154
ptc.priorityUpstream.Delete(publicKey)
@@ -166,6 +171,7 @@ func (ptc *GetTaskController) GetTasks(ctx *gin.Context) {
166171
}
167172
}
168173

174+
log.Debug("get no task from upstream", "cli", session.CliName)
169175
// if all get task failed, throw empty proof resp
170176
types.RenderFailure(ctx, types.ErrCoordinatorEmptyProofData, fmt.Errorf("get empty prover task"))
171177
}

coordinator/internal/controller/proxy/prover_session.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"sync"
88
"time"
99

10+
"github.com/scroll-tech/go-ethereum/log"
11+
1012
ctypes "scroll-tech/common/types"
1113
"scroll-tech/coordinator/internal/types"
1214
)
@@ -40,6 +42,7 @@ func (m *ProverManager) GetOrCreate(userKey string) *proverSession {
4042

4143
ret := &proverSession{
4244
proverToken: make(map[string]loginToken),
45+
CliName: "pending for login",
4346
}
4447

4548
m.data[userKey] = ret
@@ -60,6 +63,8 @@ type loginToken struct {
6063

6164
// Client wraps an http client with a preset host for coordinator API calls
6265
type proverSession struct {
66+
CliName string
67+
6368
sync.RWMutex
6469
proverToken map[string]loginToken
6570
completionCtx context.Context
@@ -97,11 +102,18 @@ func (c *proverSession) maintainLogin(ctx context.Context, cliMgr Client, up str
97102
LoginSchema: result,
98103
phase: curPhase + 1,
99104
}
105+
log.Info("maintain login status", "upstream", up, "cli", param.Message.ProverName, "phase", curPhase+1)
100106
}
101107
c.Unlock()
108+
if nerr != nil {
109+
log.Error("maintain login fail", "error", nerr, "upstream", up, "cli", param.Message.ProverName, "phase", curPhase)
110+
}
102111
}()
103112
c.Unlock()
104113

114+
log.Debug("start proxy login process", "upstream", up, "cli", param.Message.ProverName)
115+
c.CliName = param.Message.ProverName
116+
105117
cli := cliMgr.Client(ctx)
106118
if cli == nil {
107119
return nil, fmt.Errorf("get upstream cli fail")

coordinator/internal/controller/proxy/submit_proof.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/gin-gonic/gin"
88
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/scroll-tech/go-ethereum/log"
910

1011
"scroll-tech/common/types"
1112
"scroll-tech/coordinator/internal/config"
@@ -59,22 +60,26 @@ func (spc *SubmitProofController) SubmitProof(ctx *gin.Context) {
5960
upstream, realTaskID := upstreamFromTaskName(submitParameter.TaskID)
6061
cli, existed := spc.clients[upstream]
6162
if !existed {
62-
// TODO: log error
63+
log.Warn("A upstream for submitting is removed or lost for some reason while running", "up", upstream)
6364
nerr := fmt.Errorf("Invalid upstream name (%s) from taskID %s", upstream, submitParameter.TaskID)
6465
types.RenderFailure(ctx, types.ErrCoordinatorParameterInvalidNo, nerr)
6566
return
6667
}
68+
log.Debug("Start submitting", "up", upstream, "cli", session.CliName, "id", realTaskID, "status", submitParameter.Status)
6769
submitParameter.TaskID = realTaskID
6870

6971
resp, err := session.SubmitProof(ctx, &submitParameter, cli, upstream)
7072
if err != nil {
73+
log.Error("Upstream has error resp for submit", "code", resp.ErrCode, "msg", resp.ErrMsg, "up", upstream, "cli", session.CliName)
7174
types.RenderFailure(ctx, types.ErrCoordinatorGetTaskFailure, err)
7275
return
7376
} else if resp.ErrCode != 0 {
77+
log.Error("Upstream has error resp for get task", "code", resp.ErrCode, "msg", resp.ErrMsg, "up", upstream, "cli", session.CliName)
7478
// simply dispatch the error from upstream to prover
7579
types.RenderFailure(ctx, resp.ErrCode, fmt.Errorf("%s", resp.ErrMsg))
7680
return
7781
} else {
82+
log.Debug("Submit proof to upstream", "up", upstream, "cli", session.CliName, "taskID", realTaskID)
7883
spc.priorityUpstream.Delete(upstream)
7984
types.RenderSuccess(ctx, resp.Data)
8085
return

coordinator/internal/logic/submitproof/proof_receiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofParameter coor
216216
switch message.ProofType(proofParameter.TaskType) {
217217
case message.ProofTypeChunk:
218218
chunkProof := &message.OpenVMChunkProof{}
219-
if unmarshalErr := json.Unmarshal([]byte(proofParameter.Proof), &chunkProof); unmarshalErr != nil {
219+
if unmarshalErr := json.Unmarshal([]byte(proofParameter.Proof), &chunkProof); unmarshalErr == nil {
220220
return unmarshalErr
221221
}
222222
success, verifyErr = m.verifier.VerifyChunkProof(chunkProof, hardForkName)

coordinator/test/proxy_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,15 @@ func testProxyClientCfg() *config.ProxyClient {
3131
}
3232
}
3333

34+
var testCompatibileMode bool
35+
3436
func testProxyUpStreamCfg(coordinatorURL string) *config.UpStream {
3537

3638
return &config.UpStream{
3739
BaseUrl: fmt.Sprintf("http://%s", coordinatorURL),
3840
RetryWaitTime: 3,
3941
ConnectionTimeoutSec: 30,
42+
CompatibileMode: testCompatibileMode,
4043
}
4144

4245
}
@@ -263,7 +266,17 @@ func testProxyProof(t *testing.T) {
263266
}
264267

265268
func TestProxyClient(t *testing.T) {
269+
testCompatibileMode = false
270+
// Set up the test environment.
271+
setEnv(t)
272+
t.Run("TestProxyClient", testProxyClient)
273+
t.Run("TestProxyHandshake", testProxyHandshake)
274+
t.Run("TestProxyGetTask", testProxyGetTask)
275+
t.Run("TestProxyValidProof", testProxyProof)
276+
}
266277

278+
func TestProxyClientCompatibleMode(t *testing.T) {
279+
testCompatibileMode = true
267280
// Set up the test environment.
268281
setEnv(t)
269282
t.Run("TestProxyClient", testProxyClient)

0 commit comments

Comments
 (0)