diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 8f72bc929..5caf1d3ef 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -1210,45 +1210,6 @@ func SelectWorkflow(c *gin.Context) { branch = "main" } - // Call runner to clone and activate the workflow (if session is running) - status, _ := item.Object["status"].(map[string]interface{}) - phase, _ := status["phase"].(string) - if phase == "Running" { - runnerURL := fmt.Sprintf("http://session-%s.%s.svc.cluster.local:8001/workflow", sessionName, project) - runnerReq := map[string]string{ - "gitUrl": req.GitURL, - "branch": branch, - "path": req.Path, - } - reqBody, _ := json.Marshal(runnerReq) - - log.Printf("Calling runner to activate workflow: %s@%s (path: %s) -> %s", req.GitURL, branch, req.Path, runnerURL) - httpReq, err := http.NewRequestWithContext(c.Request.Context(), "POST", runnerURL, bytes.NewReader(reqBody)) - if err != nil { - log.Printf("Failed to create runner request: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create runner request"}) - return - } - httpReq.Header.Set("Content-Type", "application/json") - - client := &http.Client{Timeout: 120 * time.Second} // Allow time for clone - resp, err := client.Do(httpReq) - if err != nil { - log.Printf("Failed to call runner to activate workflow: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to activate workflow (runner not reachable)"}) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - log.Printf("Runner failed to activate workflow (status %d): %s", resp.StatusCode, string(body)) - c.JSON(resp.StatusCode, gin.H{"error": fmt.Sprintf("Failed to activate workflow: %s", string(body))}) - return - } - log.Printf("Runner successfully activated workflow %s@%s for session %s", req.GitURL, branch, sessionName) - } - // Update activeWorkflow in spec spec, ok := item.Object["spec"].(map[string]interface{}) if !ok { diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts index 391aeefe5..1bf7eb409 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts @@ -32,7 +32,7 @@ export function useWorkflowManagement({ }, []); // Activate the pending workflow (or a workflow passed directly) - const activateWorkflow = useCallback(async (workflowToActivate?: WorkflowConfig, currentPhase?: string) => { + const activateWorkflow = useCallback(async (workflowToActivate?: WorkflowConfig, currentPhase?: string, retryCount = 0) => { const workflow = workflowToActivate || pendingWorkflow; if (!workflow) return false; @@ -52,7 +52,10 @@ export function useWorkflowManagement({ return true; // Don't return false - we've queued it successfully } - setWorkflowActivating(true); + // Only set loading state on first attempt (not retries) + if (retryCount === 0) { + setWorkflowActivating(true); + } try { // Update CR with workflow configuration @@ -68,6 +71,16 @@ export function useWorkflowManagement({ if (!response.ok) { const errorData = await response.json(); + + // If runner not ready and we haven't retried too many times, retry with backoff + if (errorData.retryable && retryCount < 5) { + const delay = Math.min(1000 * Math.pow(1.5, retryCount), 5000); // Exponential backoff, max 5s + console.log(`Runner not ready, retrying in ${delay}ms (attempt ${retryCount + 1}/5)...`); + await new Promise(resolve => setTimeout(resolve, delay)); + // Retry without resetting loading state + return activateWorkflow(workflow, phase, retryCount + 1); + } + throw new Error(errorData.error || "Failed to update workflow"); } @@ -80,6 +93,7 @@ export function useWorkflowManagement({ onWorkflowActivated?.(); + setWorkflowActivating(false); return true; } catch (error) { console.error("Failed to activate workflow:", error); @@ -87,8 +101,6 @@ export function useWorkflowManagement({ sessionQueue.clearWorkflow(); setWorkflowActivating(false); return false; - } finally { - setWorkflowActivating(false); } }, [pendingWorkflow, projectName, sessionName, sessionPhase, sessionQueue, onWorkflowActivated]); diff --git a/components/operator/internal/handlers/sessions.go b/components/operator/internal/handlers/sessions.go index 28812b3a2..e70040b06 100644 --- a/components/operator/internal/handlers/sessions.go +++ b/components/operator/internal/handlers/sessions.go @@ -1662,9 +1662,10 @@ func reconcileActiveWorkflowWithPatch(sessionNamespace, sessionName string, spec reconciledWorkflowRaw, _, _ := unstructured.NestedMap(status, "reconciledWorkflow") reconciledGitURL, _ := reconciledWorkflowRaw["gitUrl"].(string) reconciledBranch, _ := reconciledWorkflowRaw["branch"].(string) + reconciledPath, _ := reconciledWorkflowRaw["path"].(string) // Detect drift: workflow changed - if reconciledGitURL == gitURL && reconciledBranch == branch { + if reconciledGitURL == gitURL && reconciledBranch == branch && reconciledPath == path { return nil } diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py index 9bf99ca3e..56701208a 100644 --- a/components/runners/claude-code-runner/main.py +++ b/components/runners/claude-code-runner/main.py @@ -193,6 +193,8 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str): # Track if adapter has been initialized _adapter_initialized = False +# Prevent duplicate workflow updates/greetings from concurrent calls +_workflow_change_lock = asyncio.Lock() @app.post("/") @@ -511,35 +513,48 @@ async def change_workflow(request: Request): raise HTTPException(status_code=503, detail="Adapter not initialized") body = await request.json() - git_url = body.get("gitUrl", "") - branch = body.get("branch", "main") - path = body.get("path", "") + git_url = (body.get("gitUrl") or "").strip() + branch = (body.get("branch") or "main").strip() or "main" + path = (body.get("path") or "").strip() logger.info(f"Workflow change request: {git_url}@{branch} (path: {path})") - - # Clone the workflow repository at runtime - # This is needed because the init container only runs once at pod startup - if git_url: - success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path) - if not success: - logger.warning("Failed to clone workflow, will use default workflow directory") - - # Update environment variables - os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url - os.environ["ACTIVE_WORKFLOW_BRANCH"] = branch - os.environ["ACTIVE_WORKFLOW_PATH"] = path - - # Reset adapter state to force reinitialization on next run - _adapter_initialized = False - adapter._first_run = True - - logger.info("Workflow updated, adapter will reinitialize on next run") - - # Trigger a new run to greet user with workflow context - # This runs in background via backend POST - asyncio.create_task(trigger_workflow_greeting(git_url, branch, path)) - - return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path} + + async with _workflow_change_lock: + current_git_url = os.getenv("ACTIVE_WORKFLOW_GIT_URL", "").strip() + current_branch = os.getenv("ACTIVE_WORKFLOW_BRANCH", "main").strip() or "main" + current_path = os.getenv("ACTIVE_WORKFLOW_PATH", "").strip() + + if ( + current_git_url == git_url + and current_branch == branch + and current_path == path + ): + logger.info("Workflow unchanged; skipping reinit and greeting") + return {"message": "Workflow already active", "gitUrl": git_url, "branch": branch, "path": path} + + # Clone the workflow repository at runtime + # This is needed because the init container only runs once at pod startup + if git_url: + success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path) + if not success: + logger.warning("Failed to clone workflow, will use default workflow directory") + + # Update environment variables + os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url + os.environ["ACTIVE_WORKFLOW_BRANCH"] = branch + os.environ["ACTIVE_WORKFLOW_PATH"] = path + + # Reset adapter state to force reinitialization on next run + _adapter_initialized = False + adapter._first_run = True + + logger.info("Workflow updated, adapter will reinitialize on next run") + + # Trigger a new run to greet user with workflow context + # This runs in background via backend POST + asyncio.create_task(trigger_workflow_greeting(git_url, branch, path)) + + return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path} async def get_default_branch(repo_path: str) -> str: