Skip to content

Commit 51e7c41

Browse files
TerryhungTerry
andauthored
feat: add filter for actor changes (#1195)
* Add threshold for processing actor changes * Refine the error message text * let threshold be more flexible * fix lint * Move the check to startActor * Refine the order of filtering result * Remove useless function * Add new end line of gitignore * Update gitignore and function name --------- Co-authored-by: Terry <[email protected]>
1 parent 0c54ee4 commit 51e7c41

File tree

3 files changed

+35
-6
lines changed

3 files changed

+35
-6
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@ visor
2121
build/.*
2222
support/tools/bin
2323

24+
# Deployment
25+
*.env

chain/indexer/integrated/processor/state.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,6 @@ func (sp *StateProcessor) startTipSets(ctx context.Context, current, executed *t
312312
// startActor starts all ActorProcessor's in parallel, their results are emitted on the `results` channel.
313313
// A list containing all executed task names is returned.
314314
func (sp *StateProcessor) startActor(ctx context.Context, current, executed *types.TipSet, results chan *Result) []string {
315-
316315
if len(sp.actorProcessors) == 0 {
317316
return nil
318317
}

tasks/actorstate/task.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package actorstate
33
import (
44
"context"
55
"fmt"
6+
"os"
7+
"strconv"
68
"sync"
79
"time"
810

@@ -41,6 +43,27 @@ func NewTaskWithTransformer(node tasks.DataSource, extractorMap ActorExtractorMa
4143
}
4244
}
4345

46+
const actorChangeLimiterEnv = "LILY_ACTOR_CHANGE_LIMITER"
47+
48+
const defaultActorChangeLimit = 10000
49+
50+
// Skip task processing when actor state changes abnormally.
51+
func (t *Task) exceedsActorChangeLimit(changes map[address.Address]tasks.ActorStateChange) bool {
52+
// The 10,000 is a default threshold
53+
limit := defaultActorChangeLimit
54+
55+
// Access the custom threshold set in the environment variable
56+
s, ok := os.LookupEnv(actorChangeLimiterEnv)
57+
if ok {
58+
v, err := strconv.ParseInt(s, 10, 64)
59+
if err == nil {
60+
limit = int(v)
61+
}
62+
}
63+
64+
return len(changes) > limit
65+
}
66+
4467
func (t *Task) ProcessActors(ctx context.Context, current *types.TipSet, executed *types.TipSet, candidates tasks.ActorStateChangeDiff) (model.Persistable, *visormodel.ProcessingReport, error) {
4568
ctx, span := otel.Tracer("").Start(ctx, "ProcessActors")
4669
defer span.End()
@@ -60,15 +83,20 @@ func (t *Task) ProcessActors(ctx context.Context, current *types.TipSet, execute
6083
ll := log.With("height", int64(current.Height()))
6184
ll.Debug("processing actor state changes")
6285

63-
data := make(model.PersistableList, 0, len(actors))
64-
errorsDetected := make([]*ActorStateError, 0, len(actors))
65-
skippedActors := 0
66-
6786
if len(actors) == 0 {
6887
ll.Debug("no actor state changes found")
69-
return data, report, nil
88+
return model.PersistableList{}, report, nil
89+
}
90+
91+
if t.exceedsActorChangeLimit(actors) {
92+
err := fmt.Errorf("task skipped - max limit for handling actor state changes")
93+
return nil, report, err
7094
}
7195

96+
data := make(model.PersistableList, 0, len(actors))
97+
errorsDetected := make([]*ActorStateError, 0, len(actors))
98+
skippedActors := 0
99+
72100
start := time.Now()
73101
ll.Debug("found actor state changes", "count", len(actors))
74102

0 commit comments

Comments
 (0)