-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Add periodic index synchronization feature [Opus5.1] #403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,5 +1,6 @@ | ||||||
| import os | ||||||
| import logging | ||||||
| from contextlib import asynccontextmanager | ||||||
| from fastapi import FastAPI, HTTPException, Query, Request, WebSocket | ||||||
| from fastapi.middleware.cors import CORSMiddleware | ||||||
| from fastapi.responses import JSONResponse, Response | ||||||
|
|
@@ -22,10 +23,49 @@ | |||||
| else: | ||||||
| logger.warning("GOOGLE_API_KEY not found in environment variables") | ||||||
|
|
||||||
| # Initialize FastAPI app | ||||||
|
|
||||||
| # --- Lifespan Context Manager --- | ||||||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for the FastAPI application.

    Starts the sync scheduler on startup, unless it is disabled through the
    ``DEEPWIKI_SYNC_ENABLED`` environment variable, and stops it again on
    shutdown. Exceptions raised while starting or stopping the scheduler are
    logged (with traceback) and not re-raised, so the API itself still
    starts and shuts down.

    Args:
        app: The FastAPI application instance. It is not used here.

    Yields:
        None. The application serves requests while suspended at ``yield``.
    """
    # Any value other than "true" (case-insensitive) disables the scheduler.
    sync_enabled = os.environ.get("DEEPWIKI_SYNC_ENABLED", "true").lower() == "true"

    # Startup: start the sync scheduler only when it is enabled.
    if sync_enabled:
        logger.info("Starting sync scheduler...")
        try:
            # Import here to avoid circular imports
            from api.sync_scheduler import start_scheduler

            await start_scheduler()
            logger.info("Sync scheduler started successfully")
        except Exception as e:
            logger.error(f"Failed to start sync scheduler: {e}", exc_info=True)
    else:
        logger.info("Sync scheduler is disabled via DEEPWIKI_SYNC_ENABLED=false")

    yield  # Application runs here

    # Shutdown: only attempt to stop a scheduler we tried to start. A stop is
    # still attempted after a failed start so partial startup can be cleaned up.
    if sync_enabled:
        logger.info("Stopping sync scheduler...")
        try:
            from api.sync_scheduler import stop_scheduler

            await stop_scheduler()
            logger.info("Sync scheduler stopped successfully")
        except Exception as e:
            logger.error(f"Error stopping sync scheduler: {e}", exc_info=True)
|
|
||||||
|
|
||||||
# Initialize FastAPI app with lifespan.
# The lifespan handler is passed here so the sync scheduler is started and
# stopped together with the application.
app = FastAPI(
    title="Streaming API",
    description="API for streaming chat completions",
    lifespan=lifespan,
)
|
|
||||||
| # Configure CORS | ||||||
|
|
@@ -122,6 +162,10 @@ class ModelConfig(BaseModel): | |||||
| defaultProvider: str = Field(..., description="ID of the default provider") | ||||||
|
|
||||||
| from api.config import configs | ||||||
| from api.sync_scheduler import ( | ||||||
| get_scheduler, start_scheduler, stop_scheduler, | ||||||
| SyncStatus, SyncMetadata | ||||||
| ) | ||||||
|
|
||||||
| @app.get("/models/config", response_model=ModelConfig) | ||||||
| async def get_model_config(): | ||||||
|
|
@@ -496,6 +540,16 @@ async def root(): | |||||
| "GET /api/wiki_cache - Retrieve cached wiki data", | ||||||
| "POST /api/wiki_cache - Store wiki data to cache" | ||||||
| ], | ||||||
| "Sync": [ | ||||||
| "GET /api/sync/status - Get sync scheduler status", | ||||||
| "GET /api/sync/projects - List all sync projects", | ||||||
| "POST /api/sync/projects - Add project for periodic sync", | ||||||
| "GET /api/sync/projects/{repo_type}/{owner}/{repo} - Get project sync status", | ||||||
| "PUT /api/sync/projects/{repo_type}/{owner}/{repo} - Update project sync settings", | ||||||
| "DELETE /api/sync/projects/{repo_type}/{owner}/{repo} - Remove project from sync", | ||||||
| "POST /api/sync/projects/{repo_type}/{owner}/{repo}/trigger - Manually trigger sync", | ||||||
| "GET /api/sync/projects/{repo_type}/{owner}/{repo}/check - Check for updates" | ||||||
| ], | ||||||
| "LocalRepo": [ | ||||||
| "GET /local_repo/structure - Get structure of a local repository (with path parameter)", | ||||||
| ], | ||||||
|
|
@@ -564,3 +618,232 @@ async def get_processed_projects(): | |||||
| except Exception as e: | ||||||
| logger.error(f"Error listing processed projects from {WIKI_CACHE_DIR}: {e}", exc_info=True) | ||||||
| raise HTTPException(status_code=500, detail="Failed to list processed projects from server cache.") | ||||||
|
|
||||||
|
|
||||||
| # --- Sync API Models --- | ||||||
|
|
||||||
class SyncProjectRequest(BaseModel):
    """Request model for adding a project for periodic sync.

    Used as the request body of ``POST /api/sync/projects``. Every field is
    passed through unchanged to the scheduler's ``add_project`` call.
    """
    # Required identification of the repository.
    repo_url: str = Field(..., description="Full URL of the repository")
    owner: str = Field(..., description="Repository owner/organization")
    repo: str = Field(..., description="Repository name")
    # NOTE(review): a reviewer recommended changing this field "for better
    # validation and API documentation"; the concrete suggestion is truncated
    # in the source. Presumably it restricts the value to the repo types named
    # in the description — TODO confirm.
    repo_type: str = Field(default="github", description="Type of repository (github, gitlab, bitbucket)")
    # Optional sync settings; the defaults below apply when omitted.
    sync_interval_minutes: int = Field(default=60, description="How often to sync (in minutes)")
    access_token: Optional[str] = Field(default=None, description="Access token for private repositories")
    enabled: bool = Field(default=True, description="Whether sync is enabled")
|
|
||||||
|
|
||||||
class SyncProjectUpdateRequest(BaseModel):
    """Request model for updating sync settings.

    Used as the request body of ``PUT /api/sync/projects/{repo_type}/{owner}/{repo}``.
    Both fields are optional and default to ``None``; they are forwarded
    as-is to the scheduler's ``update_project_settings`` call.
    """
    sync_interval_minutes: Optional[int] = Field(default=None, description="How often to sync (in minutes)")
    enabled: Optional[bool] = Field(default=None, description="Whether sync is enabled")
|
|
||||||
|
|
||||||
class SyncStatusResponse(BaseModel):
    """Response model for sync status.

    Built by the sync endpoints by unpacking a status dictionary returned by
    the scheduler (``SyncStatusResponse(**status)``), so the field names here
    must match the keys of that dictionary.
    """
    # Repository identification.
    repo_url: str
    owner: str
    repo: str
    repo_type: str
    # State of the most recent sync; None until a value is available.
    last_synced: Optional[str] = None
    last_commit_hash: Optional[str] = None
    sync_status: str
    sync_interval_minutes: int
    # Index size counters.
    document_count: int
    embedding_count: int
    error_message: Optional[str] = None
    next_sync: Optional[str] = None
    enabled: bool
    # Record timestamps, carried as strings.
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
|
|
||||||
|
|
||||||
class SyncTriggerResponse(BaseModel):
    """Response model for sync trigger.

    Built by unpacking the dictionary returned by the scheduler's
    ``trigger_sync`` call. Only ``success`` is required; every other field
    defaults to ``None`` when the scheduler does not supply it.
    """
    success: bool
    skipped: Optional[bool] = None
    reason: Optional[str] = None
    document_count: Optional[int] = None
    embedding_count: Optional[int] = None
    commit_hash: Optional[str] = None
    error: Optional[str] = None
|
|
||||||
|
|
||||||
class UpdateCheckResponse(BaseModel):
    """Response model for update check.

    Built by unpacking the dictionary returned by the scheduler's
    ``check_for_updates`` call. Only ``has_updates`` is required.
    """
    has_updates: bool
    remote_commit: Optional[str] = None
    local_commit: Optional[str] = None
    # Defaults to an empty list when the scheduler reports no file names.
    changed_files: List[str] = []
    reason: Optional[str] = None
    error: Optional[str] = None
|
|
||||||
|
|
||||||
| # --- Sync API Endpoints --- | ||||||
|
|
||||||
@app.post("/api/sync/projects", response_model=SyncStatusResponse)
async def add_sync_project(request: SyncProjectRequest):
    """
    Add a project for periodic index synchronization.

    This endpoint registers a repository for automatic periodic sync.
    The scheduler will periodically check for changes and re-index when updates are detected.

    Args:
        request: Repository identification and sync settings to register.

    Returns:
        SyncStatusResponse built from the metadata returned by the scheduler.

    Raises:
        HTTPException: 500 with the error text if registration fails.
    """
    try:
        scheduler = get_scheduler()
        metadata = scheduler.add_project(
            repo_url=request.repo_url,
            owner=request.owner,
            repo=request.repo,
            repo_type=request.repo_type,
            sync_interval_minutes=request.sync_interval_minutes,
            access_token=request.access_token,
            enabled=request.enabled,
        )
        logger.info(f"Added project for sync: {request.owner}/{request.repo}")
        return SyncStatusResponse(**metadata.to_dict())
    except Exception as e:
        # Keep the traceback in the server log, as the other handlers in
        # this file do, and chain the cause for easier debugging.
        logger.error(f"Error adding project for sync: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
||||||
|
|
||||||
@app.get("/api/sync/projects", response_model=List[SyncStatusResponse])
async def get_all_sync_projects():
    """
    Get all projects registered for periodic synchronization.

    Returns:
        A list with one SyncStatusResponse per project dictionary returned
        by the scheduler.

    Raises:
        HTTPException: 500 with the error text if the listing fails.
    """
    try:
        scheduler = get_scheduler()
        projects = scheduler.get_all_projects()
        return [SyncStatusResponse(**p) for p in projects]
    except Exception as e:
        # Keep the traceback in the server log and chain the cause.
        logger.error(f"Error getting sync projects: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
||||||
|
|
||||||
@app.get("/api/sync/projects/{repo_type}/{owner}/{repo}", response_model=SyncStatusResponse)
async def get_sync_project_status(repo_type: str, owner: str, repo: str):
    """
    Get sync status for a specific project.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.

    Returns:
        SyncStatusResponse built from the scheduler's status dictionary.

    Raises:
        HTTPException: 404 if the scheduler returns no status for the
            project, 500 with the error text on any other failure.
    """
    try:
        scheduler = get_scheduler()
        status = scheduler.get_project_status(owner, repo, repo_type)
        if not status:
            raise HTTPException(status_code=404, detail="Project not found")
        return SyncStatusResponse(**status)
    except HTTPException:
        # Let the 404 above pass through instead of turning it into a 500.
        raise
    except Exception as e:
        # Keep the traceback in the server log and chain the cause.
        logger.error(f"Error getting project status: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
||||||
|
|
||||||
@app.put("/api/sync/projects/{repo_type}/{owner}/{repo}", response_model=SyncStatusResponse)
async def update_sync_project(repo_type: str, owner: str, repo: str, request: SyncProjectUpdateRequest):
    """
    Update sync settings for a project.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.
        request: New settings; both values are forwarded to the scheduler
            as given, including ``None``.

    Returns:
        SyncStatusResponse built from the status dictionary returned by the
        scheduler after the update.

    Raises:
        HTTPException: 404 if the scheduler returns no status for the
            project, 500 with the error text on any other failure.
    """
    try:
        scheduler = get_scheduler()
        status = scheduler.update_project_settings(
            owner=owner,
            repo=repo,
            repo_type=repo_type,
            sync_interval_minutes=request.sync_interval_minutes,
            enabled=request.enabled,
        )
        if not status:
            raise HTTPException(status_code=404, detail="Project not found")
        return SyncStatusResponse(**status)
    except HTTPException:
        # Let the 404 above pass through instead of turning it into a 500.
        raise
    except Exception as e:
        # Keep the traceback in the server log and chain the cause.
        logger.error(f"Error updating project settings: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
||||||
|
|
||||||
@app.delete("/api/sync/projects/{repo_type}/{owner}/{repo}")
async def remove_sync_project(repo_type: str, owner: str, repo: str):
    """
    Remove a project from periodic synchronization.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.

    Returns:
        A dictionary with a single ``message`` key confirming the removal.

    Raises:
        HTTPException: 404 if the scheduler reports the removal did not
            succeed, 500 with the error text on any other failure.
    """
    try:
        scheduler = get_scheduler()
        success = scheduler.remove_project(owner, repo, repo_type)
        if not success:
            raise HTTPException(status_code=404, detail="Project not found or could not be removed")
        return {"message": f"Project {owner}/{repo} removed from sync"}
    except HTTPException:
        # Let the 404 above pass through instead of turning it into a 500.
        raise
    except Exception as e:
        # Keep the traceback in the server log and chain the cause.
        logger.error(f"Error removing project: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
||||||
|
|
||||||
@app.post("/api/sync/projects/{repo_type}/{owner}/{repo}/trigger", response_model=SyncTriggerResponse)
async def trigger_sync(repo_type: str, owner: str, repo: str):
    """
    Manually trigger a sync for a project.

    This will force a re-index of the repository, regardless of whether changes are detected.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.

    Returns:
        SyncTriggerResponse built from the result dictionary of the
        scheduler's awaited ``trigger_sync`` call.

    Raises:
        HTTPException: 500 with the error text if the sync raises.
    """
    try:
        scheduler = get_scheduler()
        result = await scheduler.trigger_sync(owner, repo, repo_type)
        return SyncTriggerResponse(**result)
    except Exception as e:
        # Keep the traceback in the server log and chain the cause.
        logger.error(f"Error triggering sync: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
||||||
|
|
||||||
@app.get("/api/sync/projects/{repo_type}/{owner}/{repo}/check", response_model=UpdateCheckResponse)
async def check_for_updates(repo_type: str, owner: str, repo: str):
    """
    Check if a project has updates without actually syncing.

    This is useful to preview what would happen during a sync.

    Args:
        repo_type: Repository type path segment.
        owner: Repository owner path segment.
        repo: Repository name path segment.

    Returns:
        UpdateCheckResponse built from the scheduler's result dictionary.

    Raises:
        HTTPException: 500 with the error text if the check raises.
    """
    try:
        scheduler = get_scheduler()
        # NOTE(review): a reviewer left a comment with a suggested change on
        # this call; its text is truncated in the source. The call is made
        # synchronously inside an async endpoint, so if it performs blocking
        # I/O it would stall the event loop — TODO confirm against the
        # scheduler implementation.
        result = scheduler.check_for_updates(owner, repo, repo_type)
        return UpdateCheckResponse(**result)
    except Exception as e:
        # Keep the traceback in the server log and chain the cause.
        logger.error(f"Error checking for updates: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
||||||
|
|
||||||
@app.get("/api/sync/status")
async def get_scheduler_status():
    """
    Get the overall status of the sync scheduler.

    Returns:
        A dictionary with ``scheduler_running``, ``total_projects``,
        ``status_counts`` (number of projects per status bucket) and
        ``check_interval_seconds``.

    Raises:
        HTTPException: 500 with the error text if the status cannot be read.
    """
    try:
        scheduler = get_scheduler()
        projects = scheduler.get_all_projects()

        # Count projects by status
        status_counts = {
            "pending": 0,
            "in_progress": 0,
            "completed": 0,
            "failed": 0,
            "disabled": 0,
        }

        for p in projects:
            status = p.get("sync_status", "pending")
            if not p.get("enabled", True):
                # A disabled project is counted as "disabled" whatever its
                # last sync status was.
                status_counts["disabled"] += 1
            elif status in status_counts:
                # Statuses outside the known buckets are not counted.
                status_counts[status] += 1

        return {
            # NOTE(review): a reviewer flagged this line (comment text is
            # missing in the source). It reads an underscore-prefixed
            # attribute of the scheduler; a public accessor may be
            # preferable — TODO confirm.
            "scheduler_running": scheduler._running,
            "total_projects": len(projects),
            "status_counts": status_counts,
            "check_interval_seconds": scheduler.check_interval,
        }
    except Exception as e:
        # Keep the traceback in the server log and chain the cause.
        logger.error(f"Error getting scheduler status: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several unused imports here.
`start_scheduler` and `stop_scheduler` are imported locally within the `lifespan` function, and `SyncStatus` and `SyncMetadata` are not used in this file. Removing them will clean up the code and avoid potential confusion.