
Commit a1f7095

feat: add smarter batching for incremental sync based on updated parent_ids (#5862)
1 parent 6894cb0 commit a1f7095

File tree

  • servers/fai-reindexing/src/services
    • sync.ts
  • servers/fai/src/fai/utils/turbopuffer
    • sync.py

2 files changed: 87 additions, 49 deletions

servers/fai-reindexing/src/services/sync.ts

Lines changed: 10 additions & 5 deletions
@@ -45,7 +45,7 @@ export async function syncToQueryIndexIncremental(domain: string, parentIds: str
 }
 
 async function pollJobStatus(jobId: string, domain: string): Promise<void> {
-  const log = createDomainLogger(domain);
+  const logger = createDomainLogger(domain);
   const maxAttempts = 1000;
   let attempts = 0;
 
@@ -54,15 +54,17 @@ async function pollJobStatus(jobId: string, domain: string): Promise<void> {
       const statusResponse = await faiClient.index.getJobStatus(jobId);
       const { status, success, error } = statusResponse;
 
-      log.info("Job status", { jobId, status, success, error, attempt: attempts + 1, maxAttempts });
+      logger.info("Job status", { jobId, status, success, error, attempt: attempts + 1, maxAttempts });
 
       if (status === "completed") {
         if (success === false) {
+          logger.error("Sync job completed but failed", { jobId, error });
           const errorMessage = typeof error === "object" ? JSON.stringify(error) : error || "Unknown error";
-          throw new Error(`Sync job completed but failed: ${errorMessage}`);
+          throw new Error(`Sync job failed: ${errorMessage}`);
         }
         return;
       } else if (status === "failed") {
+        logger.error("Sync job failed", { jobId, error });
         const errorMessage = typeof error === "object" ? JSON.stringify(error) : error || "Unknown error";
         throw new Error(`Sync job failed: ${errorMessage}`);
       }
@@ -71,16 +73,19 @@ async function pollJobStatus(jobId: string, domain: string): Promise<void> {
       attempts++;
     } catch (error) {
       const errorMessage = error instanceof Error ? error.message : String(error);
+      if (errorMessage.includes("Sync job failed:")) {
+        throw error;
+      }
       attempts++;
       if (attempts >= maxAttempts) {
-        log.error("Error polling job status - max attempts exceeded", {
+        logger.error("Error polling job status - max attempts exceeded", {
           error: errorMessage,
           attempts,
           maxAttempts
         });
         throw error;
       }
-      log.warn("Error polling job status - retrying", { error: errorMessage, attempt: attempts, maxAttempts });
+      logger.warn("Error polling job status - retrying", { error: errorMessage, attempt: attempts, maxAttempts });
       await new Promise((resolve) => setTimeout(resolve, RETRY_CONFIG.SYNC_RETRY_DELAY_MS));
     }
   }
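
The TypeScript change renames log to logger and, more importantly, makes pollJobStatus fail fast: when the polling loop surfaces an error that already carries the terminal "Sync job failed:" message, it is rethrown immediately instead of being retried until maxAttempts. A minimal sketch of that control flow follows; it is written in Python to match the sync code elsewhere in this commit, and get_job_status, JobFailedError, and the retry parameters are illustrative stand-ins, not repository APIs.

# Illustrative sketch only: fail-fast polling with a bounded retry budget,
# mirroring the pollJobStatus change above. get_job_status is a hypothetical stand-in.
import asyncio


class JobFailedError(Exception):
    """Terminal failure reported by the job itself; retrying cannot help."""


async def get_job_status(job_id: str) -> dict:
    await asyncio.sleep(0)  # placeholder for a real status endpoint
    return {"status": "completed", "success": True, "error": None}


async def poll_job_status(job_id: str, max_attempts: int = 1000, delay_s: float = 1.0) -> None:
    attempts = 0
    while attempts < max_attempts:
        try:
            response = await get_job_status(job_id)
            status, success, error = response["status"], response.get("success"), response.get("error")
            if status == "completed":
                if success is False:
                    raise JobFailedError(f"Sync job failed: {error}")
                return
            if status == "failed":
                raise JobFailedError(f"Sync job failed: {error}")
        except JobFailedError:
            raise  # fail fast: the job itself failed, so stop polling immediately
        except Exception:
            if attempts + 1 >= max_attempts:
                raise  # transient polling errors are retried until the budget runs out
        attempts += 1
        await asyncio.sleep(delay_s)
    raise TimeoutError(f"Job {job_id} did not complete within {max_attempts} attempts")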

servers/fai/src/fai/utils/turbopuffer/sync.py

Lines changed: 77 additions & 44 deletions
@@ -6,6 +6,7 @@
 from sqlalchemy.ext.asyncio import AsyncSession
 from turbopuffer import (
     NOT_GIVEN,
+    APITimeoutError,
     AsyncTurbopuffer,
 )
 from turbopuffer.types.row import Row
@@ -483,63 +484,95 @@ async def sync_index_to_target_incremental(
     async with AsyncTurbopuffer(
         region=CONFIG.TURBOPUFFER_DEFAULT_REGION,
         api_key=VARIABLES.TURBOPUFFER_API_KEY,
+        max_retries=0,
+        timeout=30.0,
     ) as tpuf_client:
         source_ns = tpuf_client.namespace(source_namespace_id)
+        await source_ns.hint_cache_warm()
         target_ns = tpuf_client.namespace(target_namespace_id)
 
-        try:
-            await target_ns.write(
-                delete_by_filter=("And", [("source", "Eq", source_index_name), ("parent_id", "In", parent_ids)])
-            )
-            LOGGER.info(f"Deleted old records for {len(parent_ids)} parent_ids from target")
-        except Exception as e:
-            LOGGER.warning(f"Batch delete failed, trying individual deletes: {e}")
-            for parent_id in parent_ids:
-                try:
-                    await target_ns.write(
-                        delete_by_filter=("And", [("source", "Eq", source_index_name), ("parent_id", "Eq", parent_id)])
+        delete_parent_id_batch_size = len(parent_ids)
+        while delete_parent_id_batch_size >= 1:
+            try:
+                total_deleted = 0
+                for i in range(0, len(parent_ids), delete_parent_id_batch_size):
+                    batch = parent_ids[i:min(i+delete_parent_id_batch_size, len(parent_ids))]
+                    result = await target_ns.write(
+                        delete_by_filter=("And", [("source", "Eq", source_index_name), ("parent_id", "In", batch)])
                     )
-                except Exception as delete_error:
-                    LOGGER.warning(f"Failed to delete parent_id {parent_id}: {delete_error}")
+                    LOGGER.info(
+                        f"Deleted {result.rows_deleted or 0} records for {len(batch)} parent_ids"
+                    )
+                    total_deleted += result.rows_deleted or 0
+                LOGGER.info(
+                    f"Successfully deleted {total_deleted} records from {target_namespace_id}"
+                )
+                break
+            except APITimeoutError:
+                LOGGER.warning(
+                    f"Batch delete with size {delete_parent_id_batch_size} timed out, "
+                    f"trying batch size {delete_parent_id_batch_size // 2}"
+                )
+                delete_parent_id_batch_size = delete_parent_id_batch_size // 2
+                if delete_parent_id_batch_size < 1:
+                    LOGGER.error("Failed to delete records even with batch size 1")
+                    raise
 
         source_ns_exists = await source_ns.exists()
         if not source_ns_exists:
             LOGGER.warning(f"Source namespace {source_namespace_id} does not exist")
             return
 
         total_synced = 0
-        last_id = None
-
-        while True:
-            filter_conditions = [("parent_id", "In", parent_ids)]
-            if last_id is not None:
-                filter_conditions.append(("id", "Gt", last_id))
-
-            result = await source_ns.query(
-                rank_by=("id", "asc"),
-                top_k=10000,
-                include_attributes=True,
-                filters=("And", filter_conditions),
-            )
-
-            if not result.rows:
-                break
+        sync_parent_id_batch_size = len(parent_ids)
 
-            prefixed_rows = []
-            for row in result.rows:
-                new_row = Row.from_dict(row.model_dump())
-                new_row.id = prefixed_id(source_namespace_id, row.id)
-                new_row.source = source_index_name
-                prefixed_rows.append(new_row)
-
-            await target_ns.write(
-                upsert_rows=prefixed_rows, distance_metric="cosine_distance", schema=get_query_index_tpuf_schema()
-            )
-            total_synced += len(prefixed_rows)
-            LOGGER.info(f"Synced batch: {len(prefixed_rows)} records (total: {total_synced})")
-
-            if len(result.rows) < 10000:
+        while sync_parent_id_batch_size >= 1:
+            try:
+                for parent_idx in range(0, len(parent_ids), sync_parent_id_batch_size):
+                    last_id = None
+                    parent_id_batch = parent_ids[parent_idx:min(parent_idx+sync_parent_id_batch_size, len(parent_ids))]
+                    filter_conditions = [("parent_id", "In", parent_id_batch)]
+                    while True:
+                        if last_id is not None:
+                            filter_conditions.append(("id", "Gt", last_id))
+
+                        result = await source_ns.query(
+                            rank_by=("id", "asc"),
+                            top_k=1000,
+                            include_attributes=True,
+                            filters=("And", filter_conditions),
+                        )
+
+                        if not result.rows:
+                            break
+
+                        prefixed_rows = []
+                        for row in result.rows:
+                            new_row = Row.from_dict(row.model_dump())
+                            new_row.id = prefixed_id(source_namespace_id, row.id)
+                            new_row.source = source_index_name
+                            prefixed_rows.append(new_row)
+
+                        await target_ns.write(
+                            upsert_rows=prefixed_rows,
+                            distance_metric="cosine_distance",
+                            schema=get_query_index_tpuf_schema(),
+                        )
+                        total_synced += len(prefixed_rows)
+                        LOGGER.info(f"Synced batch: {len(prefixed_rows)} records (total: {total_synced})")
+
+                        last_id = result.rows[-1].id
+                        if len(result.rows) < 1000:
+                            break
                 break
-            last_id = result.rows[-1].id
+            except APITimeoutError:
+                LOGGER.warning(
+                    f"Batch sync with size {sync_parent_id_batch_size} timed out, "
+                    f"trying batch size {sync_parent_id_batch_size // 2}"
+                )
+                sync_parent_id_batch_size = sync_parent_id_batch_size // 2
+                if sync_parent_id_batch_size < 1:
+                    LOGGER.error("Failed to sync records even with batch size 1")
+                    raise
 
         LOGGER.info(f"Incremental sync completed: {total_synced} total records synced to {target_namespace_id}")
