Commit 06d9dfd

feat(fai): add incremental sync to target function (#5825)
1 parent c44488d · commit 06d9dfd

3 files changed (+112 −1 lines)

servers/fai/src/fai/models/api/index_api.py

Lines changed: 5 additions & 0 deletions
@@ -14,6 +14,11 @@ class SyncIndexRequest(BaseModel):
     index_name: str = Field(description="The name of the index to sync")


+class SyncIndexIncrementalRequest(BaseModel):
+    index_name: str = Field(description="The name of the index to sync")
+    parent_ids: list[str] = Field(description="List of parent_ids to incrementally sync")
+
+
 class SyncIndexResponse(BaseModel):
     job_id: str = Field(description="The ID of the sync job")

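
For reference, the new model serializes to a plain JSON body. A minimal sketch of constructing one follows; the index_name and parent_ids values are illustrative placeholders, not taken from the commit:

```python
# Build the request body the new endpoint expects; values are placeholders.
# Assumes Pydantic v2, consistent with the model_dump() usage in sync.py.
from fai.models.api.index_api import SyncIndexIncrementalRequest

body = SyncIndexIncrementalRequest(
    index_name="docs",
    parent_ids=["page-123", "page-456"],
)
print(body.model_dump_json())
# {"index_name":"docs","parent_ids":["page-123","page-456"]}
```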

servers/fai/src/fai/routes/index.py

Lines changed: 34 additions & 1 deletion
@@ -13,6 +13,7 @@
 from fai.models.api.index_api import (
     JobStatusResponse,
     ReconstructIndexResponse,
+    SyncIndexIncrementalRequest,
     SyncIndexRequest,
     SyncIndexResponse,
 )
@@ -23,7 +24,7 @@
 )
 from fai.utils.turbopuffer.namespace import get_query_index_name
 from fai.utils.turbopuffer.reconstruct import reconstruct_query_index_for_domain
-from fai.utils.turbopuffer.sync import sync_index_to_target
+from fai.utils.turbopuffer.sync import sync_index_to_target, sync_index_to_target_incremental


@@ -69,6 +70,38 @@ async def sync_index_to_query_index(
         return JSONResponse(status_code=500, content={"detail": str(e)})


+@fai_app.post(
+    "/index/{domain}/sync-incremental",
+    response_model=SyncIndexResponse,
+    openapi_extra={"x-fern-audiences": ["internal"], "security": [{"bearerAuth": []}]},
+)
+async def sync_index_to_query_index_incremental(
+    domain: str,
+    body: SyncIndexIncrementalRequest,
+    db: AsyncSession = Depends(get_db),
+    _: None = Depends(verify_token),
+) -> JSONResponse:
+    try:
+        job_id = await job_manager.create_job(db)
+
+        asyncio.create_task(
+            job_manager.execute_job(
+                job_id,
+                sync_index_to_target_incremental,
+                domain,
+                body.index_name,
+                get_query_index_name(),
+                body.parent_ids,
+            )
+        )
+
+        return JSONResponse(jsonable_encoder(SyncIndexResponse(job_id=job_id)))
+
+    except Exception as e:
+        LOGGER.exception("Failed to create incremental sync job")
+        return JSONResponse(status_code=500, content={"detail": str(e)})
+
+
 @fai_app.get(
     "/jobs/{job_id}/status", response_model=JobStatusResponse, openapi_extra={"x-fern-audiences": ["internal"]}
 )
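
Taken together with the model above, kicking off an incremental sync from a client might look like the sketch below. The base URL, domain, token, and payload values are placeholders; only the route path shape and the job_id response field come from the diff:

```python
# Hypothetical client call to the new internal route; base URL, domain,
# token, and payload values are placeholders, not part of the commit.
import httpx

response = httpx.post(
    "https://fai.example.internal/index/docs.example.com/sync-incremental",
    headers={"Authorization": "Bearer <token>"},
    json={"index_name": "docs", "parent_ids": ["page-123", "page-456"]},
)
response.raise_for_status()
job_id = response.json()["job_id"]
# The sync runs as a background job; poll /jobs/{job_id}/status to track it.
```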

servers/fai/src/fai/utils/turbopuffer/sync.py

Lines changed: 73 additions & 0 deletions
@@ -461,3 +461,76 @@ async def sync_index_to_target(domain: str, source_index_name: str, target_index
         if len(result.rows) < 1000:
             break
         last_id = result.rows[-1].id
+
+
+async def sync_index_to_target_incremental(
+    domain: str, source_index_name: str, target_index_name: str, parent_ids: list[str]
+) -> None:
+    """
+    Incrementally sync only specific parent_ids from source index to target query index.
+    This is more efficient than full sync when only a subset of content has changed.
+    """
+    if not parent_ids:
+        LOGGER.info("No parent_ids provided for incremental sync, skipping")
+        return
+
+    source_namespace_id = get_tpuf_namespace(domain, source_index_name)
+    target_namespace_id = get_tpuf_namespace(domain, target_index_name)
+    LOGGER.info(
+        f"Incrementally syncing {len(parent_ids)} parent_ids from {source_namespace_id} to {target_namespace_id}"
+    )
+
+    async with AsyncTurbopuffer(
+        region=CONFIG.TURBOPUFFER_DEFAULT_REGION,
+        api_key=VARIABLES.TURBOPUFFER_API_KEY,
+    ) as tpuf_client:
+        source_ns = tpuf_client.namespace(source_namespace_id)
+        target_ns = tpuf_client.namespace(target_namespace_id)
+
+        for parent_id in parent_ids:
+            try:
+                await target_ns.write(
+                    delete_by_filter=[["source", "Eq", source_index_name], ["parent_id", "Eq", parent_id]]
+                )
+            except Exception as e:
+                # The parent_id might not exist in target yet (net new)
+                LOGGER.warning(f"Failed to delete parent_id {parent_id} from target: {e}")
+
+        LOGGER.info(f"Deleted old records for {len(parent_ids)} parent_ids from target")
+
+        source_ns_exists = await source_ns.exists()
+        if not source_ns_exists:
+            LOGGER.warning(f"Source namespace {source_namespace_id} does not exist")
+            return
+
+        total_synced = 0
+        for parent_id in parent_ids:
+            last_id = None
+            while True:
+                result = await source_ns.query(
+                    rank_by=("id", "asc"),
+                    top_k=1000,
+                    include_attributes=True,
+                    filters=[
+                        ["parent_id", "Eq", parent_id],
+                        ["id", "Gt", last_id] if last_id is not None else NOT_GIVEN,
+                    ],
+                )
+
+                prefixed_rows = []
+                for row in result.rows:
+                    new_row = Row.from_dict(row.model_dump())
+                    new_row.id = prefixed_id(source_namespace_id, row.id)
+                    new_row.source = source_index_name
+                    prefixed_rows.append(new_row)
+
+                await target_ns.write(
+                    upsert_rows=prefixed_rows, distance_metric="cosine_distance", schema=get_query_index_tpuf_schema()
+                )
+                total_synced += len(prefixed_rows)
+
+                if len(result.rows) < 1000:
+                    break
+                last_id = result.rows[-1].id
+
+    LOGGER.info(f"Incremental sync completed: {total_synced} total records synced to {target_namespace_id}")
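
The function works in two phases: it first deletes each parent's old rows from the target, then pages matching rows out of the source in batches of 1,000 (keyset pagination on id) and re-upserts them. As a usage sketch, it can also be driven directly with asyncio, outside the HTTP route and job manager; the domain, source index name, and parent_ids below are illustrative:

```python
# Hypothetical direct invocation, bypassing the route and job manager;
# the domain, source index name, and parent_ids are illustrative.
import asyncio

from fai.utils.turbopuffer.namespace import get_query_index_name
from fai.utils.turbopuffer.sync import sync_index_to_target_incremental

asyncio.run(
    sync_index_to_target_incremental(
        "docs.example.com",        # domain (placeholder)
        "docs",                    # source index whose content changed
        get_query_index_name(),    # target query index, mirroring the route
        ["page-123", "page-456"],  # only these parents are deleted and re-synced
    )
)
```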
