
Commit 30d9244

feat: batch index/clone example repos (#4628)
1 parent 34e0bcd commit 30d9244

File tree

8 files changed, +163 -95 lines changed

servers/fai-lambda/fai-code-indexing/src/handler.py

Lines changed: 5 additions & 4 deletions

@@ -14,7 +14,7 @@
 
 from .operations import (
     run_code_search_tool_call,
-    setup_repo_for_domain,
+    setup_repos_for_domain,
 )
 
 logger = logging.getLogger()
@@ -32,9 +32,10 @@ def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
     event_type: Literal["codeSearch", "indexRepo"] = validate_body_param_or_throw(body, "eventType")
 
     if event_type == "indexRepo":
-        repo_url = validate_body_param_or_throw(body, "repoUrl")
-        asyncio.run(setup_repo_for_domain(domain=domain, repo_url=repo_url))
-        message = "Repository indexed successfully"
+        repo_urls = validate_body_param_or_throw(body, "repoUrls", list[str])
+        asyncio.run(setup_repos_for_domain(domain=domain, repo_urls=repo_urls))
+
+        message = f"Successfully indexed {len(repo_urls)} repositories"
     elif event_type == "codeSearch":
        asyncio.run(run_code_search_tool_call(domain=domain))
        message = "Code search completed successfully"

Lines changed: 2 additions & 2 deletions

@@ -1,5 +1,5 @@
 from .analysis import analyze_repositories_for_domain
-from .indexing import setup_repo_for_domain
+from .indexing import setup_repos_for_domain
 from .search import run_code_search_tool_call
 
-__all__ = ["analyze_repositories_for_domain", "setup_repo_for_domain", "run_code_search_tool_call"]
+__all__ = ["analyze_repositories_for_domain", "setup_repos_for_domain", "run_code_search_tool_call"]

servers/fai-lambda/fai-code-indexing/src/operations/indexing.py

Lines changed: 7 additions & 7 deletions

@@ -7,20 +7,20 @@
 logger = logging.getLogger()
 
 
-async def setup_repo_for_domain(domain: str, repo_url: str) -> SetupRepoResult:
-    """Set up a repository for a domain by cloning and indexing it.
+async def setup_repos_for_domain(domain: str, repo_urls: list[str]) -> SetupRepoResult:
+    """Set up repositories for a domain by cloning and indexing them.
 
     Args:
-        domain: The domain to associate the repository with
-        repo_url: The GitHub repository URL to clone and index
+        domain: The domain to associate the repositories with
+        repo_urls: List of GitHub repository URLs to clone and index
 
     Returns:
         Dictionary with setup results
     """
-    logger.info(f"Setting up repository for domain: {domain}, repo: {repo_url}")
+    logger.info(f"Setting up repositories for domain: {domain}, repos: {repo_urls}")
 
-    repo_path = clone_repo_to_domain(domain=domain, repo_url=repo_url)
-    logger.info(f"Repository cloned to: {repo_path}")
+    for repo_url in repo_urls:
+        await clone_repo_to_domain(domain=domain, repo_url=repo_url)
 
     analysis_result = await analyze_repositories_for_domain(domain=domain)
     logger.info(f"Analysis completed for domain {domain} repositories")

Lines changed: 56 additions & 27 deletions

@@ -1,12 +1,12 @@
+import asyncio
 import logging
 import os
-import subprocess
 from pathlib import Path
 
 logger = logging.getLogger()
 
 
-def clone_repo_to_domain(domain: str, repo_url: str) -> str:
+async def clone_repo_to_domain(domain: str, repo_url: str) -> str:
     """Clone a GitHub repository into EFS under a domain folder.
 
     Args:
@@ -33,40 +33,69 @@ def clone_repo_to_domain(domain: str, repo_url: str) -> str:
     if repo_path.exists():
         logger.info(f"Repository already exists at {repo_path}, pulling latest changes")
         try:
-            subprocess.run(
-                ["git", "config", "--global", "--add", "safe.directory", str(repo_path)],
-                capture_output=True,
-                text=True,
+            config_process = await asyncio.create_subprocess_exec(
+                "git",
+                "config",
+                "--global",
+                "--add",
+                "safe.directory",
+                str(repo_path),
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
             )
-            subprocess.run(
-                ["git", "-C", str(repo_path), "fetch", "origin"],
-                check=True,
-                capture_output=True,
-                text=True,
+            _, _ = await config_process.communicate()
+
+            process = await asyncio.create_subprocess_exec(
+                "git",
+                "-C",
+                str(repo_path),
+                "fetch",
+                "origin",
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
             )
-            subprocess.run(
-                ["git", "-C", str(repo_path), "pull", "origin"],
-                check=True,
-                capture_output=True,
-                text=True,
+            _, stderr = await process.communicate()
+            if process.returncode != 0:
+                raise RuntimeError(f"Failed to fetch: {stderr.decode()}")
+
+            process = await asyncio.create_subprocess_exec(
+                "git",
+                "-C",
+                str(repo_path),
+                "pull",
+                "origin",
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
             )
+            _, stderr = await process.communicate()
+            if process.returncode != 0:
+                raise RuntimeError(f"Failed to pull: {stderr.decode()}")
+
             logger.info(f"Successfully pulled latest changes at {repo_path}")
-        except subprocess.CalledProcessError as e:
-            logger.error(f"Failed to pull repository: {e.stderr}")
-            raise RuntimeError(f"Failed to pull latest changes: {e.stderr}")
+        except Exception as e:
+            logger.error(f"Failed to pull repository: {e}")
+            raise RuntimeError(f"Failed to pull latest changes: {e}")
     else:
         clone_url = f"https://x-access-token:{github_token}@github.com/{repo_identifier}.git"
 
         logger.info(f"Cloning {repo_identifier} into {repo_path} (shallow)")
         try:
-            subprocess.run(
-                ["git", "clone", "--depth", "1", clone_url, str(repo_path)],
-                check=True,
-                capture_output=True,
-                text=True,
+            process = await asyncio.create_subprocess_exec(
+                "git",
+                "clone",
+                "--depth",
+                "1",
+                clone_url,
+                str(repo_path),
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
            )
-        except subprocess.CalledProcessError as e:
-            logger.error(f"Failed to clone repository: {e.stderr}")
-            raise RuntimeError(f"Failed to clone {repo_identifier}: {e.stderr}")
+            _, stderr = await process.communicate()
+            if process.returncode != 0:
+                raise RuntimeError(f"Failed to clone {repo_identifier}: {stderr.decode()}")
+            logger.info(f"Repository cloned to: {repo_path}")
+        except Exception as e:
+            logger.error(f"Failed to clone repository: {e}")
+            raise RuntimeError(f"Failed to clone {repo_identifier}: {e}")
 
     return str(repo_path)
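
The create_subprocess_exec / communicate / returncode pattern now appears four times in this function. A hypothetical helper that would factor it out (a sketch only; _run_git is an invented name, not part of this commit):

import asyncio

async def _run_git(*args: str, error_prefix: str) -> bytes:
    # Run one git command, capture both streams, raise on non-zero exit.
    process = await asyncio.create_subprocess_exec(
        "git",
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        raise RuntimeError(f"{error_prefix}: {stderr.decode()}")
    return stdout

# Usage, e.g.:
# await _run_git("-C", str(repo_path), "fetch", "origin", error_prefix="Failed to fetch")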

Lines changed: 15 additions & 3 deletions

@@ -1,8 +1,20 @@
-from typing import Any
+from typing import Any, Type, TypeVar, get_origin
 
+T = TypeVar("T")
 
-def validate_body_param_or_throw(body: dict[str, Any], param_name: str) -> str:
+
+def validate_body_param_or_throw(body: dict[str, Any], param_name: str, expected_type: Type[T] = str) -> T:
     value = body.get(param_name)
-    if not isinstance(value, str) or not value:
+
+    if value is None:
         raise ValueError(f"Missing required request body field: {param_name}")
+
+    origin = get_origin(expected_type)
+    if origin is not None:
+        if not isinstance(value, origin):
+            raise ValueError(f"Field '{param_name}' must be of type {expected_type}, got {type(value).__name__}")
+    else:
+        if not isinstance(value, expected_type):
+            raise ValueError(f"Field '{param_name}' must be of type {expected_type.__name__}, got {type(value).__name__}")
+
     return value
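
Two behavioral details of the new validator are worth noting. The old empty-string check is gone, so an empty value of the right type now passes. And for parameterized types such as list[str], only the container is validated: get_origin(list[str]) returns list, and isinstance can only check against that origin, not the element type. A usage sketch against the new signature:

# Container-level validation only: any list satisfies list[str].
body = {"eventType": "indexRepo", "repoUrls": ["https://github.com/org/repo"]}

event_type = validate_body_param_or_throw(body, "eventType")           # expected_type defaults to str
repo_urls = validate_body_param_or_throw(body, "repoUrls", list[str])  # checks isinstance(value, list)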

servers/fai/src/fai/models/api/github_source_api.py

Lines changed: 2 additions & 2 deletions

@@ -7,12 +7,12 @@
 
 
 class IndexGithubRequest(BaseModel):
-    repo_url: str = Field(description="GitHub repository URL to index")
+    repo_urls: list[str] = Field(description="GitHub repository URLs to index")
 
 
 class IndexGithubResponse(BaseModel):
     job_id: str = Field(description="Job ID for tracking indexing progress")
-    repo_url: str = Field(description="GitHub repository URL being indexed")
+    repo_urls: list[str] = Field(description="GitHub repository URLs being indexed")
 
 
 class GithubIndexStatusResponse(BaseModel):
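
Both models move from a scalar to a list, so one API call can now batch several repositories. A sketch of constructing the updated request (URLs are placeholders):

# One request now carries several repositories; the response
# echoes the same list back alongside the tracking job_id.
request = IndexGithubRequest(
    repo_urls=[
        "https://github.com/org/repo-one",
        "https://github.com/org/repo-two",
    ]
)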

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+from . import github
+
+__all__ = ["github"]

Lines changed: 73 additions & 50 deletions

@@ -1,5 +1,17 @@
-from fastapi import Depends
-from fastapi.responses import JSONResponse
+import json
+import logging
+import uuid
+from datetime import (
+    UTC,
+    datetime,
+)
+
+import aioboto3
+from botocore.exceptions import ClientError
+from fastapi import (
+    Depends,
+    HTTPException,
+)
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from fai.app import fai_app
@@ -9,12 +21,18 @@
     verify_token,
 )
 from fai.models.api.github_source_api import (
-    GithubIndexStatusResponse,
     IndexGithubRequest,
     IndexGithubResponse,
-    ReindexGithubRequest,
-    ReindexGithubResponse,
 )
+from fai.models.db.index_source_db import (
+    IndexSourceDb,
+    IndexSourceStatus,
+    SourceType,
+)
+
+logger = logging.getLogger(__name__)
+
+LAMBDA_FUNCTION_NAME = "fai-code-indexing-dev2"
 
 
 @fai_app.post(
@@ -23,60 +41,65 @@
     dependencies=[Depends(verify_token)],
     openapi_extra={"x-fern-audiences": ["internal"]},
 )
-async def index_github_source(
+async def index_github_source_repos(
     domain: str,
     request: IndexGithubRequest,
     db: AsyncSession = Depends(get_db),
-) -> JSONResponse:
+) -> IndexGithubResponse:
     """Start indexing a GitHub repository for a domain."""
-    strip_domain(domain)
+    stripped_domain = strip_domain(domain)
 
-    # TODO: Implement GitHub indexing logic
-    # - Create IndexSourceDb record
-    # - Start background indexing job
-    # - Return job_id and repo_url
+    job_id = str(uuid.uuid4())
 
-    raise NotImplementedError("GitHub indexing not yet implemented")
+    now = datetime.now(UTC)
+    for repo_url in request.repo_urls:
+        index_source = IndexSourceDb(
+            id=str(uuid.uuid4()),
+            domain=stripped_domain,
+            source_type=SourceType.GITHUB,
+            source_identifier=repo_url,
+            config={},
+            job_id=job_id,
+            status=IndexSourceStatus.INDEXING,
+            metrics={},
+            created_at=now,
+            updated_at=now,
+        )
+        db.add(index_source)
 
+    await db.commit()
 
-@fai_app.get(
-    "/sources/github/{domain}/status",
-    response_model=GithubIndexStatusResponse,
-    dependencies=[Depends(verify_token)],
-    openapi_extra={"x-fern-audiences": ["internal"]},
-)
-async def get_github_index_status(
-    domain: str,
-    db: AsyncSession = Depends(get_db),
-) -> JSONResponse:
-    """Get the indexing status for a GitHub repository."""
-    strip_domain(domain)
-
-    # TODO: Implement status check logic
-    # - Query IndexSourceDb for the domain
-    # - Return current status and metrics
-
-    raise NotImplementedError("GitHub status check not yet implemented")
+    try:
+        session = aioboto3.Session()
+        async with session.client("lambda") as lambda_client:
+            payload = {
+                "domain": stripped_domain,
+                "eventType": "indexRepo",
+                "repoUrls": request.repo_urls,
+            }
 
+            response = await lambda_client.invoke(
+                FunctionName=LAMBDA_FUNCTION_NAME,
+                InvocationType="Event",
+                Payload=json.dumps(payload),
+            )
 
-@fai_app.post(
-    "/sources/github/{domain}/reindex",
-    response_model=ReindexGithubResponse,
-    dependencies=[Depends(verify_token)],
-    openapi_extra={"x-fern-audiences": ["internal"]},
-)
-async def reindex_github_source(
-    domain: str,
-    request: ReindexGithubRequest,
-    db: AsyncSession = Depends(get_db),
-) -> JSONResponse:
-    """Delete existing index and start a new indexing job for a GitHub repository."""
-    strip_domain(domain)
+            logger.info(
+                f"Successfully invoked code indexing Lambda. "
+                f"StatusCode: {response.get('StatusCode')}, "
+                f"Domain: {stripped_domain}, "
+                f"RepoUrls: {request.repo_urls}, "
+                f"JobId: {job_id}"
+            )
 
-    # TODO: Implement reindexing logic
-    # - Delete old IndexSourceDb records and indexed content
-    # - Create new IndexSourceDb record
-    # - Start new background indexing job
-    # - Return new job_id
+    except ClientError as e:
+        logger.error(
+            f"Failed to invoke code indexing Lambda: {e.response['Error']['Code']} - {e.response['Error']['Message']}",
+            exc_info=True,
+        )
+        raise HTTPException(status_code=500, detail="Failed to start indexing job")
+    except Exception as e:
+        logger.error(f"Unexpected error invoking code indexing Lambda: {str(e)}", exc_info=True)
+        raise HTTPException(status_code=500, detail="Failed to start indexing job")
 
-    raise NotImplementedError("GitHub reindexing not yet implemented")
+    return IndexGithubResponse(job_id=job_id, repo_urls=request.repo_urls)
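
Because the Lambda is invoked with InvocationType="Event", the invoke call is fire-and-forget: AWS queues the event and returns StatusCode 202 before any indexing runs. The endpoint therefore responds immediately, while the IndexSourceDb rows created above stay in INDEXING status until the Lambda finishes. What the caller receives is just the tracking handle; a sketch with placeholder values:

# One job_id covers the whole batch; every IndexSourceDb row
# created in this request shares it.
IndexGithubResponse(
    job_id="3f6c1a2e-...",  # uuid4 generated per request (placeholder)
    repo_urls=["https://github.com/org/repo-one"],
)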
