Skip to content

Commit d20e677

Browse files
authored
feat: TipSetWorker accepts all storage systems (#1035)
- closes #1029
1 parent 45f15be commit d20e677

File tree

4 files changed

+55
-25
lines changed

4 files changed

+55
-25
lines changed

chain/indexer/distributed/queue/tasks/gapfill.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,24 @@ func NewGapFillTipSetTask(ctx context.Context, ts *types.TipSet, tasks []string)
5959
return asynq.NewTask(TypeGapFillTipSet, payload), nil
6060
}
6161

62-
type AsynqGapFillTipSetTaskHandler struct {
62+
func NewGapFillHandler(indexer indexer.Indexer, db *storage.Database) *GapFillTipSetHandler {
63+
return &GapFillTipSetHandler{indexer: indexer, db: db}
64+
}
65+
66+
type GapFillTipSetHandler struct {
6367
indexer indexer.Indexer
6468
db *storage.Database
6569
}
6670

67-
func NewGapFillHandler(indexer indexer.Indexer, db *storage.Database) *AsynqGapFillTipSetTaskHandler {
68-
return &AsynqGapFillTipSetTaskHandler{indexer: indexer, db: db}
71+
func (gh *GapFillTipSetHandler) Handler() asynq.HandlerFunc {
72+
return gh.HandleGapFillTipSetTask
73+
}
74+
75+
func (gh *GapFillTipSetHandler) Type() string {
76+
return TypeGapFillTipSet
6977
}
7078

71-
func (gh *AsynqGapFillTipSetTaskHandler) HandleGapFillTipSetTask(ctx context.Context, t *asynq.Task) error {
79+
func (gh *GapFillTipSetHandler) HandleGapFillTipSetTask(ctx context.Context, t *asynq.Task) error {
7280
var p GapFillTipSetPayload
7381
if err := json.Unmarshal(t.Payload(), &p); err != nil {
7482
return err

chain/indexer/distributed/queue/tasks/index.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,23 @@ func NewIndexTipSetTask(ctx context.Context, ts *types.TipSet, tasks []string) (
6161
return asynq.NewTask(TypeIndexTipSet, payload), nil
6262
}
6363

64-
type AsynqTipSetTaskHandler struct {
64+
func NewIndexHandler(i indexer.Indexer) *TipSetTaskHandler {
65+
return &TipSetTaskHandler{indexer: i}
66+
}
67+
68+
type TipSetTaskHandler struct {
6569
indexer indexer.Indexer
6670
}
6771

68-
func NewIndexHandler(i indexer.Indexer) *AsynqTipSetTaskHandler {
69-
return &AsynqTipSetTaskHandler{indexer: i}
72+
func (ih *TipSetTaskHandler) Type() string {
73+
return TypeIndexTipSet
74+
}
75+
76+
func (ih *TipSetTaskHandler) Handler() asynq.HandlerFunc {
77+
return ih.HandleIndexTipSetTask
7078
}
7179

72-
func (ih *AsynqTipSetTaskHandler) HandleIndexTipSetTask(ctx context.Context, t *asynq.Task) error {
80+
func (ih *TipSetTaskHandler) HandleIndexTipSetTask(ctx context.Context, t *asynq.Task) error {
7381
var p IndexTipSetPayload
7482
if err := json.Unmarshal(t.Payload(), &p); err != nil {
7583
return err

chain/indexer/distributed/queue/worker.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,30 +11,30 @@ import (
1111
"go.opentelemetry.io/otel/trace"
1212
"go.uber.org/zap"
1313

14-
"github.com/filecoin-project/lily/chain/indexer"
1514
"github.com/filecoin-project/lily/chain/indexer/distributed"
1615
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
1716
"github.com/filecoin-project/lily/metrics"
18-
"github.com/filecoin-project/lily/storage"
1917
)
2018

2119
var log = logging.Logger("lily/distributed/worker")
2220

2321
type AsynqWorker struct {
2422
done chan struct{}
2523

26-
name string
27-
server *distributed.TipSetWorker
28-
index indexer.Indexer
29-
db *storage.Database
24+
name string
25+
server *distributed.TipSetWorker
26+
handlers []TaskHandler
27+
}
28+
type TaskHandler interface {
29+
Type() string
30+
Handler() asynq.HandlerFunc
3031
}
3132

32-
func NewAsynqWorker(name string, i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker {
33+
func NewAsynqWorker(name string, server *distributed.TipSetWorker, handlers ...TaskHandler) *AsynqWorker {
3334
return &AsynqWorker{
34-
name: name,
35-
server: server,
36-
index: i,
37-
db: db,
35+
name: name,
36+
server: server,
37+
handlers: handlers,
3838
}
3939
}
4040

@@ -43,8 +43,10 @@ func (t *AsynqWorker) Run(ctx context.Context) error {
4343
defer close(t.done)
4444

4545
mux := asynq.NewServeMux()
46-
mux.HandleFunc(tasks.TypeIndexTipSet, tasks.NewIndexHandler(t.index).HandleIndexTipSetTask)
47-
mux.HandleFunc(tasks.TypeGapFillTipSet, tasks.NewGapFillHandler(t.index, t.db).HandleGapFillTipSetTask)
46+
for _, handler := range t.handlers {
47+
log.Infow("registered task handler", "type", handler.Type())
48+
mux.HandleFunc(handler.Type(), handler.Handler())
49+
}
4850

4951
t.server.ServerConfig.Logger = log.With("name", t.name)
5052
t.server.ServerConfig.ErrorHandler = &WorkerErrorHandler{}

lens/lily/impl.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/filecoin-project/lily/chain/indexer"
2626
"github.com/filecoin-project/lily/chain/indexer/distributed"
2727
"github.com/filecoin-project/lily/chain/indexer/distributed/queue"
28+
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
2829
"github.com/filecoin-project/lily/chain/indexer/integrated"
2930
"github.com/filecoin-project/lily/chain/indexer/integrated/tipset"
3031
"github.com/filecoin-project/lily/chain/walk"
@@ -102,9 +103,20 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
102103
return nil, err
103104
}
104105

105-
db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.JobConfig.Storage, md)
106-
if err != nil {
107-
return nil, err
106+
handlers := []queue.TaskHandler{tasks.NewIndexHandler(im)}
107+
// check if queue config contains configuration for gap fill tasks and if it expects the tasks to be processed. This
108+
// is specified by giving the Fill queue a priority greater than 1.
109+
priority, ok := worker.ServerConfig.Queues[indexer.Fill.String()]
110+
if ok {
111+
if priority > 0 {
112+
// if gap fill tasks have a priority storage must be a database.
113+
db, ok := strg.(*storage.Database)
114+
if !ok {
115+
return nil, fmt.Errorf("storage type (%T) is unsupported when %s queue is enable", strg, indexer.Fill.String())
116+
}
117+
// add gap fill handler to set of worker handlers.
118+
handlers = append(handlers, tasks.NewGapFillHandler(im, db))
119+
}
108120
}
109121

110122
res := m.Scheduler.Submit(&schedule.JobConfig{
@@ -114,7 +126,7 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
114126
"queue": cfg.Queue,
115127
"storage": cfg.JobConfig.Storage,
116128
},
117-
Job: queue.NewAsynqWorker(cfg.JobConfig.Name, im, db, worker),
129+
Job: queue.NewAsynqWorker(cfg.JobConfig.Name, worker, handlers...),
118130
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
119131
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
120132
RestartDelay: cfg.JobConfig.RestartDelay,

0 commit comments

Comments
 (0)