Skip to content

Commit 5e53a6e

Browse files
authored
feat(scribe): Add STARTUP state to Lambda to prevent race condition. (#4613)
1 parent a5fc1dd commit 5e53a6e

File tree

9 files changed

+116
-47
lines changed

9 files changed

+116
-47
lines changed

servers/fai-lambda/fai-scribe/src/handler.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
)
1616
from .utils.agent import (
1717
SessionInterruptedError,
18-
check_if_interrupted,
1918
run_editing_session,
20-
update_session_status,
2119
)
2220
from .utils.git import setup_editing_repo
2321

@@ -77,11 +75,6 @@ async def handle_editing_request(
7775
)
7876
LOGGER.info(f"Repository ready at: {repo_path}")
7977

80-
if await check_if_interrupted(editing_id):
81-
LOGGER.warning(f"Session interrupted after git operations: {editing_id}")
82-
await update_session_status(editing_id, "waiting")
83-
raise SessionInterruptedError(f"Editing session {editing_id} was interrupted")
84-
8578
try:
8679
session_id, pr_url = await run_editing_session(
8780
repo_path=repo_path,

servers/fai-lambda/fai-scribe/src/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
load_dotenv()
88
LOGGER = logging.getLogger()
9+
LOGGER.setLevel(logging.INFO)
910

1011

1112
class Settings:

servers/fai-lambda/fai-scribe/src/utils/agent.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,6 @@ async def run_editing_session(
9696
setup_persistent_claude_storage(repo_path)
9797
configure_git_auth(repo_path)
9898

99-
await update_session_status(editing_id, "active")
100-
101-
if await check_if_interrupted(editing_id):
102-
LOGGER.warning(f"Session interrupted before starting: {editing_id}")
103-
await update_session_status(editing_id, "waiting")
104-
raise SessionInterruptedError(f"Editing session {editing_id} was interrupted")
105-
10699
if existing_pr_url:
107100
full_prompt = f"""{user_prompt}
108101
@@ -144,6 +137,10 @@ async def run_editing_session(
144137
async with ClaudeSDKClient(options=options) as client:
145138
await client.query(full_prompt)
146139

140+
if resume_session_id:
141+
await update_session_status(editing_id, "active")
142+
LOGGER.info(f"Transitioned resumed session {editing_id} from STARTUP to ACTIVE")
143+
147144
async for message in client.receive_response():
148145
if hasattr(message, "subtype") and message.subtype == "init":
149146
if hasattr(message, "data") and isinstance(message.data, dict):
@@ -152,10 +149,13 @@ async def run_editing_session(
152149
session_id = init_session_id
153150
LOGGER.info(f"Captured session_id from init message: {session_id}")
154151
await update_session_metadata(editing_id, session_id=session_id)
152+
await update_session_status(editing_id, "active")
153+
LOGGER.info(f"Transitioned new session {editing_id} from STARTUP to ACTIVE")
155154

156155
if await check_if_interrupted(editing_id):
157156
LOGGER.warning(f"Session interrupted: {editing_id}")
158-
await update_session_status(editing_id, "waiting")
157+
if session_id is not None:
158+
await update_session_status(editing_id, "waiting")
159159
raise SessionInterruptedError(f"Editing session {editing_id} was interrupted")
160160

161161
if isinstance(message, AssistantMessage):
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""add_startup_status_to_editing_sessions
2+
3+
Revision ID: af951c45da91
4+
Revises: 2d743e49aaa1
5+
Create Date: 2025-10-31 11:19:09.069432
6+
7+
"""
8+
9+
from typing import (
10+
Sequence,
11+
Union,
12+
)
13+
14+
import sqlalchemy as sa
15+
from alembic import op
16+
17+
# revision identifiers, used by Alembic.
18+
revision: str = "af951c45da91"
19+
down_revision: Union[str, Sequence[str], None] = "2d743e49aaa1"
20+
branch_labels: Union[str, Sequence[str], None] = None
21+
depends_on: Union[str, Sequence[str], None] = None
22+
23+
24+
def upgrade() -> None:
25+
"""Upgrade schema."""
26+
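# Add 'STARTUP' to the editingsessionstatus enum; the label is the Python member name, which is what SQLAlchemy's Enum stores by default
# NOTE(review): PostgreSQL versions before 12 reject ALTER TYPE ... ADD VALUE inside a transaction block - confirm the target server version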
27+
op.execute("ALTER TYPE editingsessionstatus ADD VALUE IF NOT EXISTS 'STARTUP'")
28+
29+
30+
def downgrade() -> None:
31+
"""Downgrade schema."""
32+
# PostgreSQL has no ALTER TYPE ... DROP VALUE, so an enum value cannot be removed directly
33+
# Removing it would require recreating the enum type and migrating the columns that use it
34+
# This downgrade is therefore a deliberate no-op that leaves 'STARTUP' in the enum
35+
pass

servers/fai/src/fai/models/db/editing_session_db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class EditingSessionDb(Base):
3131
id = Column(String, primary_key=True)
3232

3333
session_id = Column(String, nullable=True)
34-
status = Column(Enum(EditingSessionStatus), nullable=False, default=EditingSessionStatus.WAITING)
34+
status = Column(Enum(EditingSessionStatus), nullable=False, default=EditingSessionStatus.STARTUP)
3535

3636
# Repository information
3737
repository = Column(String, nullable=False)

servers/fai/src/fai/models/types/editing_session_types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
class EditingSessionStatus(str, Enum):
88
"""Status of an editing session."""
99

10+
STARTUP = "startup"
1011
WAITING = "waiting"
1112
ACTIVE = "active"
1213
INTERRUPTED = "interrupted"

servers/fai/src/fai/routes/editing_sessions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async def create_editing_session(
5555
base_branch=request.base_branch,
5656
working_branch=working_branch,
5757
pr_url=None,
58-
status=EditingSessionStatus.WAITING,
58+
status=EditingSessionStatus.STARTUP,
5959
created_at=datetime.now(UTC),
6060
updated_at=datetime.now(UTC),
6161
)

servers/fai/src/fai/utils/slack/edit_handler.py

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,30 @@ async def get_editing_session_status(editing_id: str) -> EditingSessionStatus |
9696
async def interrupt_editing_session(editing_id: str) -> bool:
9797
"""Interrupt an active editing session. Returns True if successful."""
9898
try:
99+
status = await get_editing_session_status(editing_id)
100+
101+
if status == EditingSessionStatus.STARTUP:
102+
logger.info(f"Session {editing_id} is in STARTUP state, waiting for it to become ACTIVE")
103+
max_wait: int = 30
104+
poll_interval: float = 0.5
105+
elapsed: float = 0
106+
107+
while elapsed < max_wait:
108+
await asyncio.sleep(poll_interval)
109+
elapsed += poll_interval
110+
111+
status = await get_editing_session_status(editing_id)
112+
if status == EditingSessionStatus.ACTIVE:
113+
logger.info(f"Session {editing_id} transitioned to ACTIVE, proceeding with interruption")
114+
break
115+
elif status not in [EditingSessionStatus.STARTUP, EditingSessionStatus.ACTIVE]:
116+
logger.warning(f"Session {editing_id} in unexpected state {status}, aborting interruption")
117+
return False
118+
119+
if status != EditingSessionStatus.ACTIVE:
120+
logger.warning(f"Timeout waiting for session {editing_id} to become ACTIVE")
121+
return False
122+
99123
async with httpx.AsyncClient(timeout=10.0) as client:
100124
response = await client.post(f"{CONFIG.FAI_SERVER_URL}/editing-sessions/{editing_id}/interrupt")
101125

@@ -176,27 +200,6 @@ async def create_editing_session(repository: str, base_branch: str = "main") ->
176200
return None
177201

178202

179-
async def mark_session_as_active(editing_id: str) -> bool:
180-
"""Mark an editing session as ACTIVE before Lambda invocation."""
181-
try:
182-
async with httpx.AsyncClient(timeout=10.0) as client:
183-
response = await client.put(
184-
f"{CONFIG.FAI_SERVER_URL}/editing-sessions/{editing_id}",
185-
json={"status": "active"},
186-
)
187-
188-
if response.status_code != 200:
189-
logger.error(f"Failed to mark session as active: {response.status_code} - {response.text}")
190-
return False
191-
192-
logger.info(f"Marked editing session {editing_id} as ACTIVE")
193-
return True
194-
195-
except Exception as e:
196-
logger.error(f"Error marking session as active: {e}", exc_info=True)
197-
return False
198-
199-
200203
async def invoke_editing_lambda(
201204
prompt: str,
202205
domain: str,

servers/fai/src/fai/utils/slack/message_handler.py

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from dataclasses import dataclass
23
from datetime import (
34
UTC,
@@ -43,7 +44,6 @@
4344
get_or_create_editing_session_for_thread,
4445
interrupt_editing_session,
4546
invoke_editing_lambda,
46-
mark_session_as_active,
4747
store_editing_session_for_thread,
4848
wait_for_interruption,
4949
)
@@ -401,7 +401,50 @@ async def handle_slack_message(
401401
query_id=None,
402402
user_id=context.user,
403403
)
404-
elif session_status and session_status.value in ["active", "waiting"]:
404+
elif session_status and session_status.value == "startup":
405+
LOGGER.info(f"Session {existing_editing_id} is in STARTUP state, waiting for it to become ACTIVE")
406+
max_wait: int = 30
407+
poll_interval: float = 0.5
408+
elapsed: float = 0
409+
410+
while elapsed < max_wait:
411+
await asyncio.sleep(poll_interval)
412+
elapsed += poll_interval
413+
414+
session_status = await get_editing_session_status(existing_editing_id)
415+
if session_status and session_status.value in ["active", "waiting"]:
416+
LOGGER.info(
417+
f"Session {existing_editing_id} transitioned to {session_status.value}, "
418+
f"proceeding with interruption"
419+
)
420+
break
421+
elif session_status and session_status.value not in ["startup", "active", "waiting"]:
422+
LOGGER.warning(
423+
f"Session {existing_editing_id} in unexpected state {session_status.value}, aborting"
424+
)
425+
error_text = "❌ Session is in an unexpected state. Please try again."
426+
return SlackMessageResponse(
427+
response_text=error_text,
428+
channel=context.channel,
429+
thread_ts=context.thread_ts,
430+
bot_token=integration.slack_bot_token,
431+
query_id=None,
432+
user_id=context.user,
433+
)
434+
435+
if not session_status or session_status.value not in ["active", "waiting"]:
436+
LOGGER.warning(f"Timeout waiting for session {existing_editing_id} to become ACTIVE/WAITING")
437+
error_text = "⏳ Previous edit session is still starting up. Please wait a moment and try again."
438+
return SlackMessageResponse(
439+
response_text=error_text,
440+
channel=context.channel,
441+
thread_ts=context.thread_ts,
442+
bot_token=integration.slack_bot_token,
443+
query_id=None,
444+
user_id=context.user,
445+
)
446+
447+
if session_status and session_status.value in ["active", "waiting"]:
405448
LOGGER.info(f"Interrupting {session_status.value} session {existing_editing_id} for new edit request")
406449

407450
interrupt_success = await interrupt_editing_session(existing_editing_id)
@@ -508,13 +551,6 @@ async def handle_slack_message(
508551
except Exception as e:
509552
LOGGER.error(f"Failed to store editing session mapping: {e}", exc_info=True)
510553

511-
if existing_editing_id:
512-
mark_success = await mark_session_as_active(existing_editing_id)
513-
if not mark_success:
514-
LOGGER.warning(
515-
f"Failed to mark session {existing_editing_id} as ACTIVE, but proceeding with Lambda invocation"
516-
)
517-
518554
result = await invoke_editing_lambda(
519555
prompt=context.text,
520556
domain=domain_to_use,

0 commit comments

Comments
 (0)