Skip to content

Commit 903f020

Browse files
authored
Refactor workflow activation logic and improve error handling (#511)
## Summary
- remove redundant workflow activation in backend session handler
- add retry logic with exponential backoff in workflow activation hook
- avoid duplicate workflow updates by checking current state
- improve logging and error handling around workflow activation

## Testing
- not run (not requested)
1 parent 1e3db39 commit 903f020

File tree

4 files changed

+60
-71
lines changed

4 files changed

+60
-71
lines changed

components/backend/handlers/sessions.go

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,45 +1210,6 @@ func SelectWorkflow(c *gin.Context) {
12101210
branch = "main"
12111211
}
12121212

1213-
// Call runner to clone and activate the workflow (if session is running)
1214-
status, _ := item.Object["status"].(map[string]interface{})
1215-
phase, _ := status["phase"].(string)
1216-
if phase == "Running" {
1217-
runnerURL := fmt.Sprintf("http://session-%s.%s.svc.cluster.local:8001/workflow", sessionName, project)
1218-
runnerReq := map[string]string{
1219-
"gitUrl": req.GitURL,
1220-
"branch": branch,
1221-
"path": req.Path,
1222-
}
1223-
reqBody, _ := json.Marshal(runnerReq)
1224-
1225-
log.Printf("Calling runner to activate workflow: %s@%s (path: %s) -> %s", req.GitURL, branch, req.Path, runnerURL)
1226-
httpReq, err := http.NewRequestWithContext(c.Request.Context(), "POST", runnerURL, bytes.NewReader(reqBody))
1227-
if err != nil {
1228-
log.Printf("Failed to create runner request: %v", err)
1229-
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create runner request"})
1230-
return
1231-
}
1232-
httpReq.Header.Set("Content-Type", "application/json")
1233-
1234-
client := &http.Client{Timeout: 120 * time.Second} // Allow time for clone
1235-
resp, err := client.Do(httpReq)
1236-
if err != nil {
1237-
log.Printf("Failed to call runner to activate workflow: %v", err)
1238-
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to activate workflow (runner not reachable)"})
1239-
return
1240-
}
1241-
defer resp.Body.Close()
1242-
1243-
if resp.StatusCode != http.StatusOK {
1244-
body, _ := io.ReadAll(resp.Body)
1245-
log.Printf("Runner failed to activate workflow (status %d): %s", resp.StatusCode, string(body))
1246-
c.JSON(resp.StatusCode, gin.H{"error": fmt.Sprintf("Failed to activate workflow: %s", string(body))})
1247-
return
1248-
}
1249-
log.Printf("Runner successfully activated workflow %s@%s for session %s", req.GitURL, branch, sessionName)
1250-
}
1251-
12521213
// Update activeWorkflow in spec
12531214
spec, ok := item.Object["spec"].(map[string]interface{})
12541215
if !ok {

components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export function useWorkflowManagement({
3232
}, []);
3333

3434
// Activate the pending workflow (or a workflow passed directly)
35-
const activateWorkflow = useCallback(async (workflowToActivate?: WorkflowConfig, currentPhase?: string) => {
35+
const activateWorkflow = useCallback(async (workflowToActivate?: WorkflowConfig, currentPhase?: string, retryCount = 0) => {
3636
const workflow = workflowToActivate || pendingWorkflow;
3737
if (!workflow) return false;
3838

@@ -52,7 +52,10 @@ export function useWorkflowManagement({
5252
return true; // Don't return false - we've queued it successfully
5353
}
5454

55-
setWorkflowActivating(true);
55+
// Only set loading state on first attempt (not retries)
56+
if (retryCount === 0) {
57+
setWorkflowActivating(true);
58+
}
5659

5760
try {
5861
// Update CR with workflow configuration
@@ -68,6 +71,16 @@ export function useWorkflowManagement({
6871

6972
if (!response.ok) {
7073
const errorData = await response.json();
74+
75+
// If runner not ready and we haven't retried too many times, retry with backoff
76+
if (errorData.retryable && retryCount < 5) {
77+
const delay = Math.min(1000 * Math.pow(1.5, retryCount), 5000); // Exponential backoff, max 5s
78+
console.log(`Runner not ready, retrying in ${delay}ms (attempt ${retryCount + 1}/5)...`);
79+
await new Promise(resolve => setTimeout(resolve, delay));
80+
// Retry without resetting loading state
81+
return activateWorkflow(workflow, phase, retryCount + 1);
82+
}
83+
7184
throw new Error(errorData.error || "Failed to update workflow");
7285
}
7386

@@ -80,15 +93,14 @@ export function useWorkflowManagement({
8093

8194
onWorkflowActivated?.();
8295

96+
setWorkflowActivating(false);
8397
return true;
8498
} catch (error) {
8599
console.error("Failed to activate workflow:", error);
86100
errorToast(error instanceof Error ? error.message : "Failed to activate workflow");
87101
sessionQueue.clearWorkflow();
88102
setWorkflowActivating(false);
89103
return false;
90-
} finally {
91-
setWorkflowActivating(false);
92104
}
93105
}, [pendingWorkflow, projectName, sessionName, sessionPhase, sessionQueue, onWorkflowActivated]);
94106

components/operator/internal/handlers/sessions.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1662,9 +1662,10 @@ func reconcileActiveWorkflowWithPatch(sessionNamespace, sessionName string, spec
16621662
reconciledWorkflowRaw, _, _ := unstructured.NestedMap(status, "reconciledWorkflow")
16631663
reconciledGitURL, _ := reconciledWorkflowRaw["gitUrl"].(string)
16641664
reconciledBranch, _ := reconciledWorkflowRaw["branch"].(string)
1665+
reconciledPath, _ := reconciledWorkflowRaw["path"].(string)
16651666

16661667
// Detect drift: workflow changed
1667-
if reconciledGitURL == gitURL && reconciledBranch == branch {
1668+
if reconciledGitURL == gitURL && reconciledBranch == branch && reconciledPath == path {
16681669
return nil
16691670
}
16701671

components/runners/claude-code-runner/main.py

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str):
193193

194194
# Track if adapter has been initialized
195195
_adapter_initialized = False
196+
# Prevent duplicate workflow updates/greetings from concurrent calls
197+
_workflow_change_lock = asyncio.Lock()
196198

197199

198200
@app.post("/")
@@ -511,35 +513,48 @@ async def change_workflow(request: Request):
511513
raise HTTPException(status_code=503, detail="Adapter not initialized")
512514

513515
body = await request.json()
514-
git_url = body.get("gitUrl", "")
515-
branch = body.get("branch", "main")
516-
path = body.get("path", "")
516+
git_url = (body.get("gitUrl") or "").strip()
517+
branch = (body.get("branch") or "main").strip() or "main"
518+
path = (body.get("path") or "").strip()
517519

518520
logger.info(f"Workflow change request: {git_url}@{branch} (path: {path})")
519-
520-
# Clone the workflow repository at runtime
521-
# This is needed because the init container only runs once at pod startup
522-
if git_url:
523-
success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path)
524-
if not success:
525-
logger.warning("Failed to clone workflow, will use default workflow directory")
526-
527-
# Update environment variables
528-
os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url
529-
os.environ["ACTIVE_WORKFLOW_BRANCH"] = branch
530-
os.environ["ACTIVE_WORKFLOW_PATH"] = path
531-
532-
# Reset adapter state to force reinitialization on next run
533-
_adapter_initialized = False
534-
adapter._first_run = True
535-
536-
logger.info("Workflow updated, adapter will reinitialize on next run")
537-
538-
# Trigger a new run to greet user with workflow context
539-
# This runs in background via backend POST
540-
asyncio.create_task(trigger_workflow_greeting(git_url, branch, path))
541-
542-
return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path}
521+
522+
async with _workflow_change_lock:
523+
current_git_url = os.getenv("ACTIVE_WORKFLOW_GIT_URL", "").strip()
524+
current_branch = os.getenv("ACTIVE_WORKFLOW_BRANCH", "main").strip() or "main"
525+
current_path = os.getenv("ACTIVE_WORKFLOW_PATH", "").strip()
526+
527+
if (
528+
current_git_url == git_url
529+
and current_branch == branch
530+
and current_path == path
531+
):
532+
logger.info("Workflow unchanged; skipping reinit and greeting")
533+
return {"message": "Workflow already active", "gitUrl": git_url, "branch": branch, "path": path}
534+
535+
# Clone the workflow repository at runtime
536+
# This is needed because the init container only runs once at pod startup
537+
if git_url:
538+
success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path)
539+
if not success:
540+
logger.warning("Failed to clone workflow, will use default workflow directory")
541+
542+
# Update environment variables
543+
os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url
544+
os.environ["ACTIVE_WORKFLOW_BRANCH"] = branch
545+
os.environ["ACTIVE_WORKFLOW_PATH"] = path
546+
547+
# Reset adapter state to force reinitialization on next run
548+
_adapter_initialized = False
549+
adapter._first_run = True
550+
551+
logger.info("Workflow updated, adapter will reinitialize on next run")
552+
553+
# Trigger a new run to greet user with workflow context
554+
# This runs in background via backend POST
555+
asyncio.create_task(trigger_workflow_greeting(git_url, branch, path))
556+
557+
return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path}
543558

544559

545560
async def get_default_branch(repo_path: str) -> str:

0 commit comments

Comments
 (0)