Skip to content

Commit b79e255

Browse files
authored
Merge branch 'main' into full-replication
Signed-off-by: Sam Liokumovich <65994425+samliok@users.noreply.github.com>
2 parents f1a1d3e + 7b64aa1 commit b79e255

File tree

8 files changed

+976
-404
lines changed

8 files changed

+976
-404
lines changed

block_scheduler.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package simplex
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"maps"
10+
"slices"
11+
"sync"
12+
"sync/atomic"
13+
14+
"go.uber.org/zap"
15+
)
16+
17+
var ErrTooManyPendingVerifications = errors.New("too many blocks being verified to ingest another one")
18+
19+
type Scheduler interface {
20+
Schedule(task Task)
21+
Size() int
22+
Close()
23+
}
24+
25+
// BlockDependencyManager manages block verification tasks with dependencies on previous blocks and empty rounds.
26+
// It schedules tasks when their dependencies are resolved.
27+
type BlockDependencyManager struct {
28+
lock sync.Mutex
29+
logger Logger
30+
scheduler Scheduler
31+
32+
dependencies []*TaskWithDependents
33+
maxDeps uint64
34+
closed atomic.Bool
35+
}
36+
37+
type TaskWithDependents struct {
38+
Task Task
39+
40+
blockSeq uint64 // the seq of the block being verified
41+
prevBlock *Digest
42+
emptyRounds map[uint64]struct{}
43+
}
44+
45+
func (t *TaskWithDependents) isReady() bool {
46+
return t.prevBlock == nil && len(t.emptyRounds) == 0
47+
}
48+
49+
func (t *TaskWithDependents) String() string {
50+
return fmt.Sprintf("BlockVerificationTask{blockSeq: %d, prevBlock: %v, emptyRounds: %v}", t.blockSeq, t.prevBlock, slices.Collect(maps.Keys(t.emptyRounds)))
51+
}
52+
53+
func NewBlockVerificationScheduler(logger Logger, maxDeps uint64, scheduler Scheduler) *BlockDependencyManager {
54+
b := &BlockDependencyManager{
55+
logger: logger,
56+
maxDeps: maxDeps,
57+
scheduler: scheduler,
58+
}
59+
60+
b.logger.Debug("Created BlockVerificationScheduler", zap.Uint64("maxDeps", maxDeps))
61+
return b
62+
}
63+
64+
// ExecuteBlockDependents removes the given digest from dependent tasks and schedules any whose dependencies are now resolved.
65+
func (bs *BlockDependencyManager) ExecuteBlockDependents(prev Digest) {
66+
bs.lock.Lock()
67+
defer bs.lock.Unlock()
68+
69+
remainingDeps := make([]*TaskWithDependents, 0, len(bs.dependencies))
70+
71+
for _, taskWithDeps := range bs.dependencies {
72+
if taskWithDeps.prevBlock != nil && *taskWithDeps.prevBlock == prev {
73+
taskWithDeps.prevBlock = nil
74+
}
75+
76+
if taskWithDeps.isReady() {
77+
bs.logger.Debug("Scheduling block verification task as all dependencies are met", zap.Stringer("taskID", prev))
78+
bs.scheduler.Schedule(taskWithDeps.Task)
79+
continue
80+
}
81+
82+
bs.logger.Debug("Block verification task has unsatisfied dependencies",
83+
zap.Any("prevBlock", prev),
84+
zap.Stringer("task", taskWithDeps),
85+
)
86+
87+
remainingDeps = append(remainingDeps, taskWithDeps)
88+
}
89+
90+
bs.dependencies = remainingDeps
91+
}
92+
93+
// ExecuteEmptyRoundDependents removes the given empty round from dependent tasks and schedules any whose dependencies are now resolved.
94+
func (bs *BlockDependencyManager) ExecuteEmptyRoundDependents(emptyRound uint64) {
95+
bs.lock.Lock()
96+
defer bs.lock.Unlock()
97+
98+
remainingDeps := make([]*TaskWithDependents, 0, len(bs.dependencies))
99+
100+
for _, taskWithDeps := range bs.dependencies {
101+
delete(taskWithDeps.emptyRounds, emptyRound)
102+
103+
if taskWithDeps.isReady() {
104+
bs.logger.Debug("Scheduling block verification task as all dependencies are met", zap.Stringer("task", taskWithDeps))
105+
bs.scheduler.Schedule(taskWithDeps.Task)
106+
continue
107+
}
108+
109+
bs.logger.Debug("Block verification task has unsatisfied dependencies",
110+
zap.Any("emptyRound", emptyRound),
111+
zap.Stringer("task", taskWithDeps),
112+
)
113+
remainingDeps = append(remainingDeps, taskWithDeps)
114+
}
115+
116+
bs.dependencies = remainingDeps
117+
}
118+
119+
func (bs *BlockDependencyManager) ScheduleTaskWithDependencies(task Task, blockSeq uint64, prev *Digest, emptyRounds []uint64) error {
120+
bs.lock.Lock()
121+
defer bs.lock.Unlock()
122+
123+
if bs.closed.Load() {
124+
return nil
125+
}
126+
127+
wrappedTask := func() Digest {
128+
id := task()
129+
bs.ExecuteBlockDependents(id)
130+
return id
131+
}
132+
133+
totalSize := uint64(len(bs.dependencies) + bs.scheduler.Size())
134+
if totalSize >= bs.maxDeps {
135+
bs.logger.Warn("Too many blocks being verified to ingest another one", zap.Uint64("pendingBlocks", totalSize))
136+
return fmt.Errorf("%w: %d pending verifications (max %d)", ErrTooManyPendingVerifications, totalSize, bs.maxDeps)
137+
}
138+
139+
if prev == nil && len(emptyRounds) == 0 {
140+
bs.logger.Debug("Scheduling block verification task with no dependencies", zap.Uint64("blockSeq", blockSeq))
141+
bs.scheduler.Schedule(wrappedTask)
142+
return nil
143+
}
144+
145+
bs.logger.Debug("Adding block verification task with dependencies", zap.Any("prevBlock", prev), zap.Uint64s("emptyRounds", emptyRounds))
146+
emptyRoundsSet := make(map[uint64]struct{})
147+
for _, round := range emptyRounds {
148+
emptyRoundsSet[round] = struct{}{}
149+
}
150+
151+
bs.dependencies = append(bs.dependencies, &TaskWithDependents{
152+
Task: wrappedTask,
153+
prevBlock: prev,
154+
emptyRounds: emptyRoundsSet,
155+
blockSeq: blockSeq,
156+
})
157+
158+
return nil
159+
}
160+
161+
// We can remove all tasks that have an empty notarization dependency for a round that has been finalized.
162+
func (bs *BlockDependencyManager) RemoveOldTasks(seq uint64) {
163+
bs.lock.Lock()
164+
defer bs.lock.Unlock()
165+
166+
remainingDeps := make([]*TaskWithDependents, 0, len(bs.dependencies))
167+
for _, taskWithDeps := range bs.dependencies {
168+
if taskWithDeps.blockSeq <= seq {
169+
bs.logger.Debug("Removing block verification task as its block seq is less than or equal to finalized seq", zap.Uint64("blockSeq", taskWithDeps.blockSeq), zap.Uint64("finalizedSeq", seq))
170+
continue
171+
}
172+
remainingDeps = append(remainingDeps, taskWithDeps)
173+
}
174+
175+
bs.dependencies = remainingDeps
176+
}
177+
178+
func (bs *BlockDependencyManager) Close() {
179+
bs.closed.Store(true)
180+
bs.scheduler.Close()
181+
}

0 commit comments

Comments
 (0)