Skip to content

Commit 0042a9c

Browse files
committed
Quick stab at making watchtower tasks generic
1 parent 445465f commit 0042a9c

File tree

3 files changed

+138
-63
lines changed

3 files changed

+138
-63
lines changed

rocketpool-daemon/task/task.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package task
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log/slog"
7+
"sync"
8+
9+
"github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/state"
10+
)
11+
12+
var (
13+
// ErrAlreadyRunning is returned when a background task is kicked off, but it is already in progress.
14+
ErrAlreadyRunning = errors.New("task is already running")
15+
)
16+
17+
// TaskContext is passed to the Task's Callback function when the invoker wishes the task
18+
// to be kicked off.
19+
//
20+
// Its fields are things that are variable and may change between invokations of a task.
21+
type BackgroundTaskContext struct {
22+
// A context provided by the invoker of this task.
23+
// May be nil, and cancellations should be respected.
24+
Ctx context.Context
25+
// Whether or not the node is on the oDAO at the time the task was invoked
26+
IsOnOdao bool
27+
// A recent network state so each task need not query it redundantly
28+
State *state.NetworkState
29+
}
30+
31+
type BackgroundTask interface {
32+
// Returns a function to call that starts the task in the background
33+
Run(*BackgroundTaskContext) error
34+
// A function that tasks must call when all async portions are completed
35+
Done()
36+
}
37+
38+
type LockingBackgroundTask struct {
39+
logger *slog.Logger
40+
description string
41+
run func(*BackgroundTaskContext) error
42+
43+
lock sync.Mutex
44+
isRunning bool
45+
}
46+
47+
func NewLockingBackgroundTask(logger *slog.Logger, description string, f func(*BackgroundTaskContext) error) *LockingBackgroundTask {
48+
return &LockingBackgroundTask{
49+
description: description,
50+
logger: logger,
51+
run: f,
52+
}
53+
}
54+
55+
func (lbt *LockingBackgroundTask) Run(taskContext *BackgroundTaskContext) error {
56+
lbt.lock.Lock()
57+
defer lbt.lock.Unlock()
58+
59+
lbt.logger.Info("Starting task", "description", lbt.description)
60+
if lbt.isRunning {
61+
lbt.logger.Info("Task is already running", "description", lbt.description)
62+
return ErrAlreadyRunning
63+
}
64+
65+
lbt.isRunning = true
66+
err := lbt.run(taskContext)
67+
if err != nil {
68+
lbt.Done()
69+
}
70+
return err
71+
}
72+
73+
func (lbt *LockingBackgroundTask) Done() {
74+
lbt.lock.Lock()
75+
defer lbt.lock.Unlock()
76+
lbt.isRunning = false
77+
}

rocketpool-daemon/watchtower/generate-rewards-tree.go

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"path/filepath"
99
"strconv"
1010
"strings"
11-
"sync"
1211
"time"
1312

1413
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -24,52 +23,43 @@ import (
2423
rprewards "github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/rewards"
2524
"github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/services"
2625
"github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/state"
26+
"github.com/rocket-pool/smartnode/v2/rocketpool-daemon/task"
2727
"github.com/rocket-pool/smartnode/v2/shared/config"
2828
"github.com/rocket-pool/smartnode/v2/shared/keys"
2929
sharedtypes "github.com/rocket-pool/smartnode/v2/shared/types"
3030
)
3131

3232
// Generate rewards Merkle Tree task
3333
type GenerateRewardsTree struct {
34-
ctx context.Context
35-
sp *services.ServiceProvider
36-
logger *slog.Logger
37-
cfg *config.SmartNodeConfig
38-
rp *rocketpool.RocketPool
39-
ec eth.IExecutionClient
40-
bc beacon.IBeaconClient
41-
lock *sync.Mutex
42-
isRunning bool
34+
*task.LockingBackgroundTask
35+
sp *services.ServiceProvider
36+
logger *slog.Logger
37+
cfg *config.SmartNodeConfig
38+
rp *rocketpool.RocketPool
39+
ec eth.IExecutionClient
40+
bc beacon.IBeaconClient
4341
}
4442

4543
// Create generate rewards Merkle Tree task
4644
func NewGenerateRewardsTree(ctx context.Context, sp *services.ServiceProvider, logger *log.Logger) *GenerateRewardsTree {
47-
lock := &sync.Mutex{}
48-
return &GenerateRewardsTree{
49-
ctx: ctx,
50-
sp: sp,
51-
logger: logger.With(slog.String(keys.RoutineKey, "Generate Rewards Tree")),
52-
cfg: sp.GetConfig(),
53-
rp: sp.GetRocketPool(),
54-
ec: sp.GetEthClient(),
55-
bc: sp.GetBeaconClient(),
56-
lock: lock,
57-
isRunning: false,
45+
out := &GenerateRewardsTree{
46+
sp: sp,
47+
logger: logger.With(slog.String(keys.RoutineKey, "Generate Rewards Tree")),
48+
cfg: sp.GetConfig(),
49+
rp: sp.GetRocketPool(),
50+
ec: sp.GetEthClient(),
51+
bc: sp.GetBeaconClient(),
5852
}
53+
out.LockingBackgroundTask = task.NewLockingBackgroundTask(
54+
logger.With(slog.String(keys.RoutineKey, "Generate Rewards Tree")),
55+
"manual rewards tree generation request check",
56+
out.Run,
57+
)
58+
return out
5959
}
6060

6161
// Check for generation requests
62-
func (t *GenerateRewardsTree) Run() error {
63-
t.logger.Info("Starting manual rewards tree generation request check.")
64-
65-
// Check if rewards generation is already running
66-
t.lock.Lock()
67-
if t.isRunning {
68-
t.logger.Info("Tree generation is already running.")
69-
t.lock.Unlock()
70-
return nil
71-
}
72-
t.lock.Unlock()
62+
func (t *GenerateRewardsTree) Run(taskCtx *task.BackgroundTaskContext) error {
7363

7464
// Check for requests
7565
requestDir := t.cfg.GetWatchtowerFolder()
@@ -102,10 +92,7 @@ func (t *GenerateRewardsTree) Run() error {
10292
}
10393

10494
// Generate the rewards tree
105-
t.lock.Lock()
106-
t.isRunning = true
107-
t.lock.Unlock()
108-
go t.generateRewardsTree(index)
95+
go t.generateRewardsTree(taskCtx.Ctx, index)
10996

11097
// Return after the first request, do others at other intervals
11198
return nil
@@ -115,7 +102,10 @@ func (t *GenerateRewardsTree) Run() error {
115102
return nil
116103
}
117104

118-
func (t *GenerateRewardsTree) generateRewardsTree(index uint64) {
105+
func (t *GenerateRewardsTree) generateRewardsTree(ctx context.Context, index uint64) {
106+
// This function is the async portion of the task, so make sure to signal completion
107+
defer t.LockingBackgroundTask.Done()
108+
119109
// Begin generation of the tree
120110
logger := t.logger.With(slog.Uint64(keys.IntervalKey, index))
121111
logger.Info("Starting generation of Merkle rewards tree.")
@@ -150,7 +140,7 @@ func (t *GenerateRewardsTree) generateRewardsTree(index uint64) {
150140
}, opts)
151141
if err == nil {
152142
// Create the state manager with using the primary or fallback (not necessarily archive) EC
153-
stateManager, err = state.NewNetworkStateManager(t.ctx, client, t.cfg, t.rp.Client, t.bc, logger)
143+
stateManager, err = state.NewNetworkStateManager(ctx, client, t.cfg, t.rp.Client, t.bc, logger)
154144
if err != nil {
155145
t.handleError(fmt.Errorf("error creating new NetworkStateManager with Archive EC: %w", err), logger)
156146
return
@@ -189,7 +179,7 @@ func (t *GenerateRewardsTree) generateRewardsTree(index uint64) {
189179
return
190180
}
191181
// Create the state manager with the archive EC
192-
stateManager, err = state.NewNetworkStateManager(t.ctx, client, t.cfg, ec, t.bc, logger)
182+
stateManager, err = state.NewNetworkStateManager(ctx, client, t.cfg, ec, t.bc, logger)
193183
if err != nil {
194184
t.handleError(fmt.Errorf("error creating new NetworkStateManager with Archive EC: %w", err), logger)
195185
return
@@ -210,26 +200,26 @@ func (t *GenerateRewardsTree) generateRewardsTree(index uint64) {
210200
}
211201

212202
// Get the state for the target slot
213-
state, err := stateManager.GetStateForSlot(t.ctx, rewardsEvent.ConsensusBlock.Uint64())
203+
state, err := stateManager.GetStateForSlot(ctx, rewardsEvent.ConsensusBlock.Uint64())
214204
if err != nil {
215205
t.handleError(fmt.Errorf("error getting state for beacon slot %d: %w", rewardsEvent.ConsensusBlock.Uint64(), err), logger)
216206
return
217207
}
218208

219209
// Generate the tree
220-
t.generateRewardsTreeImpl(logger, client, index, rewardsEvent, elBlockHeader, state)
210+
t.generateRewardsTreeImpl(ctx, logger, client, index, rewardsEvent, elBlockHeader, state)
221211
}
222212

223213
// Implementation for rewards tree generation using a viable EC
224-
func (t *GenerateRewardsTree) generateRewardsTreeImpl(logger *slog.Logger, rp *rocketpool.RocketPool, index uint64, rewardsEvent rewards.RewardsEvent, elBlockHeader *types.Header, state *state.NetworkState) {
214+
func (t *GenerateRewardsTree) generateRewardsTreeImpl(ctx context.Context, logger *slog.Logger, rp *rocketpool.RocketPool, index uint64, rewardsEvent rewards.RewardsEvent, elBlockHeader *types.Header, state *state.NetworkState) {
225215
// Generate the rewards file
226216
start := time.Now()
227217
treegen, err := rprewards.NewTreeGenerator(t.logger, rp, t.cfg, t.bc, index, rewardsEvent.IntervalStartTime, rewardsEvent.IntervalEndTime, rewardsEvent.ConsensusBlock.Uint64(), elBlockHeader, rewardsEvent.IntervalsPassed.Uint64(), state, nil)
228218
if err != nil {
229219
t.handleError(fmt.Errorf("Error creating Merkle tree generator: %w", err), logger)
230220
return
231221
}
232-
rewardsFile, err := treegen.GenerateTree(t.ctx)
222+
rewardsFile, err := treegen.GenerateTree(ctx)
233223
if err != nil {
234224
t.handleError(fmt.Errorf("%s Error generating Merkle tree: %w", err), logger)
235225
return
@@ -273,14 +263,8 @@ func (t *GenerateRewardsTree) generateRewardsTreeImpl(logger *slog.Logger, rp *r
273263
}
274264

275265
t.logger.Info("Merkle tree generation complete!")
276-
t.lock.Lock()
277-
t.isRunning = false
278-
t.lock.Unlock()
279266
}
280267

281268
func (t *GenerateRewardsTree) handleError(err error, logger *slog.Logger) {
282269
logger.Error("*** Rewards tree generation failed. ***", log.Err(err))
283-
t.lock.Lock()
284-
t.isRunning = false
285-
t.lock.Unlock()
286270
}

rocketpool-daemon/watchtower/watchtower.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/rocket-pool/rocketpool-go/v2/rocketpool"
1212
"github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/services"
1313
"github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/state"
14+
"github.com/rocket-pool/smartnode/v2/rocketpool-daemon/task"
1415
"github.com/rocket-pool/smartnode/v2/rocketpool-daemon/watchtower/collectors"
1516
"github.com/rocket-pool/smartnode/v2/shared/config"
1617
)
@@ -32,8 +33,10 @@ type TaskManager struct {
3233
rp *rocketpool.RocketPool
3334
bc beacon.IBeaconClient
3435

36+
// Generic Tasks to run
37+
tasks []task.BackgroundTask
38+
3539
// Tasks
36-
generateRewardsTree *GenerateRewardsTree
3740
respondChallenges *RespondChallenges
3841
submitRplPrice *SubmitRplPrice
3942
submitNetworkBalances *SubmitNetworkBalances
@@ -84,7 +87,6 @@ func NewTaskManager(
8487
}
8588

8689
// Initialize tasks
87-
generateRewardsTree := NewGenerateRewardsTree(ctx, sp, logger)
8890
respondChallenges := NewRespondChallenges(sp, logger, stateMgr)
8991
submitRplPrice := NewSubmitRplPrice(ctx, sp, logger)
9092
submitNetworkBalances := NewSubmitNetworkBalances(ctx, sp, logger)
@@ -101,13 +103,15 @@ func NewTaskManager(
101103
finalizePdaoProposals := NewFinalizePdaoProposals(sp, logger)
102104

103105
return &TaskManager{
104-
sp: sp,
105-
logger: logger,
106-
ctx: ctx,
107-
cfg: cfg,
108-
rp: rp,
109-
bc: bc,
110-
generateRewardsTree: generateRewardsTree,
106+
sp: sp,
107+
logger: logger,
108+
ctx: ctx,
109+
cfg: cfg,
110+
rp: rp,
111+
bc: bc,
112+
tasks: []task.BackgroundTask{
113+
NewGenerateRewardsTree(ctx, sp, logger).LockingBackgroundTask,
114+
},
111115
respondChallenges: respondChallenges,
112116
submitRplPrice: submitRplPrice,
113117
submitNetworkBalances: submitNetworkBalances,
@@ -146,12 +150,22 @@ func (t *TaskManager) Initialize(stateMgr *state.NetworkStateManager) error {
146150

147151
// Run the task loop
148152
func (t *TaskManager) Run(isOnOdao bool, state *state.NetworkState) error {
149-
// Run the manual rewards tree generation
150-
if err := t.generateRewardsTree.Run(); err != nil {
151-
t.logger.Error(err.Error())
153+
taskCtx := &task.BackgroundTaskContext{
154+
// TODO: having a single global context stemming from
155+
// context.Background is basically the same as passing around nil,
156+
// and we should remove ctx from t and add it to Run()
157+
Ctx: t.ctx,
158+
IsOnOdao: isOnOdao,
159+
State: state,
152160
}
153-
if utils.SleepWithCancel(t.ctx, taskCooldown) {
154-
return nil
161+
// Run the generic tasks
162+
for _, taskItem := range t.tasks {
163+
if err := taskItem.Run(taskCtx); err != nil && err != task.ErrAlreadyRunning {
164+
t.logger.Error(err.Error())
165+
}
166+
if utils.SleepWithCancel(t.ctx, taskCooldown) {
167+
return nil
168+
}
155169
}
156170

157171
if isOnOdao {

0 commit comments

Comments
 (0)