Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 285 additions & 2 deletions api/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query, Request, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response
Expand All @@ -22,10 +23,49 @@
else:
logger.warning("GOOGLE_API_KEY not found in environment variables")

# Initialize FastAPI app

# --- Lifespan Context Manager ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage the sync scheduler across the application's lifetime.

    Startup: the scheduler is started when the ``DEEPWIKI_SYNC_ENABLED``
    environment variable equals ``"true"`` (case-insensitive; it defaults to
    ``"true"`` when unset). Any other value leaves the scheduler stopped.
    A failure while starting is logged and the application still boots.

    Shutdown: ``stop_scheduler`` is awaited. The stop step lives in a
    ``finally`` clause so that it also runs when the application leaves the
    lifespan context because of an exception.

    Args:
        app: The FastAPI application instance. It is not used here; the
            parameter is required by the lifespan protocol.

    Yields:
        None. Control is handed to FastAPI while the application serves.
    """
    logger.info("Starting sync scheduler...")
    try:
        # Imported lazily; the original code notes this avoids circular imports.
        from api.sync_scheduler import start_scheduler

        # Only the literal string "true" (any casing) enables the scheduler.
        sync_enabled = os.environ.get("DEEPWIKI_SYNC_ENABLED", "true").lower() == "true"

        if sync_enabled:
            await start_scheduler()
            logger.info("Sync scheduler started successfully")
        else:
            logger.info("Sync scheduler is disabled via DEEPWIKI_SYNC_ENABLED=false")
    except Exception as e:
        # Best effort: a broken scheduler must not prevent the API from starting.
        logger.error(f"Failed to start sync scheduler: {e}", exc_info=True)

    try:
        yield  # Application runs here
    finally:
        # Runs on normal shutdown and when an exception is thrown into the
        # generator at the yield point.
        logger.info("Stopping sync scheduler...")
        try:
            from api.sync_scheduler import stop_scheduler
            await stop_scheduler()
            logger.info("Sync scheduler stopped successfully")
        except Exception as e:
            logger.error(f"Error stopping sync scheduler: {e}", exc_info=True)


# Initialize FastAPI app with lifespan
# The lifespan handler is passed to the application so the sync scheduler is
# started and stopped together with it.
app = FastAPI(
    title="Streaming API",
    description="API for streaming chat completions",
    lifespan=lifespan
)

# Configure CORS
Expand Down Expand Up @@ -122,6 +162,10 @@ class ModelConfig(BaseModel):
defaultProvider: str = Field(..., description="ID of the default provider")

from api.config import configs
from api.sync_scheduler import (
get_scheduler, start_scheduler, stop_scheduler,
SyncStatus, SyncMetadata
)
Comment on lines +165 to +168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There are several unused imports here. start_scheduler and stop_scheduler are imported locally within the lifespan function, and SyncStatus and SyncMetadata are not used in this file. Removing them will clean up the code and avoid potential confusion.


@app.get("/models/config", response_model=ModelConfig)
async def get_model_config():
Expand Down Expand Up @@ -496,6 +540,16 @@ async def root():
"GET /api/wiki_cache - Retrieve cached wiki data",
"POST /api/wiki_cache - Store wiki data to cache"
],
"Sync": [
"GET /api/sync/status - Get sync scheduler status",
"GET /api/sync/projects - List all sync projects",
"POST /api/sync/projects - Add project for periodic sync",
"GET /api/sync/projects/{repo_type}/{owner}/{repo} - Get project sync status",
"PUT /api/sync/projects/{repo_type}/{owner}/{repo} - Update project sync settings",
"DELETE /api/sync/projects/{repo_type}/{owner}/{repo} - Remove project from sync",
"POST /api/sync/projects/{repo_type}/{owner}/{repo}/trigger - Manually trigger sync",
"GET /api/sync/projects/{repo_type}/{owner}/{repo}/check - Check for updates"
],
"LocalRepo": [
"GET /local_repo/structure - Get structure of a local repository (with path parameter)",
],
Expand Down Expand Up @@ -564,3 +618,232 @@ async def get_processed_projects():
except Exception as e:
logger.error(f"Error listing processed projects from {WIKI_CACHE_DIR}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to list processed projects from server cache.")


# --- Sync API Models ---

class SyncProjectRequest(BaseModel):
"""Request model for adding a project for periodic sync"""
repo_url: str = Field(..., description="Full URL of the repository")
owner: str = Field(..., description="Repository owner/organization")
repo: str = Field(..., description="Repository name")
repo_type: str = Field(default="github", description="Type of repository (github, gitlab, bitbucket)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better validation and API documentation, it's recommended to use typing.Literal for the repo_type field. This will ensure only valid repository types are accepted and will be reflected in the OpenAPI schema, improving robustness.

Suggested change
repo_type: str = Field(default="github", description="Type of repository (github, gitlab, bitbucket)")
repo_type: Literal["github", "gitlab", "bitbucket"] = Field(default="github", description="Type of repository (github, gitlab, bitbucket)")

sync_interval_minutes: int = Field(default=60, description="How often to sync (in minutes)")
access_token: Optional[str] = Field(default=None, description="Access token for private repositories")
enabled: bool = Field(default=True, description="Whether sync is enabled")


class SyncProjectUpdateRequest(BaseModel):
    """
    Request body for updating the sync settings of a registered project.

    Both fields are optional and default to ``None``. The PUT handler passes
    them unchanged to ``scheduler.update_project_settings``.
    """
    # NOTE(review): None presumably means "leave this setting unchanged" -
    # confirm against the scheduler implementation.
    sync_interval_minutes: Optional[int] = Field(default=None, description="How often to sync (in minutes)")
    enabled: Optional[bool] = Field(default=None, description="Whether sync is enabled")


class SyncStatusResponse(BaseModel):
    """
    Response body describing the sync state of one registered project.

    Instances are built by unpacking the mappings the scheduler returns
    (``metadata.to_dict()`` and the project/status dictionaries), so the keys
    of those mappings must supply every field that has no default here.
    """
    # Repository identity.
    repo_url: str
    owner: str
    repo: str
    repo_type: str
    # Result of the most recent sync; None when not provided.
    # NOTE(review): timestamps are carried as strings - presumably ISO-8601; confirm.
    last_synced: Optional[str] = None
    last_commit_hash: Optional[str] = None
    # NOTE(review): the /api/sync/status handler tallies "pending",
    # "in_progress", "completed" and "failed" - presumably the full set of
    # values; confirm against the scheduler.
    sync_status: str
    sync_interval_minutes: int
    document_count: int
    embedding_count: int
    error_message: Optional[str] = None
    next_sync: Optional[str] = None
    enabled: bool
    # Bookkeeping timestamps, also carried as strings.
    created_at: Optional[str] = None
    updated_at: Optional[str] = None


class SyncTriggerResponse(BaseModel):
    """
    Response body for a manually triggered sync.

    Built by unpacking the mapping returned by ``scheduler.trigger_sync``.
    Only ``success`` is required; every other field defaults to ``None``.
    """
    success: bool
    # NOTE(review): skipped/reason presumably describe a sync that was not
    # performed - confirm against the scheduler.
    skipped: Optional[bool] = None
    reason: Optional[str] = None
    document_count: Optional[int] = None
    embedding_count: Optional[int] = None
    commit_hash: Optional[str] = None
    error: Optional[str] = None


class UpdateCheckResponse(BaseModel):
    """
    Response body for an update check on a registered project.

    Built by unpacking the mapping returned by ``scheduler.check_for_updates``.
    Only ``has_updates`` is required.
    """
    has_updates: bool
    remote_commit: Optional[str] = None
    local_commit: Optional[str] = None
    # NOTE(review): mutable literal default; Pydantic is expected to copy it
    # per instance - confirm for the Pydantic version in use.
    changed_files: List[str] = []
    reason: Optional[str] = None
    error: Optional[str] = None


# --- Sync API Endpoints ---

@app.post("/api/sync/projects", response_model=SyncStatusResponse)
async def add_sync_project(request: SyncProjectRequest):
    """
    Register a repository with the sync scheduler.

    The request fields are forwarded to ``scheduler.add_project`` and the
    metadata it returns is converted into a ``SyncStatusResponse``.

    Args:
        request: Repository coordinates and sync settings.

    Returns:
        SyncStatusResponse built from ``metadata.to_dict()``.

    Raises:
        HTTPException: 500 with the exception text as detail when anything
            inside the handler fails.
    """
    try:
        sched = get_scheduler()
        project_settings = {
            "repo_url": request.repo_url,
            "owner": request.owner,
            "repo": request.repo,
            "repo_type": request.repo_type,
            "sync_interval_minutes": request.sync_interval_minutes,
            "access_token": request.access_token,
            "enabled": request.enabled,
        }
        registered = sched.add_project(**project_settings)
        logger.info(f"Added project for sync: {request.owner}/{request.repo}")
        return SyncStatusResponse(**registered.to_dict())
    except Exception as exc:
        logger.error(f"Error adding project for sync: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))


@app.get("/api/sync/projects", response_model=List[SyncStatusResponse])
async def get_all_sync_projects():
    """
    List every project registered with the sync scheduler.

    Returns:
        One ``SyncStatusResponse`` per mapping returned by
        ``scheduler.get_all_projects()``, in the same order.

    Raises:
        HTTPException: 500 with the exception text as detail on any failure.
    """
    try:
        responses = []
        for project in get_scheduler().get_all_projects():
            responses.append(SyncStatusResponse(**project))
        return responses
    except Exception as exc:
        logger.error(f"Error getting sync projects: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))


@app.get("/api/sync/projects/{repo_type}/{owner}/{repo}", response_model=SyncStatusResponse)
async def get_sync_project_status(repo_type: str, owner: str, repo: str):
    """
    Return the sync status of a single project.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.

    Returns:
        SyncStatusResponse built from ``scheduler.get_project_status``.

    Raises:
        HTTPException: 404 when the scheduler returns a falsy status;
            500 with the exception text as detail on any other failure.
    """
    try:
        project_status = get_scheduler().get_project_status(owner, repo, repo_type)
        if project_status:
            return SyncStatusResponse(**project_status)
        raise HTTPException(status_code=404, detail="Project not found")
    except HTTPException:
        # Let the 404 above through instead of turning it into a 500.
        raise
    except Exception as exc:
        logger.error(f"Error getting project status: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))


@app.put("/api/sync/projects/{repo_type}/{owner}/{repo}", response_model=SyncStatusResponse)
async def update_sync_project(repo_type: str, owner: str, repo: str, request: SyncProjectUpdateRequest):
    """
    Change the sync settings of a registered project.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.
        request: New values for the sync interval and enabled flag; both are
            forwarded as-is, including ``None``.

    Returns:
        SyncStatusResponse built from ``scheduler.update_project_settings``.

    Raises:
        HTTPException: 404 when the scheduler returns a falsy result;
            500 with the exception text as detail on any other failure.
    """
    try:
        sched = get_scheduler()
        new_settings = {
            "sync_interval_minutes": request.sync_interval_minutes,
            "enabled": request.enabled,
        }
        updated = sched.update_project_settings(
            owner=owner, repo=repo, repo_type=repo_type, **new_settings
        )
        if updated:
            return SyncStatusResponse(**updated)
        raise HTTPException(status_code=404, detail="Project not found")
    except HTTPException:
        # Let the 404 above through instead of turning it into a 500.
        raise
    except Exception as exc:
        logger.error(f"Error updating project settings: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))


@app.delete("/api/sync/projects/{repo_type}/{owner}/{repo}")
async def remove_sync_project(repo_type: str, owner: str, repo: str):
    """
    Unregister a project from the sync scheduler.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.

    Returns:
        A dict with a single ``"message"`` key confirming the removal.

    Raises:
        HTTPException: 404 when ``scheduler.remove_project`` returns a falsy
            value; 500 with the exception text as detail on any other failure.
    """
    try:
        removed = get_scheduler().remove_project(owner, repo, repo_type)
        if removed:
            return {"message": f"Project {owner}/{repo} removed from sync"}
        raise HTTPException(status_code=404, detail="Project not found or could not be removed")
    except HTTPException:
        # Let the 404 above through instead of turning it into a 500.
        raise
    except Exception as exc:
        logger.error(f"Error removing project: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/api/sync/projects/{repo_type}/{owner}/{repo}/trigger", response_model=SyncTriggerResponse)
async def trigger_sync(repo_type: str, owner: str, repo: str):
    """
    Manually trigger a sync for a project.

    Awaits ``scheduler.trigger_sync`` and wraps its result in a
    ``SyncTriggerResponse``.

    NOTE(review): the previous docstring stated that this forces a re-index
    regardless of detected changes; that depends on the scheduler and is not
    visible here - confirm.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.

    Returns:
        SyncTriggerResponse built from the scheduler's result mapping.

    Raises:
        HTTPException: 500 with the exception text as detail on any failure.
    """
    try:
        outcome = await get_scheduler().trigger_sync(owner, repo, repo_type)
        return SyncTriggerResponse(**outcome)
    except Exception as exc:
        logger.error(f"Error triggering sync: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))


@app.get("/api/sync/projects/{repo_type}/{owner}/{repo}/check", response_model=UpdateCheckResponse)
async def check_for_updates(repo_type: str, owner: str, repo: str):
"""
Check if a project has updates without actually syncing.

This is useful to preview what would happen during a sync.
"""
try:
scheduler = get_scheduler()
result = scheduler.check_for_updates(owner, repo, repo_type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The scheduler.check_for_updates method is a synchronous function that performs blocking I/O operations (such as running a subprocess). Calling it directly from an async endpoint will block the entire server's event loop, preventing it from handling other requests. You should run this blocking call in a separate thread using asyncio.to_thread.

Suggested change
result = scheduler.check_for_updates(owner, repo, repo_type)
result = await asyncio.to_thread(scheduler.check_for_updates, owner, repo, repo_type)

return UpdateCheckResponse(**result)
except Exception as e:
logger.error(f"Error checking for updates: {e}")
raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/sync/status")
async def get_scheduler_status():
"""
Get the overall status of the sync scheduler.
"""
try:
scheduler = get_scheduler()
projects = scheduler.get_all_projects()

# Count projects by status
status_counts = {
"pending": 0,
"in_progress": 0,
"completed": 0,
"failed": 0,
"disabled": 0
}

for p in projects:
status = p.get("sync_status", "pending")
if not p.get("enabled", True):
status_counts["disabled"] += 1
elif status in status_counts:
status_counts[status] += 1

return {
"scheduler_running": scheduler._running,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Accessing the 'private' attribute _running directly is against encapsulation principles. It would be better to expose a public property on the SyncScheduler class to check its status. I've added a separate comment in api/sync_scheduler.py with a suggestion to add an is_running property.

"total_projects": len(projects),
"status_counts": status_counts,
"check_interval_seconds": scheduler.check_interval
}
except Exception as e:
logger.error(f"Error getting scheduler status: {e}")
raise HTTPException(status_code=500, detail=str(e))
Loading