Skip to content

Commit 1ddbb45

Browse files
committed
add latest handler updates.
1 parent ea21ded commit 1ddbb45

File tree

4 files changed

+117
-24
lines changed

4 files changed

+117
-24
lines changed

servers/fai-lambda/fai-scribe/src/utils/sqs_client.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import logging
32
from typing import Any
43

54
import boto3
@@ -73,7 +72,7 @@ def has_interrupt_message(self) -> tuple[bool, str | None]:
7372
for msg in messages:
7473
body = msg["body"]
7574
if body.get("type") == "INTERRUPT":
76-
LOGGER.info(f"Found INTERRUPT message in queue")
75+
LOGGER.info("Found INTERRUPT message in queue")
7776
return True, msg["receipt_handle"]
7877
else:
7978
self._cached_messages.append(msg)
@@ -95,7 +94,10 @@ def get_resume_messages(self) -> list[dict[str, Any]]:
9594
resume_messages.append({"body": body, "receipt_handle": msg["receipt_handle"]})
9695

9796
if resume_messages:
98-
LOGGER.info(f"Found {len(resume_messages)} RESUME messages ({len([m for m in self._cached_messages if m['body'].get('type') == 'RESUME'])} cached, {len([m for m in messages if m['body'].get('type') == 'RESUME'])} new)")
97+
num_resume = len(resume_messages)
98+
num_cached = len([m for m in self._cached_messages if m["body"].get("type") == "RESUME"])
99+
num_new = len([m for m in messages if m["body"].get("type") == "RESUME"])
100+
LOGGER.info(f"Found {num_resume} RESUME messages ({num_cached} cached, {num_new} new)")
99101

100102
self._cached_messages.clear()
101103

servers/fai/src/fai/routes/slack.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
)
6565
from fai.utils.slack.response_qa import log_message_for_qa
6666

67-
MESSAGE_CACHE_TTL = 30
67+
MESSAGE_CACHE_TTL = 600
6868

6969

7070
async def cleanup_message_cache() -> None:

servers/fai/src/fai/utils/slack/edit_handler.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
VARIABLES,
1919
)
2020
from fai.utils.github_utils import get_repo_from_docs_domain
21+
from fai.utils.sqs_utils import (
22+
ResumeMessage,
23+
send_message_to_queue,
24+
)
2125

2226

2327
async def get_or_create_editing_session_for_thread(team_id: str, channel_id: str, thread_ts: str) -> str | None:
@@ -112,11 +116,38 @@ async def interrupt_editing_session(editing_id: str) -> bool:
112116

113117

114118
async def send_resume_message(editing_id: str, prompts: list[str]) -> bool:
115-
# TODO: Implement direct SQS sending from FAI
116-
# For now, the Lambda will resume immediately after interruption is detected
117-
LOGGER.info(f"Prepared {len(prompts)} prompts for resume (session {editing_id})")
118-
LOGGER.debug(f"Prompts: {prompts}")
119-
return True
119+
try:
120+
async with httpx.AsyncClient(timeout=10.0) as client:
121+
response = await client.get(f"{CONFIG.FAI_SERVER_URL}/editing-sessions/{editing_id}")
122+
123+
if response.status_code != 200:
124+
LOGGER.error(f"Failed to fetch session for RESUME message: {response.status_code}")
125+
return False
126+
127+
session_data = response.json()
128+
queue_url = session_data["editing_session"].get("queue_url")
129+
130+
if not queue_url:
131+
LOGGER.error(f"No queue_url found for session {editing_id}, cannot send RESUME message")
132+
return False
133+
134+
resume_msg = ResumeMessage(
135+
editing_id=editing_id,
136+
prompts=prompts,
137+
timestamp=datetime.now(UTC).isoformat(),
138+
)
139+
140+
success = await send_message_to_queue(queue_url, resume_msg)
141+
if success:
142+
LOGGER.info(f"Sent RESUME message with {len(prompts)} prompts to queue for session {editing_id}")
143+
else:
144+
LOGGER.error(f"Failed to send RESUME message to queue for session {editing_id}")
145+
146+
return success
147+
148+
except Exception as e:
149+
LOGGER.error(f"Error sending RESUME message for session {editing_id}: {e}", exc_info=True)
150+
return False
120151

121152

122153
async def create_editing_session(repository: str, base_branch: str = "main") -> str | None:

servers/fai/src/fai/utils/slack/message_handler.py

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,11 @@ async def handle_slack_message(
384384
LOGGER.info(f"Existing session {existing_editing_id} status: {session_status}")
385385

386386
if session_status and session_status.value == "interrupted":
387-
LOGGER.info(f"Session {existing_editing_id} is INTERRUPTED, message already queued")
388-
info_text = "⏳ Previous edit is being interrupted, your request will be processed next."
387+
LOGGER.info(f"Session {existing_editing_id} is INTERRUPTED, queuing new request via RESUME message")
388+
389+
await send_resume_message(existing_editing_id, [context.text])
390+
391+
info_text = "⏳ Previous edit is being interrupted, your request has been queued."
389392
return SlackMessageResponse(
390393
response_text=info_text,
391394
channel=context.channel,
@@ -396,13 +399,11 @@ async def handle_slack_message(
396399
)
397400

398401
if session_status and session_status.value in ["startup", "active", "waiting"]:
399-
if session_status.value in ["startup", "active"]:
400-
LOGGER.info(
401-
f"Interrupting {session_status.value} session {existing_editing_id} for new edit request"
402-
)
402+
if session_status.value == "active":
403+
LOGGER.info(f"Interrupting active session {existing_editing_id} for new edit request")
403404
interrupt_success = await interrupt_editing_session(existing_editing_id)
404405

405-
if not interrupt_success and session_status.value != "waiting":
406+
if not interrupt_success:
406407
LOGGER.error(f"Failed to interrupt session {existing_editing_id}")
407408
error_text = "❌ Failed to interrupt the previous edit session. Please try again."
408409
return SlackMessageResponse(
@@ -415,24 +416,83 @@ async def handle_slack_message(
415416
)
416417

417418
if integration.slack_bot_token:
418-
if session_status.value == "startup":
419-
interruption_msg = "⚠️ Queued interrupt for starting session - will apply when ready"
420-
else:
421-
interruption_msg = "⚠️ Interrupted previous edit session to process new request"
422419
try:
423420
await send_slack_message(
424421
channel=context.channel,
425-
text=interruption_msg,
422+
text="⚠️ Interrupted previous edit session to process new request",
426423
bot_token=integration.slack_bot_token,
427424
thread_ts=context.thread_ts,
428425
)
429426
LOGGER.info(f"Posted interruption message to thread {context.thread_ts}")
430427
except Exception as e:
431428
LOGGER.error(f"Failed to post interruption message: {e}", exc_info=True)
432-
else:
433-
LOGGER.info(f"Session {existing_editing_id} is WAITING and ready to resume")
434429

435-
await send_resume_message(existing_editing_id, [context.text])
430+
await send_resume_message(existing_editing_id, [context.text])
431+
432+
elif session_status.value == "startup":
433+
LOGGER.info(f"Interrupting STARTUP session {existing_editing_id} for new edit request")
434+
interrupt_success = await interrupt_editing_session(existing_editing_id)
435+
436+
if not interrupt_success:
437+
LOGGER.error(f"Failed to interrupt STARTUP session {existing_editing_id}")
438+
error_text = "❌ Failed to interrupt the previous edit session. Please try again."
439+
return SlackMessageResponse(
440+
response_text=error_text,
441+
channel=context.channel,
442+
thread_ts=context.thread_ts,
443+
bot_token=integration.slack_bot_token,
444+
query_id=None,
445+
user_id=context.user,
446+
)
447+
448+
if integration.slack_bot_token:
449+
try:
450+
await send_slack_message(
451+
channel=context.channel,
452+
text="⚠️ Interrupted previous edit session to process new request",
453+
bot_token=integration.slack_bot_token,
454+
thread_ts=context.thread_ts,
455+
)
456+
LOGGER.info(f"Posted interruption message to thread {context.thread_ts}")
457+
except Exception as e:
458+
LOGGER.error(f"Failed to post interruption message: {e}", exc_info=True)
459+
460+
await send_resume_message(existing_editing_id, [context.text])
461+
462+
return SlackMessageResponse(
463+
response_text="",
464+
channel=context.channel,
465+
thread_ts=context.thread_ts,
466+
bot_token=integration.slack_bot_token,
467+
query_id=None,
468+
user_id=context.user,
469+
)
470+
471+
elif session_status.value == "waiting":
472+
LOGGER.info(f"Session {existing_editing_id} is WAITING, " f"queuing request without interrupting")
473+
474+
await send_resume_message(existing_editing_id, [context.text])
475+
476+
if integration.slack_bot_token:
477+
try:
478+
await send_slack_message(
479+
channel=context.channel,
480+
text="⏳ Previous edit is waiting - your request has been queued",
481+
bot_token=integration.slack_bot_token,
482+
thread_ts=context.thread_ts,
483+
)
484+
LOGGER.info(f"Posted queued message to thread {context.thread_ts}")
485+
except Exception as e:
486+
LOGGER.error(f"Failed to post queued message: {e}", exc_info=True)
487+
488+
return SlackMessageResponse(
489+
response_text="",
490+
channel=context.channel,
491+
thread_ts=context.thread_ts,
492+
bot_token=integration.slack_bot_token,
493+
query_id=None,
494+
user_id=context.user,
495+
)
436496

437497
else:
438498
status_str = session_status.value if session_status else "unknown"

0 commit comments

Comments
 (0)