Skip to content

Commit 586a7b3

Browse files
authored
fix: don't throttle tpuf during incremental sync (#5832)
1 parent f55104a commit 586a7b3

File tree

1 file changed

+29
-25
lines changed
  • servers/fai/src/fai/utils/turbopuffer

1 file changed

+29
-25
lines changed

servers/fai/src/fai/utils/turbopuffer/sync.py

Lines changed: 29 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -508,34 +508,38 @@ async def sync_index_to_target_incremental(
508508
return
509509

510510
total_synced = 0
511-
for parent_id in parent_ids:
512-
last_id = None
513-
while True:
514-
filter_conditions = [("parent_id", "Eq", parent_id)]
515-
if last_id is not None:
516-
filter_conditions.append(("id", "Gt", last_id))
511+
last_id = None
512+
513+
while True:
514+
filter_conditions = [("parent_id", "In", parent_ids)]
515+
if last_id is not None:
516+
filter_conditions.append(("id", "Gt", last_id))
517+
518+
result = await source_ns.query(
519+
rank_by=("id", "asc"),
520+
top_k=10000,
521+
include_attributes=True,
522+
filters=("And", filter_conditions),
523+
)
517524

518-
result = await source_ns.query(
519-
rank_by=("id", "asc"),
520-
top_k=1000,
521-
include_attributes=True,
522-
filters=("And", filter_conditions),
523-
)
525+
if not result.rows:
526+
break
524527

525-
prefixed_rows = []
526-
for row in result.rows:
527-
new_row = Row.from_dict(row.model_dump())
528-
new_row.id = prefixed_id(source_namespace_id, row.id)
529-
new_row.source = source_index_name
530-
prefixed_rows.append(new_row)
528+
prefixed_rows = []
529+
for row in result.rows:
530+
new_row = Row.from_dict(row.model_dump())
531+
new_row.id = prefixed_id(source_namespace_id, row.id)
532+
new_row.source = source_index_name
533+
prefixed_rows.append(new_row)
531534

532-
await target_ns.write(
533-
upsert_rows=prefixed_rows, distance_metric="cosine_distance", schema=get_query_index_tpuf_schema()
534-
)
535-
total_synced += len(prefixed_rows)
535+
await target_ns.write(
536+
upsert_rows=prefixed_rows, distance_metric="cosine_distance", schema=get_query_index_tpuf_schema()
537+
)
538+
total_synced += len(prefixed_rows)
539+
LOGGER.info(f"Synced batch: {len(prefixed_rows)} records (total: {total_synced})")
536540

537-
if len(result.rows) < 1000:
538-
break
539-
last_id = result.rows[-1].id
541+
if len(result.rows) < 10000:
542+
break
543+
last_id = result.rows[-1].id
540544

541545
LOGGER.info(f"Incremental sync completed: {total_synced} total records synced to {target_namespace_id}")

0 commit comments

Comments
 (0)