Skip to content

Commit 30f6803

Browse files
committed
coordinator support universal get_task
1 parent d32754e commit 30f6803

File tree

13 files changed

+194
-5
lines changed

13 files changed

+194
-5
lines changed

coordinator/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ clean_libzkp:
2828

2929
libzkp: clean_libzkp internal/libzkp/lib/libzkp.so
3030

31-
coordinator_api: libzkp ## Builds the Coordinator api instance.
31+
coordinator_api: internal/libzkp/lib/libzkp.so ## Builds the Coordinator api instance.
3232
go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/coordinator_api ./cmd/api
3333

3434
coordinator_cron:

coordinator/internal/config/config.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,16 @@ type ProverManager struct {
2828
BundleCollectionTimeSec int `json:"bundle_collection_time_sec"`
2929
}
3030

31+
// l2geth client configuration items
32+
type L2Endpoint struct {
33+
Url string `json:"endpoint"`
34+
}
35+
3136
// L2 loads l2geth configuration items.
3237
type L2 struct {
3338
// l2geth chain_id.
34-
ChainID uint64 `json:"chain_id"`
39+
ChainID uint64 `json:"chain_id"`
40+
Endpoint *L2Endpoint `json:"l2geth"`
3541
}
3642

3743
// Auth provides the auth coordinator

coordinator/internal/controller/api/controller.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package api
22

33
import (
4+
"encoding/json"
5+
46
"github.com/prometheus/client_golang/prometheus"
57
"github.com/scroll-tech/go-ethereum/log"
68
"github.com/scroll-tech/go-ethereum/params"
79
"gorm.io/gorm"
810

911
"scroll-tech/coordinator/internal/config"
12+
"scroll-tech/coordinator/internal/libzkp"
1013
"scroll-tech/coordinator/internal/logic/verifier"
1114
)
1215

@@ -28,7 +31,17 @@ func InitController(cfg *config.Config, chainCfg *params.ChainConfig, db *gorm.D
2831

2932
log.Info("verifier created", "openVmVerifier", vf.OpenVMVkMap)
3033

34+
l2cfg := cfg.L2.Endpoint
35+
if l2cfg == nil {
36+
panic("l2geth is not specified")
37+
}
38+
l2cfgBytes, err := json.Marshal(l2cfg)
39+
if err != nil {
40+
panic(err)
41+
}
42+
libzkp.InitL2geth(string(l2cfgBytes))
43+
3144
Auth = NewAuthController(db, cfg, vf)
3245
GetTask = NewGetTaskController(cfg, chainCfg, db, reg)
33-
SubmitProof = NewSubmitProofController(cfg, chainCfg, db, vf, reg)
46+
SubmitProof = NewSubmitProofController(cfg, chainCfg, db, vf, reg, GetTask)
3447
}

coordinator/internal/controller/api/get_task.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ func NewGetTaskController(cfg *config.Config, chainCfg *params.ChainConfig, db *
4747
return ptc
4848
}
4949

50+
func (ptc *GetTaskController) ProverTasks() map[message.ProofType]provertask.ProverTask {
51+
return ptc.proverTasks
52+
}
53+
5054
func (ptc *GetTaskController) incGetTaskAccessCounter(ctx *gin.Context) error {
5155
publicKey, publicKeyExist := ctx.Get(coordinatorType.PublicKey)
5256
if !publicKeyExist {

coordinator/internal/controller/api/submit_proof.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"gorm.io/gorm"
1010

1111
"scroll-tech/common/types"
12+
"scroll-tech/common/types/message"
1213

1314
"scroll-tech/coordinator/internal/config"
1415
"scroll-tech/coordinator/internal/logic/submitproof"
@@ -22,10 +23,15 @@ type SubmitProofController struct {
2223
}
2324

2425
// NewSubmitProofController create the submit proof api controller instance
25-
func NewSubmitProofController(cfg *config.Config, chainCfg *params.ChainConfig, db *gorm.DB, vf *verifier.Verifier, reg prometheus.Registerer) *SubmitProofController {
26-
return &SubmitProofController{
26+
func NewSubmitProofController(cfg *config.Config, chainCfg *params.ChainConfig, db *gorm.DB, vf *verifier.Verifier, reg prometheus.Registerer, getTaskController *GetTaskController) *SubmitProofController {
27+
controller := SubmitProofController{
2728
submitProofReceiverLogic: submitproof.NewSubmitProofReceiverLogic(cfg.ProverManager, chainCfg, db, vf, reg),
2829
}
30+
proverTaskIf := getTaskController.ProverTasks()
31+
controller.submitProofReceiverLogic.ChunkTask = proverTaskIf[message.ProofTypeChunk]
32+
controller.submitProofReceiverLogic.BatchTask = proverTaskIf[message.ProofTypeBatch]
33+
controller.submitProofReceiverLogic.BundleTask = proverTaskIf[message.ProofTypeBundle]
34+
return &controller
2935
}
3036

3137
// SubmitProof prover submit the proof to coordinator

coordinator/internal/logic/provertask/batch_prover_task.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func NewBatchProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
4747
batchOrm: orm.NewBatch(db),
4848
proverTaskOrm: orm.NewProverTask(db),
4949
proverBlockListOrm: orm.NewProverBlockList(db),
50+
taskCache: newCache(128),
5051
},
5152
batchTaskGetTaskTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
5253
Name: "coordinator_batch_get_task_total",
@@ -174,6 +175,14 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
174175
log.Error("format prover task failure", "task_id", batchTask.Hash, "err", err)
175176
return nil, ErrCoordinatorInternalFailure
176177
}
178+
if getTaskParameter.Universal {
179+
taskMsg, err = bp.applyUniversal(taskMsg)
180+
if err != nil {
181+
bp.recoverActiveAttempts(ctx, batchTask)
182+
log.Error("Generate universal prover task failure", "task_id", batchTask.Hash, "type", "batch")
183+
return nil, ErrCoordinatorInternalFailure
184+
}
185+
}
177186

178187
bp.batchTaskGetTaskTotal.WithLabelValues(hardForkName).Inc()
179188
bp.batchTaskGetTaskProver.With(prometheus.Labels{
@@ -185,6 +194,14 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
185194
return taskMsg, nil
186195
}
187196

197+
func (bp *BatchProverTask) GetTaskMetaData(taskID string) (string, error) {
198+
if cached := bp.taskCache.Query(taskID); cached != nil {
199+
return cached.MetaData, nil
200+
}
201+
202+
return "", fmt.Errorf("can not re-acquire the metadata for specified task, see coordinator log for the reason")
203+
}
204+
188205
func (bp *BatchProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, batch *orm.Batch, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
189206
// get chunk from db
190207
chunks, err := bp.chunkOrm.GetChunksByBatchHash(ctx, task.TaskID)

coordinator/internal/logic/provertask/bundle_prover_task.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func NewBundleProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *g
4545
bundleOrm: orm.NewBundle(db),
4646
proverTaskOrm: orm.NewProverTask(db),
4747
proverBlockListOrm: orm.NewProverBlockList(db),
48+
taskCache: newCache(128),
4849
},
4950
bundleTaskGetTaskTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
5051
Name: "coordinator_bundle_get_task_total",
@@ -172,6 +173,14 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
172173
log.Error("format bundle prover task failure", "task_id", bundleTask.Hash, "err", err)
173174
return nil, ErrCoordinatorInternalFailure
174175
}
176+
if getTaskParameter.Universal {
177+
taskMsg, err = bp.applyUniversal(taskMsg)
178+
if err != nil {
179+
bp.recoverActiveAttempts(ctx, bundleTask)
180+
log.Error("Generate universal prover task failure", "task_id", bundleTask.Hash, "type", "bundle")
181+
return nil, ErrCoordinatorInternalFailure
182+
}
183+
}
175184

176185
bp.bundleTaskGetTaskTotal.WithLabelValues(hardForkName).Inc()
177186
bp.bundleTaskGetTaskProver.With(prometheus.Labels{
@@ -183,6 +192,14 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
183192
return taskMsg, nil
184193
}
185194

195+
func (bp *BundleProverTask) GetTaskMetaData(taskID string) (string, error) {
196+
if cached := bp.taskCache.Query(taskID); cached != nil {
197+
return cached.MetaData, nil
198+
}
199+
200+
return "", fmt.Errorf("can not re-acquire the metadata for specified task, see coordinator log for the reason")
201+
}
202+
186203
func (bp *BundleProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
187204
// get bundle from db
188205
batches, err := bp.batchOrm.GetBatchesByBundleHash(ctx, task.TaskID)

coordinator/internal/logic/provertask/chunk_prover_task.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func NewChunkProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
4343
blockOrm: orm.NewL2Block(db),
4444
proverTaskOrm: orm.NewProverTask(db),
4545
proverBlockListOrm: orm.NewProverBlockList(db),
46+
taskCache: newCache(1024),
4647
},
4748
chunkTaskGetTaskTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
4849
Name: "coordinator_chunk_get_task_total",
@@ -169,6 +170,14 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
169170
log.Error("format prover task failure", "task_id", chunkTask.Hash, "err", err)
170171
return nil, ErrCoordinatorInternalFailure
171172
}
173+
if getTaskParameter.Universal {
174+
taskMsg, err = cp.applyUniversal(taskMsg)
175+
if err != nil {
176+
cp.recoverActiveAttempts(ctx, chunkTask)
177+
log.Error("Generate universal prover task failure", "task_id", chunkTask.Hash, "type", "chunk")
178+
return nil, ErrCoordinatorInternalFailure
179+
}
180+
}
172181

173182
cp.chunkTaskGetTaskTotal.WithLabelValues(hardForkName).Inc()
174183
cp.chunkTaskGetTaskProver.With(prometheus.Labels{
@@ -180,6 +189,14 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
180189
return taskMsg, nil
181190
}
182191

192+
func (cp *ChunkProverTask) GetTaskMetaData(taskID string) (string, error) {
193+
if cached := cp.taskCache.Query(taskID); cached != nil {
194+
return cached.MetaData, nil
195+
}
196+
197+
return "", fmt.Errorf("can not re-acquire the metadata for specified task, see coordinator log for the reason")
198+
}
199+
183200
func (cp *ChunkProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, chunk *orm.Chunk, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
184201
// Get block hashes.
185202
blockHashes, dbErr := cp.blockOrm.GetL2BlockHashesByChunkHash(ctx, task.TaskID)

coordinator/internal/logic/provertask/prover_task.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"scroll-tech/common/types/message"
1717

1818
"scroll-tech/coordinator/internal/config"
19+
"scroll-tech/coordinator/internal/libzkp"
1920
"scroll-tech/coordinator/internal/orm"
2021
coordinatorType "scroll-tech/coordinator/internal/types"
2122
)
@@ -33,6 +34,7 @@ var (
3334
// ProverTask the interface of a collector who send data to prover
3435
type ProverTask interface {
3536
Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)
37+
GetTaskMetaData(taskID string) (string, error)
3638
}
3739

3840
// BaseProverTask a base prover task which contain series functions
@@ -47,6 +49,8 @@ type BaseProverTask struct {
4749
blockOrm *orm.L2Block
4850
proverTaskOrm *orm.ProverTask
4951
proverBlockListOrm *orm.ProverBlockList
52+
53+
taskCache *TaskCache
5054
}
5155

5256
type proverTaskContext struct {
@@ -185,6 +189,26 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context) (*proverTaskContext, e
185189
return &ptc, nil
186190
}
187191

192+
func (b *BaseProverTask) applyUniversal(schema *coordinatorType.GetTaskSchema) (*coordinatorType.GetTaskSchema, error) {
193+
if cached := b.taskCache.Query(schema.TaskID); cached != nil {
194+
schema.TaskData = cached.UTaskData
195+
return schema, nil
196+
}
197+
198+
ok, metadata, uTaskData, _ := libzkp.GenerateUniversalTask(schema.TaskType, schema.TaskData, schema.HardForkName)
199+
if !ok {
200+
return nil, fmt.Errorf("can not generate universal task, see coordinator log for the reason")
201+
}
202+
203+
cacheData := CachedTaskData{
204+
MetaData: metadata,
205+
UTaskData: uTaskData,
206+
}
207+
b.taskCache.Add(schema.TaskID, &cacheData)
208+
schema.TaskData = uTaskData
209+
return schema, nil
210+
}
211+
188212
func newGetTaskCounterVec(factory promauto.Factory, taskType string) *prometheus.CounterVec {
189213
getTaskCounterInitOnce.Do(func() {
190214
getTaskCounterVec = factory.NewCounterVec(prometheus.CounterOpts{
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package provertask
2+
3+
import (
4+
"sync"
5+
)
6+
7+
type CachedTaskData struct {
8+
MetaData string
9+
UTaskData string
10+
}
11+
12+
// very simple size limited lru caching for task
13+
type TaskCache struct {
14+
sync.RWMutex
15+
cache map[string]*CachedTaskData
16+
dropping_cache map[string]*CachedTaskData
17+
limit int
18+
}
19+
20+
func newCache(limit int) *TaskCache {
21+
t := TaskCache{
22+
cache: make(map[string]*CachedTaskData),
23+
dropping_cache: make(map[string]*CachedTaskData),
24+
limit: limit,
25+
}
26+
27+
return &t
28+
}
29+
30+
func (c *TaskCache) Add(k string, itm *CachedTaskData) {
31+
c.Lock()
32+
defer c.Unlock()
33+
34+
if len(c.cache) >= c.limit {
35+
c.dropping_cache = c.cache
36+
c.cache = make(map[string]*CachedTaskData)
37+
}
38+
c.cache[k] = itm
39+
}
40+
41+
func (c *TaskCache) Query(key string) *CachedTaskData {
42+
c.RLock()
43+
if v, ok := c.cache[key]; ok {
44+
c.RUnlock()
45+
return v
46+
}
47+
48+
if v, ok := c.dropping_cache[key]; ok {
49+
c.RUnlock()
50+
c.Add(key, v)
51+
return v
52+
}
53+
54+
c.RUnlock()
55+
return nil
56+
}

0 commit comments

Comments
 (0)