diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index 0215c3669..2bfef8a38 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -170,30 +170,81 @@ type agentMessageResponse struct { } func ensureAgentReady(ctx context.Context, agentId string) (*AgentInfo, *goakt.PID, error) { - info, pid, err := getAgentInfo(ctx, agentId) - if pid != nil { - return info, pid, nil - } - if info == nil { - return info, nil, err - } + const maxRetries = 8 + const baseDelay = 25 * time.Millisecond + const maxDelay = 500 * time.Millisecond + + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + info, pid, err := getAgentInfo(ctx, agentId) + + if pid != nil { + actor := pid.Actor().(*wasmAgentActor) + if actor.status == AgentStatusRunning { + return info, pid, nil + } + lastErr = fmt.Errorf("agent %s not ready (status: %s)", agentId, actor.status) + } else if info != nil { + switch info.Status { + case AgentStatusStarting: + lastErr = fmt.Errorf("agent %s is still starting", agentId) + + case AgentStatusSuspended: + // Try to resume the agent (only on first attempt to avoid spam) + if attempt == 0 { + host := wasmhost.GetWasmHost(ctx) + plugin, ok := plugins.GetPluginFromContext(ctx) + if !ok { + return info, nil, fmt.Errorf("no plugin found in context for agent %s", agentId) + } + + // Resume asynchronously + go func() { + if _, err := spawnActorForAgent(host, plugin, agentId, info.Name, false); err != nil { + logger.Err(context.Background(), err).Msgf("Failed to resume agent %s", agentId) + } + }() + } + lastErr = fmt.Errorf("agent %s is resuming", agentId) + + case AgentStatusTerminated: + // Permanent failure - don't retry + return info, nil, fmt.Errorf("agent %s is terminated", agentId) + + default: + // Other statuses - don't retry + return info, nil, fmt.Errorf("agent %s is %s, but not found in local actor system", agentId, info.Status) + } + } else if err != nil { + // Handle the error from getAgentInfo + lastErr = err + } else { + // Agent doesn't exist at all - permanent failure + return nil, nil, fmt.Errorf("agent %s not found", agentId) + } + + // Exit early if this is the last attempt + if attempt == maxRetries-1 { + break + } + + // Wait before retrying with exponential backoff + delay := time.Duration(float64(baseDelay) * (1.5*float64(attempt) + 1)) + if delay > maxDelay { + delay = maxDelay + } - switch info.Status { - case AgentStatusSuspended: - // the actor is suspended, so we can try to resume it - host := wasmhost.GetWasmHost(ctx) - plugin, ok := plugins.GetPluginFromContext(ctx) - if !ok { - return info, nil, fmt.Errorf("no plugin found in context for agent %s", agentId) + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case <-time.After(delay): + // Continue to next attempt } - pid, err := spawnActorForAgent(host, plugin, agentId, info.Name, false) - return info, pid, err - case AgentStatusTerminated: - return info, nil, fmt.Errorf("agent %s is terminated", agentId) - default: - // this means the agent is running on another node - TODO: handle this somehow - return info, nil, fmt.Errorf("agent %s is %s, but not found in local actor system", agentId, info.Status) } + + // All retries exhausted + return nil, nil, fmt.Errorf("agent %s not ready after %d attempts: %v", agentId, maxRetries, lastErr) } func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) {