Skip to content

Commit a8d59ce

Browse files
committed
implement resume request batching
1 parent 3da4386 commit a8d59ce

File tree

4 files changed

+45
-12
lines changed

4 files changed

+45
-12
lines changed

servers/fai-lambda-deploy/scripts/fai-scribe-stack.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,6 @@ export class FaiScribeStack extends Stack {
126126
filesystem: lambda.FileSystem.fromEfsAccessPoint(accessPoint, "/mnt/efs")
127127
});
128128

129-
// Grant SQS permissions for editing session queues
130-
// These queues are created dynamically by the FAI server per editing session
131129
lambdaFunction.addToRolePolicy(
132130
new iam.PolicyStatement({
133131
effect: iam.Effect.ALLOW,
@@ -138,10 +136,7 @@ export class FaiScribeStack extends Stack {
138136
"sqs:GetQueueUrl",
139137
"sqs:ChangeMessageVisibility"
140138
],
141-
resources: [
142-
// Allow access to all editing-session queues in this region
143-
`arn:aws:sqs:${this.region}:${this.account}:editing-session-*.fifo`
144-
]
139+
resources: [`arn:aws:sqs:${this.region}:${this.account}:editing-session-*.fifo`]
145140
})
146141
);
147142

servers/fai-lambda/fai-scribe/src/handler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ async def handle_editing_request(
109109
else:
110110
LOGGER.info(f"Successfully updated editing session: {editing_id}")
111111

112-
# Clean up the SQS queue after successful completion
113112
LOGGER.info(f"Cleaning up SQS queue for session: {editing_id}")
114113
cleanup_response = await client.delete(f"{SETTINGS.FAI_API_URL}/editing-sessions/{editing_id}/queue")
115114
if cleanup_response.status_code == 200:

servers/fai-lambda/fai-scribe/src/utils/agent.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,49 @@ async def run_editing_session(
186186
if session_id is None:
187187
raise RuntimeError("Failed to obtain session ID from Claude")
188188

189+
LOGGER.info(f"Checking for RESUME messages in queue for session {editing_id}")
190+
resume_messages = sqs_client.get_resume_messages()
191+
192+
if resume_messages:
193+
LOGGER.info(f"Found {len(resume_messages)} RESUME messages, batching prompts")
194+
195+
all_prompts = []
196+
for resume_msg in resume_messages:
197+
body = resume_msg["body"]
198+
prompts = body.get("prompts", [])
199+
if prompts:
200+
all_prompts.extend(prompts)
201+
sqs_client.delete_message(resume_msg["receipt_handle"])
202+
203+
if all_prompts:
204+
batched_prompt = "\n\n".join([f"Request {i+1}: {p}" for i, p in enumerate(all_prompts)])
205+
LOGGER.info(f"Processing {len(all_prompts)} batched prompts: {batched_prompt[:200]}...")
206+
207+
async with ClaudeSDKClient(options=options) as client:
208+
await client.query(batched_prompt)
209+
210+
async for msg in client.receive_response():
211+
is_interrupted, receipt_handle = check_if_interrupted_via_sqs(sqs_client)
212+
if is_interrupted:
213+
LOGGER.warning(f"Session interrupted while processing batched prompts: {editing_id}")
214+
if receipt_handle:
215+
sqs_client.delete_message(receipt_handle)
216+
await update_session_status(editing_id, "waiting")
217+
raise SessionInterruptedError(f"Editing session {editing_id} was interrupted")
218+
219+
if isinstance(msg, AssistantMessage):
220+
for block in msg.content:
221+
if isinstance(block, TextBlock):
222+
for line in block.text.split("\n"):
223+
if "PR_URL:" in line:
224+
extracted_text = line.split("PR_URL:", 1)[1].strip()
225+
if extracted_text:
226+
match = GITHUB_PR_URL_PATTERN.search(extracted_text)
227+
if match:
228+
pr_url = match.group(0)
229+
LOGGER.info(f"Updated PR URL: {pr_url}")
230+
await update_session_metadata(editing_id, pr_url=pr_url)
231+
189232
await update_session_status(editing_id, "waiting")
190233

191234
return session_id, pr_url

servers/fai/deploy/src/deploy-stack.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ export class FernAiDeployStack extends Stack {
159159
);
160160
}
161161

162-
// Grant SQS permissions for creating and managing editing session queues
163162
fargateService.taskDefinition.taskRole.addToPrincipalPolicy(
164163
new iam.PolicyStatement({
165164
effect: iam.Effect.ALLOW,
@@ -172,10 +171,7 @@ export class FernAiDeployStack extends Stack {
172171
"sqs:SetQueueAttributes",
173172
"sqs:TagQueue"
174173
],
175-
resources: [
176-
// Allow creating and managing editing-session queues
177-
`arn:aws:sqs:us-east-1:985111089818:editing-session-*.fifo`
178-
]
174+
resources: [`arn:aws:sqs:us-east-1:985111089818:editing-session-*.fifo`]
179175
})
180176
);
181177

0 commit comments

Comments
 (0)