 import asyncio
+import uuid
+from datetime import (
+    UTC,
+    datetime,
+)
 
 from fastapi import (
     Body,
     ReindexWebsiteRequest,
     ReindexWebsiteResponse,
 )
+from fai.models.db.index_source_db import (
+    IndexSourceDb,
+    SourceType,
+)
 from fai.models.db.website_db import WebsiteDb
 from fai.settings import LOGGER
 from fai.utils.jobs import job_manager
@@ -63,29 +72,44 @@ async def index_website(
     Returns a job_id to track the crawling progress.
     """
     try:
+        # Check if IndexSourceDb already exists for this base_url
+        result = await db.execute(
+            select(IndexSourceDb).where(
+                IndexSourceDb.domain == domain,
+                IndexSourceDb.source_type == SourceType.WEBSITE,
+                IndexSourceDb.source_identifier == body.base_url,
+            )
+        )
+        index_source = result.scalar_one_or_none()
+
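+        # Note: this check-then-insert sequence can race if two index requests
+        # for the same base_url land concurrently; a unique constraint on
+        # (domain, source_type, source_identifier) with an ON CONFLICT upsert
+        # would be a sturdier variant, assuming the schema allows adding one.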
         # Create a job to track the crawling process
         job_id = await job_manager.create_job(db)
 
-        # TODO: Implement the actual crawling logic as a background task
-        # This would typically involve:
-        # 1. Using a web crawler (e.g., Firecrawl, Scrapy, Playwright)
-        # 2. Following links up to max_depth
-        # 3. Applying include/exclude patterns
-        # 4. Extracting content from each page
-        # 5. Chunking if needed
-        # 6. Creating WebsiteSourceDb rows
-        # 7. Syncing to Turbopuffer
-        # 8. Updating job status
-
-        # For now, we'll create a placeholder task
-        asyncio.create_task(
-            _crawl_website_job(
-                job_id=job_id,
+        if index_source:
+            # Update existing source
+            index_source.status = "indexing"
+            index_source.last_job_id = job_id
+            index_source.config = body.model_dump()
+            index_source.updated_at = datetime.now(UTC)
+        else:
+            # Create new source
+            source_id = str(uuid.uuid4())
+            index_source = IndexSourceDb(
+                id=source_id,
                 domain=domain,
-                config=body,
-                db=db,
+                source_type=SourceType.WEBSITE,
+                source_identifier=body.base_url,
+                config=body.model_dump(),
+                status="indexing",
+                last_job_id=job_id,
+                created_at=datetime.now(UTC),
+                updated_at=datetime.now(UTC),
             )
-        )
+            db.add(index_source)
+
+        await db.commit()
+
+        asyncio.create_task(job_manager.execute_job(job_id, _crawl_website_job, index_source.id, domain, body, db))
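+        # Note: this hands the request-scoped AsyncSession to a fire-and-forget
+        # task, which assumes job_manager.execute_job keeps the session usable
+        # for the whole crawl; if the session closes when the request returns,
+        # opening a fresh session inside the background job is the safer shape.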
 
         LOGGER.info(f"Started website crawl job {job_id} for domain: {domain}, base_url: {body.base_url}")
         return JSONResponse(
@@ -99,6 +123,7 @@ async def index_website(
 
 async def _crawl_website_job(
     job_id: str,
+    source_id: str,
     domain: str,
     config: IndexWebsiteRequest,
     db: AsyncSession,
@@ -115,9 +140,6 @@ async def _crawl_website_job(
     6. Update job status
     """
     try:
-        # Update job status to processing
-        # await job_manager.update_job_status(db, job_id, "PROCESSING")
-
         # Placeholder for actual crawling logic
         # pages = await crawl_website(config.base_url, config.max_depth, ...)
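+        # A minimal sketch of that loop, assuming a hypothetical crawl_website
+        # async generator yielding (url, html) pairs and a hypothetical
+        # extract_content helper; WebsiteSourceDb field names are assumptions:
+        #
+        # pages_indexed = 0
+        # pages_failed = 0
+        # async for url, html in crawl_website(config.base_url, config.max_depth):
+        #     try:
+        #         text = extract_content(html)
+        #         db.add(WebsiteSourceDb(domain=domain, url=url, content=text))
+        #         pages_indexed += 1
+        #     except Exception:
+        #         pages_failed += 1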
 
@@ -129,13 +151,40 @@ async def _crawl_website_job(
         # await sync_website_db_to_tpuf(domain, db)
         # await sync_index_to_target(domain, get_website_index_name(), get_query_index_name())
 
-        # Update job status to completed
-        # await job_manager.update_job_status(db, job_id, "COMPLETED")
+        # Update IndexSourceDb on success
+        result = await db.execute(select(IndexSourceDb).where(IndexSourceDb.id == source_id))
+        index_source = result.scalar_one_or_none()
+
+        if index_source:
+            index_source.status = "active"
+            index_source.last_indexed_at = datetime.now(UTC)
+            index_source.updated_at = datetime.now(UTC)
+            index_source.last_error = None
+            index_source.last_error_at = None
+
+            # Update metrics with crawl results
+            # This should be populated with actual results from the crawling logic
+            index_source.metrics = {
+                "pages_indexed": 0,  # TODO: Update with actual count
+                "pages_failed": 0,  # TODO: Update with actual count
+            }
+
+        await db.commit()
 
         LOGGER.info(f"Completed website crawl job {job_id} for domain: {domain}")
     except Exception as e:
         LOGGER.exception(f"Failed to complete website crawl job {job_id}")
-        # await job_manager.update_job_status(db, job_id, "FAILED", error=str(e))
+
+        # Update IndexSourceDb on failure
+        result = await db.execute(select(IndexSourceDb).where(IndexSourceDb.id == source_id))
+        index_source = result.scalar_one_or_none()
+
+        if index_source:
+            index_source.status = "failed"
+            index_source.last_error = str(e)
+            index_source.last_error_at = datetime.now(UTC)
+            index_source.updated_at = datetime.now(UTC)
+            await db.commit()
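+        # Note: if the exception above came from the database layer, the
+        # session may already be in a failed transaction; an
+        # `await db.rollback()` before the status update above would be the
+        # safer assumption.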
 
 
 @fai_app.get(
@@ -159,19 +208,33 @@ async def get_website_status(
     if not job:
         return JSONResponse(status_code=404, content={"detail": "Job not found"})
 
-    # TODO: Get actual page counts from the database
-    # Count pages indexed for this job's base_url
-    pages_indexed = 0
-    pages_failed = 0
-    base_url = ""  # Should be stored with the job
+    # Find the IndexSourceDb that corresponds to this job
+    result = await db.execute(select(IndexSourceDb).where(IndexSourceDb.last_job_id == job_id))
+    index_source = result.scalar_one_or_none()
+
+    if not index_source:
+        return JSONResponse(status_code=404, content={"detail": "Source not found for this job"})
+
+    # Get metrics from IndexSourceDb
+    metrics = index_source.metrics or {}
+    pages_indexed = metrics.get("pages_indexed", 0)
+    pages_failed = metrics.get("pages_failed", 0)
+
+    # Determine status: use job status if in progress, otherwise use source status
+    if job.status.value in ["pending", "in_progress"]:
+        status = job.status.value
+        error = None
+    else:
+        status = index_source.status
+        error = index_source.last_error or job.error
 
     response = GetWebsiteStatusResponse(
         job_id=job.id,
-        status=job.status.value,
-        base_url=base_url,
+        status=status,
+        base_url=index_source.source_identifier,
         pages_indexed=pages_indexed,
         pages_failed=pages_failed,
-        error=job.error,
+        error=error,
     )
 
     return JSONResponse(jsonable_encoder(response))
@@ -300,11 +363,50 @@ async def reindex_website(
 
     LOGGER.info(f"Deleted {len(websites)} pages from {body.base_url} for domain: {domain}")
 
+    # Find or create IndexSourceDb for this base_url
+    result = await db.execute(
+        select(IndexSourceDb).where(
+            IndexSourceDb.domain == domain,
+            IndexSourceDb.source_type == SourceType.WEBSITE,
+            IndexSourceDb.source_identifier == body.base_url,
+        )
+    )
+    index_source = result.scalar_one_or_none()
+
     # Create a new crawl job
     job_id = await job_manager.create_job(db)
 
-    # TODO: Start the crawling job similar to index_website
-    # asyncio.create_task(_crawl_website_job(...))
+    if index_source:
+        # Update existing source
+        index_source.status = "indexing"
+        index_source.last_job_id = job_id
+        index_source.updated_at = datetime.now(UTC)
+        # Reset metrics for reindexing
+        index_source.metrics = {}
+    else:
+        # Create new source if it doesn't exist
+        source_id = str(uuid.uuid4())
+        index_source = IndexSourceDb(
+            id=source_id,
+            domain=domain,
+            source_type=SourceType.WEBSITE,
+            source_identifier=body.base_url,
+            config={"base_url": body.base_url},
+            status="indexing",
+            last_job_id=job_id,
+            created_at=datetime.now(UTC),
+            updated_at=datetime.now(UTC),
+        )
+        db.add(index_source)
+
+    await db.commit()
+
+    # Start the crawling job
+    asyncio.create_task(
+        job_manager.execute_job(
+            job_id, _crawl_website_job, index_source.id, domain, IndexWebsiteRequest(base_url=body.base_url), db
+        )
+    )
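+    # Note: the config here is rebuilt from base_url alone; when the stored
+    # index_source.config is present, rehydrating it instead, e.g.
+    # IndexWebsiteRequest(**index_source.config), would preserve the original
+    # max_depth and include/exclude settings.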
 
     LOGGER.info(f"Started website re-crawl job {job_id} for domain: {domain}, base_url: {body.base_url}")
     return JSONResponse(