Skip to content

Commit 2ce5d5d

Browse files
ItalyPaleAleyaron2
andauthored
Remove ActorScanInterval (dapr#7654)
* Remove ActorScanInterval Signed-off-by: ItalyPaleAle <[email protected]> * Rename file Signed-off-by: ItalyPaleAle <[email protected]> --------- Signed-off-by: ItalyPaleAle <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent a524889 commit 2ce5d5d

File tree

20 files changed

+245
-206
lines changed

20 files changed

+245
-206
lines changed

pkg/actors/actors.go

Lines changed: 18 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
"github.com/dapr/dapr/pkg/retry"
5454
"github.com/dapr/dapr/pkg/runtime/compstore"
5555
"github.com/dapr/dapr/pkg/security"
56+
eventqueue "github.com/dapr/kit/events/queue"
5657
"github.com/dapr/kit/logger"
5758
"github.com/dapr/kit/ptr"
5859
)
@@ -63,6 +64,9 @@ const (
6364

6465
errStateStoreNotFound = "actors: state store does not exist or incorrectly configured"
6566
errStateStoreNotConfigured = `actors: state store does not exist or incorrectly configured. Have you set the property '{"name": "actorStateStore", "value": "true"}' in your state store component file?`
67+
68+
// If an idle actor is getting deactivated, but it's still busy, will be re-enqueued with its idle timeout increased by this duration.
69+
actorBusyReEnqueueInterval = 10 * time.Second
6670
)
6771

6872
var (
@@ -110,6 +114,8 @@ type Actors interface {
110114
type GRPCConnectionFn func(ctx context.Context, address string, id string, namespace string, customOpts ...grpc.DialOption) (*grpc.ClientConn, func(destroy bool), error)
111115

112116
type actorsRuntime struct {
117+
idleActorProcessor *eventqueue.Processor[string, *actor]
118+
113119
appChannel channel.AppChannel
114120
placement internal.PlacementService
115121
placementEnabled bool
@@ -223,6 +229,7 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) (ActorRuntime,
223229

224230
a.timers.SetExecuteTimerFn(a.executeTimer)
225231

232+
a.idleActorProcessor = eventqueue.NewProcessor[string, *actor](a.idleProcessorExecuteFn).WithClock(clock)
226233
return a, nil
227234
}
228235

@@ -305,14 +312,7 @@ func (a *actorsRuntime) Init(ctx context.Context) (err error) {
305312
a.placement.Start(ctx)
306313
}()
307314

308-
a.wg.Add(1)
309-
go func() {
310-
defer a.wg.Done()
311-
a.deactivationTicker(a.actorsConfig, a.haltActor)
312-
}()
313-
314-
log.Infof("Actor runtime started. Actor idle timeout: %v. Actor scan interval: %v",
315-
a.actorsConfig.Config.ActorIdleTimeout, a.actorsConfig.Config.ActorDeactivationScanInterval)
315+
log.Infof("Actor runtime started. Idle timeout: %v", a.actorsConfig.Config.ActorIdleTimeout)
316316

317317
return nil
318318
}
@@ -464,50 +464,11 @@ func (a *actorsRuntime) deactivateActor(act *actor) error {
464464
return nil
465465
}
466466

467-
func (a *actorsRuntime) removeActorFromTable(actorType, actorID string) {
468-
a.actorsTable.Delete(constructCompositeKey(actorType, actorID))
469-
}
470-
471467
func (a *actorsRuntime) getActorTypeAndIDFromKey(key string) (string, string) {
472468
typ, id, _ := strings.Cut(key, daprSeparator)
473469
return typ, id
474470
}
475471

476-
func (a *actorsRuntime) deactivationTicker(configuration Config, haltFn internal.HaltActorFn) {
477-
ticker := a.clock.NewTicker(configuration.ActorDeactivationScanInterval)
478-
ch := ticker.C()
479-
defer ticker.Stop()
480-
481-
for {
482-
select {
483-
case t := <-ch:
484-
a.actorsTable.Range(func(key, value any) bool {
485-
actorInstance := value.(*actor)
486-
487-
if actorInstance.isBusy() {
488-
return true
489-
}
490-
491-
if !t.Before(actorInstance.ScheduledTime()) {
492-
a.wg.Add(1)
493-
go func(actorKey string) {
494-
defer a.wg.Done()
495-
actorType, actorID := a.getActorTypeAndIDFromKey(actorKey)
496-
err := haltFn(actorType, actorID)
497-
if err != nil {
498-
log.Errorf("failed to deactivate actor %s: %s", actorKey, err)
499-
}
500-
}(key.(string))
501-
}
502-
503-
return true
504-
})
505-
case <-a.closeCh:
506-
return
507-
}
508-
}
509-
}
510-
511472
// Returns an internal actor instance, allocating it if needed.
512473
// If the actor type does not correspond to an internal actor, the returned boolean is false
513474
func (a *actorsRuntime) getInternalActor(actorType string, actorID string) (InternalActor, bool) {
@@ -648,6 +609,10 @@ func (a *actorsRuntime) callLocalActor(ctx context.Context, req *internalv1pb.In
648609
if err != nil {
649610
return nil, status.Error(codes.ResourceExhausted, err.Error())
650611
}
612+
err = a.idleActorProcessor.Enqueue(act)
613+
if err != nil {
614+
return nil, fmt.Errorf("failed to enqueue actor in idle processor: %w", err)
615+
}
651616
defer act.unlock()
652617

653618
// Replace method to actors method.
@@ -1302,6 +1267,12 @@ func (a *actorsRuntime) Close() error {
13021267
errs = append(errs, fmt.Errorf("failed to close placement service: %w", err))
13031268
}
13041269
}
1270+
if a.idleActorProcessor != nil {
1271+
err := a.idleActorProcessor.Close()
1272+
if err != nil {
1273+
errs = append(errs, fmt.Errorf("failed to close actor idle processor: %w", err))
1274+
}
1275+
}
13051276
}
13061277

13071278
return errors.Join(errs...)

0 commit comments

Comments
 (0)