Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions components/backend/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,45 +1210,6 @@ func SelectWorkflow(c *gin.Context) {
branch = "main"
}

// Call runner to clone and activate the workflow (if session is running)
status, _ := item.Object["status"].(map[string]interface{})
phase, _ := status["phase"].(string)
if phase == "Running" {
runnerURL := fmt.Sprintf("http://session-%s.%s.svc.cluster.local:8001/workflow", sessionName, project)
runnerReq := map[string]string{
"gitUrl": req.GitURL,
"branch": branch,
"path": req.Path,
}
reqBody, _ := json.Marshal(runnerReq)

log.Printf("Calling runner to activate workflow: %s@%s (path: %s) -> %s", req.GitURL, branch, req.Path, runnerURL)
httpReq, err := http.NewRequestWithContext(c.Request.Context(), "POST", runnerURL, bytes.NewReader(reqBody))
if err != nil {
log.Printf("Failed to create runner request: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create runner request"})
return
}
httpReq.Header.Set("Content-Type", "application/json")

client := &http.Client{Timeout: 120 * time.Second} // Allow time for clone
resp, err := client.Do(httpReq)
if err != nil {
log.Printf("Failed to call runner to activate workflow: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to activate workflow (runner not reachable)"})
return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.Printf("Runner failed to activate workflow (status %d): %s", resp.StatusCode, string(body))
c.JSON(resp.StatusCode, gin.H{"error": fmt.Sprintf("Failed to activate workflow: %s", string(body))})
return
}
log.Printf("Runner successfully activated workflow %s@%s for session %s", req.GitURL, branch, sessionName)
}

// Update activeWorkflow in spec
spec, ok := item.Object["spec"].(map[string]interface{})
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export function useWorkflowManagement({
}, []);

// Activate the pending workflow (or a workflow passed directly)
const activateWorkflow = useCallback(async (workflowToActivate?: WorkflowConfig, currentPhase?: string) => {
const activateWorkflow = useCallback(async (workflowToActivate?: WorkflowConfig, currentPhase?: string, retryCount = 0) => {
const workflow = workflowToActivate || pendingWorkflow;
if (!workflow) return false;

Expand All @@ -52,7 +52,10 @@ export function useWorkflowManagement({
return true; // Don't return false - we've queued it successfully
}

setWorkflowActivating(true);
// Only set loading state on first attempt (not retries)
if (retryCount === 0) {
setWorkflowActivating(true);
}

try {
// Update CR with workflow configuration
Expand All @@ -68,6 +71,16 @@ export function useWorkflowManagement({

if (!response.ok) {
const errorData = await response.json();

// If runner not ready and we haven't retried too many times, retry with backoff
if (errorData.retryable && retryCount < 5) {
const delay = Math.min(1000 * Math.pow(1.5, retryCount), 5000); // Exponential backoff, max 5s
console.log(`Runner not ready, retrying in ${delay}ms (attempt ${retryCount + 1}/5)...`);
await new Promise(resolve => setTimeout(resolve, delay));
// Retry without resetting loading state
return activateWorkflow(workflow, phase, retryCount + 1);
}

throw new Error(errorData.error || "Failed to update workflow");
}

Expand All @@ -80,15 +93,14 @@ export function useWorkflowManagement({

onWorkflowActivated?.();

setWorkflowActivating(false);
return true;
} catch (error) {
console.error("Failed to activate workflow:", error);
errorToast(error instanceof Error ? error.message : "Failed to activate workflow");
sessionQueue.clearWorkflow();
setWorkflowActivating(false);
return false;
} finally {
setWorkflowActivating(false);
}
}, [pendingWorkflow, projectName, sessionName, sessionPhase, sessionQueue, onWorkflowActivated]);

Expand Down
3 changes: 2 additions & 1 deletion components/operator/internal/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1662,9 +1662,10 @@ func reconcileActiveWorkflowWithPatch(sessionNamespace, sessionName string, spec
reconciledWorkflowRaw, _, _ := unstructured.NestedMap(status, "reconciledWorkflow")
reconciledGitURL, _ := reconciledWorkflowRaw["gitUrl"].(string)
reconciledBranch, _ := reconciledWorkflowRaw["branch"].(string)
reconciledPath, _ := reconciledWorkflowRaw["path"].(string)

// Detect drift: workflow changed
if reconciledGitURL == gitURL && reconciledBranch == branch {
if reconciledGitURL == gitURL && reconciledBranch == branch && reconciledPath == path {
return nil
}

Expand Down
69 changes: 42 additions & 27 deletions components/runners/claude-code-runner/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str):

# Track if adapter has been initialized
_adapter_initialized = False
# Prevent duplicate workflow updates/greetings from concurrent calls
_workflow_change_lock = asyncio.Lock()


@app.post("/")
Expand Down Expand Up @@ -511,35 +513,48 @@ async def change_workflow(request: Request):
raise HTTPException(status_code=503, detail="Adapter not initialized")

body = await request.json()
git_url = body.get("gitUrl", "")
branch = body.get("branch", "main")
path = body.get("path", "")
git_url = (body.get("gitUrl") or "").strip()
branch = (body.get("branch") or "main").strip() or "main"
path = (body.get("path") or "").strip()

logger.info(f"Workflow change request: {git_url}@{branch} (path: {path})")

# Clone the workflow repository at runtime
# This is needed because the init container only runs once at pod startup
if git_url:
success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path)
if not success:
logger.warning("Failed to clone workflow, will use default workflow directory")

# Update environment variables
os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url
os.environ["ACTIVE_WORKFLOW_BRANCH"] = branch
os.environ["ACTIVE_WORKFLOW_PATH"] = path

# Reset adapter state to force reinitialization on next run
_adapter_initialized = False
adapter._first_run = True

logger.info("Workflow updated, adapter will reinitialize on next run")

# Trigger a new run to greet user with workflow context
# This runs in background via backend POST
asyncio.create_task(trigger_workflow_greeting(git_url, branch, path))

return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path}

async with _workflow_change_lock:
current_git_url = os.getenv("ACTIVE_WORKFLOW_GIT_URL", "").strip()
current_branch = os.getenv("ACTIVE_WORKFLOW_BRANCH", "main").strip() or "main"
current_path = os.getenv("ACTIVE_WORKFLOW_PATH", "").strip()

if (
current_git_url == git_url
and current_branch == branch
and current_path == path
):
logger.info("Workflow unchanged; skipping reinit and greeting")
return {"message": "Workflow already active", "gitUrl": git_url, "branch": branch, "path": path}

# Clone the workflow repository at runtime
# This is needed because the init container only runs once at pod startup
if git_url:
success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path)
if not success:
logger.warning("Failed to clone workflow, will use default workflow directory")

# Update environment variables
os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url
os.environ["ACTIVE_WORKFLOW_BRANCH"] = branch
os.environ["ACTIVE_WORKFLOW_PATH"] = path

# Reset adapter state to force reinitialization on next run
_adapter_initialized = False
adapter._first_run = True

logger.info("Workflow updated, adapter will reinitialize on next run")

# Trigger a new run to greet user with workflow context
# This runs in background via backend POST
asyncio.create_task(trigger_workflow_greeting(git_url, branch, path))

return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path}


async def get_default_branch(repo_path: str) -> str:
Expand Down
Loading