Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 72 additions & 21 deletions runtime/actors/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,30 +170,81 @@ type agentMessageResponse struct {
}

func ensureAgentReady(ctx context.Context, agentId string) (*AgentInfo, *goakt.PID, error) {
info, pid, err := getAgentInfo(ctx, agentId)
if pid != nil {
return info, pid, nil
}
if info == nil {
return info, nil, err
}
const maxRetries = 8
const baseDelay = 25 * time.Millisecond
const maxDelay = 500 * time.Millisecond

var lastErr error

for attempt := 0; attempt < maxRetries; attempt++ {
info, pid, err := getAgentInfo(ctx, agentId)

if pid != nil {
actor := pid.Actor().(*wasmAgentActor)
if actor.status == AgentStatusRunning {
return info, pid, nil
}
lastErr = fmt.Errorf("agent %s not ready (status: %s)", agentId, actor.status)
} else if info != nil {
switch info.Status {
case AgentStatusStarting:
lastErr = fmt.Errorf("agent %s is still starting", agentId)

case AgentStatusSuspended:
// Try to resume the agent (only on first attempt to avoid spam)
if attempt == 0 {
host := wasmhost.GetWasmHost(ctx)
plugin, ok := plugins.GetPluginFromContext(ctx)
if !ok {
return info, nil, fmt.Errorf("no plugin found in context for agent %s", agentId)
}

// Resume asynchronously
go func() {
if _, err := spawnActorForAgent(host, plugin, agentId, info.Name, false); err != nil {
logger.Err(context.Background(), err).Msgf("Failed to resume agent %s", agentId)
}
}()
Comment on lines +203 to +207
Copy link

Copilot AI Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Using context.Background() for logging drops metadata (tracing, cancellation) from the original context. Consider deriving a child context or using the existing ctx for richer diagnostics.

Suggested change
go func() {
if _, err := spawnActorForAgent(host, plugin, agentId, info.Name, false); err != nil {
logger.Err(context.Background(), err).Msgf("Failed to resume agent %s", agentId)
}
}()
go func(parentCtx context.Context) {
derivedCtx := context.WithValue(parentCtx, "operation", "resumeAgent")
if _, err := spawnActorForAgent(host, plugin, agentId, info.Name, false); err != nil {
logger.Err(derivedCtx, err).Msgf("Failed to resume agent %s", agentId)
}
}(ctx)

Copilot uses AI. Check for mistakes.
}
lastErr = fmt.Errorf("agent %s is resuming", agentId)

case AgentStatusTerminated:
// Permanent failure - don't retry
return info, nil, fmt.Errorf("agent %s is terminated", agentId)

default:
// Other statuses - don't retry
return info, nil, fmt.Errorf("agent %s is %s, but not found in local actor system", agentId, info.Status)
}
} else if err != nil {
// Handle the error from getAgentInfo
lastErr = err
} else {
// Agent doesn't exist at all - permanent failure
return nil, nil, fmt.Errorf("agent %s not found", agentId)
}

// Exit early if this is the last attempt
if attempt == maxRetries-1 {
break
}

// Wait before retrying with exponential backoff
delay := time.Duration(float64(baseDelay) * (1.5*float64(attempt) + 1))
Copy link

Copilot AI Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current formula yields linear backoff rather than true exponential backoff. To match the PR description, consider using a power function (e.g., baseDelay * time.Duration(math.Pow(2, float64(attempt)))).

Copilot uses AI. Check for mistakes.
if delay > maxDelay {
delay = maxDelay
}

switch info.Status {
case AgentStatusSuspended:
// the actor is suspended, so we can try to resume it
host := wasmhost.GetWasmHost(ctx)
plugin, ok := plugins.GetPluginFromContext(ctx)
if !ok {
return info, nil, fmt.Errorf("no plugin found in context for agent %s", agentId)
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-time.After(delay):
// Continue to next attempt
}
pid, err := spawnActorForAgent(host, plugin, agentId, info.Name, false)
return info, pid, err
case AgentStatusTerminated:
return info, nil, fmt.Errorf("agent %s is terminated", agentId)
default:
// this means the agent is running on another node - TODO: handle this somehow
return info, nil, fmt.Errorf("agent %s is %s, but not found in local actor system", agentId, info.Status)
}

// All retries exhausted
return nil, nil, fmt.Errorf("agent %s not ready after %d attempts: %v", agentId, maxRetries, lastErr)
}

func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) {
Expand Down
Loading