Skip to content

Commit 51e6f66

Browse files
authored
feat: use fai-code-indexing callback to store sessionId at the domain level (#4638)
1 parent 608a4d4 commit 51e6f66

18 files changed

+504
-443
lines changed

fern/apis/fai/openapi.json

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3165,6 +3165,117 @@
31653165
"internal"
31663166
]
31673167
}
3168+
},
3169+
"/sources/github/{domain}/index": {
3170+
"post": {
3171+
"tags": [
3172+
"Sources"
3173+
],
3174+
"summary": "Index Github Source Repos",
3175+
"description": "Start indexing a GitHub repository for a domain.",
3176+
"operationId": "index_github_source_repos",
3177+
"parameters": [
3178+
{
3179+
"name": "domain",
3180+
"in": "path",
3181+
"required": true,
3182+
"schema": {
3183+
"type": "string",
3184+
"title": "Domain"
3185+
}
3186+
}
3187+
],
3188+
"requestBody": {
3189+
"required": true,
3190+
"content": {
3191+
"application/json": {
3192+
"schema": {
3193+
"$ref": "#/components/schemas/IndexGithubRequest"
3194+
}
3195+
}
3196+
}
3197+
},
3198+
"responses": {
3199+
"200": {
3200+
"description": "Successful Response",
3201+
"content": {
3202+
"application/json": {
3203+
"schema": {
3204+
"$ref": "#/components/schemas/IndexGithubResponse"
3205+
}
3206+
}
3207+
}
3208+
},
3209+
"422": {
3210+
"description": "Validation Error",
3211+
"content": {
3212+
"application/json": {
3213+
"schema": {
3214+
"$ref": "#/components/schemas/HTTPValidationError"
3215+
}
3216+
}
3217+
}
3218+
}
3219+
},
3220+
"x-fern-audiences": [
3221+
"internal"
3222+
]
3223+
}
3224+
},
3225+
"/sources/github/{domain}/lambda/callback": {
3226+
"post": {
3227+
"tags": [
3228+
"Sources"
3229+
],
3230+
"summary": "Indexing Callback",
3231+
"operationId": "indexing_callback",
3232+
"parameters": [
3233+
{
3234+
"name": "domain",
3235+
"in": "path",
3236+
"required": true,
3237+
"schema": {
3238+
"type": "string",
3239+
"title": "Domain"
3240+
}
3241+
}
3242+
],
3243+
"requestBody": {
3244+
"required": true,
3245+
"content": {
3246+
"application/json": {
3247+
"schema": {
3248+
"$ref": "#/components/schemas/IndexingCallbackRequest"
3249+
}
3250+
}
3251+
}
3252+
},
3253+
"responses": {
3254+
"200": {
3255+
"description": "Successful Response",
3256+
"content": {
3257+
"application/json": {
3258+
"schema": {
3259+
"$ref": "#/components/schemas/IndexingCallbackResponse"
3260+
}
3261+
}
3262+
}
3263+
},
3264+
"422": {
3265+
"description": "Validation Error",
3266+
"content": {
3267+
"application/json": {
3268+
"schema": {
3269+
"$ref": "#/components/schemas/HTTPValidationError"
3270+
}
3271+
}
3272+
}
3273+
}
3274+
},
3275+
"x-fern-audiences": [
3276+
"internal"
3277+
]
3278+
}
31683279
}
31693280
},
31703281
"components": {
@@ -4313,6 +4424,86 @@
43134424
],
43144425
"title": "HistogramAnalyticsBar"
43154426
},
4427+
"IndexGithubRequest": {
4428+
"properties": {
4429+
"repo_urls": {
4430+
"items": {
4431+
"type": "string"
4432+
},
4433+
"type": "array",
4434+
"title": "Repo Urls",
4435+
"description": "GitHub repository URLs to index"
4436+
}
4437+
},
4438+
"type": "object",
4439+
"required": [
4440+
"repo_urls"
4441+
],
4442+
"title": "IndexGithubRequest"
4443+
},
4444+
"IndexGithubResponse": {
4445+
"properties": {
4446+
"job_id": {
4447+
"type": "string",
4448+
"title": "Job Id",
4449+
"description": "Job ID for tracking indexing progress"
4450+
},
4451+
"repo_urls": {
4452+
"items": {
4453+
"type": "string"
4454+
},
4455+
"type": "array",
4456+
"title": "Repo Urls",
4457+
"description": "GitHub repository URLs being indexed"
4458+
}
4459+
},
4460+
"type": "object",
4461+
"required": [
4462+
"job_id",
4463+
"repo_urls"
4464+
],
4465+
"title": "IndexGithubResponse"
4466+
},
4467+
"IndexingCallbackRequest": {
4468+
"properties": {
4469+
"session_id": {
4470+
"type": "string",
4471+
"title": "Session Id",
4472+
"description": "Session ID from the code indexing operation"
4473+
},
4474+
"status": {
4475+
"type": "string",
4476+
"title": "Status",
4477+
"description": "Status of indexing operation: 'success' or 'failed'"
4478+
}
4479+
},
4480+
"type": "object",
4481+
"required": [
4482+
"session_id",
4483+
"status"
4484+
],
4485+
"title": "IndexingCallbackRequest"
4486+
},
4487+
"IndexingCallbackResponse": {
4488+
"properties": {
4489+
"status": {
4490+
"type": "string",
4491+
"title": "Status",
4492+
"description": "Callback processing status"
4493+
},
4494+
"status_code": {
4495+
"type": "integer",
4496+
"title": "Status Code",
4497+
"description": "HTTP status code"
4498+
}
4499+
},
4500+
"type": "object",
4501+
"required": [
4502+
"status",
4503+
"status_code"
4504+
],
4505+
"title": "IndexingCallbackResponse"
4506+
},
43164507
"InsightExample": {
43174508
"properties": {
43184509
"query": {

servers/fai-lambda/fai-code-indexing/src/handler.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import json
33
import logging
4+
import os
45
from datetime import (
56
UTC,
67
datetime,
@@ -10,17 +11,24 @@
1011
Literal,
1112
)
1213

14+
import httpx
1315
from shared.utils.validation import validate_body_param_or_throw
1416

1517
from .operations import (
1618
run_code_search_tool_call,
1719
setup_repos_for_domain,
1820
)
21+
from .operations.execute_command import call_shell_command
1922

2023
logger = logging.getLogger()
2124
logger.setLevel(logging.INFO)
2225

2326

27+
async def _post_callback(url: str, data: dict[str, Any]) -> None:
28+
async with httpx.AsyncClient() as client:
29+
await client.post(url, json=data, timeout=30.0)
30+
31+
2432
def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
2533
"""Lambda handler for code indexing."""
2634
logger.info(f"Event: {json.dumps(event)}")
@@ -29,28 +37,70 @@ def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
2937
try:
3038
body = json.loads(event.get("body", "{}"))
3139
domain = validate_body_param_or_throw(body, "domain")
32-
event_type: Literal["codeSearch", "indexRepo"] = validate_body_param_or_throw(body, "eventType")
40+
event_type: Literal["codeSearch", "indexRepo", "executeCommand"] = validate_body_param_or_throw(
41+
body, "eventType"
42+
)
43+
44+
if event_type == "executeCommand":
45+
command = validate_body_param_or_throw(body, "command")
46+
working_dir = os.environ.get("HOME", "/mnt/efs")
47+
48+
try:
49+
execute_command_result = asyncio.run(call_shell_command(command, working_dir))
50+
51+
response_body = {
52+
"message": "Command executed",
53+
"command": command,
54+
"stdout": execute_command_result["stdout"],
55+
"stderr": execute_command_result["stderr"],
56+
"returncode": execute_command_result["returncode"],
57+
"timestamp": datetime.now(UTC).isoformat(),
58+
"requestId": context.aws_request_id,
59+
}
60+
61+
except Exception as cmd_error:
62+
response_body = {
63+
"message": "Command execution failed",
64+
"command": command,
65+
"error": str(cmd_error),
66+
"timestamp": datetime.now(UTC).isoformat(),
67+
"requestId": context.aws_request_id,
68+
}
3369

34-
if event_type == "indexRepo":
70+
elif event_type == "indexRepo":
3571
repo_urls = validate_body_param_or_throw(body, "repoUrls", list[str])
36-
asyncio.run(setup_repos_for_domain(domain=domain, repo_urls=repo_urls))
72+
callback_url = validate_body_param_or_throw(body, "callbackUrl")
73+
setup_result = asyncio.run(setup_repos_for_domain(domain=domain, repo_urls=repo_urls))
74+
75+
try:
76+
callback_data = {
77+
"session_id": setup_result.session_id,
78+
"status": setup_result.status,
79+
}
80+
asyncio.run(_post_callback(callback_url, callback_data))
81+
logger.info(f"Successfully posted callback to {callback_url}")
82+
except Exception as callback_error:
83+
logger.error(f"Failed to post callback: {str(callback_error)}", exc_info=True)
3784

3885
response_body = {
3986
"message": f"Successfully indexed {len(repo_urls)} repositories",
4087
"timestamp": datetime.now(UTC).isoformat(),
4188
"requestId": context.aws_request_id,
89+
"session_id": setup_result.session_id,
4290
}
4391

4492
elif event_type == "codeSearch":
4593
question = validate_body_param_or_throw(body, "question")
4694
session_id = body.get("sessionId")
47-
result = asyncio.run(run_code_search_tool_call(domain=domain, question=question, session_id=session_id))
95+
code_search_result = asyncio.run(
96+
run_code_search_tool_call(domain=domain, question=question, session_id=session_id)
97+
)
4898

4999
response_body = {
50100
"message": "Code search completed successfully",
51101
"timestamp": datetime.now(UTC).isoformat(),
52102
"requestId": context.aws_request_id,
53-
"answer": result.get("answer", None),
103+
"answer": code_search_result.get("answer", None),
54104
}
55105

56106
return {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import asyncio
2+
import shlex
3+
from typing import Any
4+
5+
6+
async def call_shell_command(command: str, working_dir: str) -> dict[str, Any]:
7+
"""Execute a shell command asynchronously."""
8+
try:
9+
args = shlex.split(command)
10+
proc = await asyncio.create_subprocess_exec(
11+
*args,
12+
cwd=working_dir,
13+
stdout=asyncio.subprocess.PIPE,
14+
stderr=asyncio.subprocess.PIPE,
15+
)
16+
17+
stdout, stderr = await proc.communicate()
18+
return {
19+
"stdout": stdout.decode() if stdout else "",
20+
"stderr": stderr.decode() if stderr else "",
21+
"returncode": proc.returncode,
22+
}
23+
except Exception as e:
24+
raise RuntimeError(f"Failed to execute command: {e}")

servers/fai-lambda/fai-code-indexing/src/operations/search.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
Question: {question}
3030
"""
3131

32+
3233
async def run_code_search_tool_call(domain: str, question: str, session_id: str | None = None) -> dict[str, Any]:
3334
"""Run a code search tool call for a domain.
3435
@@ -48,26 +49,22 @@ async def run_code_search_tool_call(domain: str, question: str, session_id: str
4849
async for message in query(
4950
prompt=CLAUDE_ANSWER_CODE_QUESTION_USER_PROMPT.format(question=question),
5051
options=ClaudeAgentOptions(
51-
cwd=str(domain_folder),
52-
disallowed_tools=["Write", "Delete", "Rename"],
53-
resume=session_id,
54-
fork_session=True
55-
)
52+
cwd=str(domain_folder), disallowed_tools=["Write", "Delete", "Rename"], resume=session_id, fork_session=True
53+
),
5654
):
57-
if hasattr(message, 'subtype') and message.subtype == 'init':
58-
session_id = message.data.get('session_id')
59-
print(f"Session started with ID: {session_id}")
55+
if hasattr(message, "subtype") and message.subtype == "init":
56+
session_id = message.data.get("session_id")
57+
logger.info(f"Session started with ID: {session_id}")
6058

6159
if isinstance(message, AssistantMessage):
6260
for content in message.content:
6361
if isinstance(content, ToolUseBlock):
64-
print(f"Tool used: {content.name}")
65-
print(f"Tool input: {content.input}")
62+
logger.info(f"Tool used: {content.name}")
63+
logger.info(f"Tool input: {content.input}")
6664
if isinstance(content, TextBlock):
67-
print(f"Text: {content.text}")
65+
logger.info(f"Text: {content.text}")
6866
answer = content.text
6967

70-
7168
return {
7269
"domain": domain,
7370
"status": "success",

0 commit comments

Comments
 (0)