Skip to content

Commit 0a9108c

Browse files
gautamg795Convex, Inc.
authored andcommitted
Commit every 1M rows when using COPY FROM STDIN (#39308)
GitOrigin-RevId: d546e0f851e3c1d85cbd2fbeabbad8d662a785d8
1 parent 844c8bd commit 0a9108c

File tree

1 file changed

+75
-39
lines changed

1 file changed

+75
-39
lines changed

crates/postgres/src/lib.rs

Lines changed: 75 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ use crate::{
156156
metrics::QueryIndexStats,
157157
};
158158

159+
const ROWS_PER_COPY_BATCH: usize = 1_000_000;
160+
159161
pub struct PostgresPersistence {
160162
newly_created: AtomicBool,
161163
lease: Lease,
@@ -643,27 +645,46 @@ impl Persistence for PostgresPersistence {
643645
STDIN BINARY",
644646
)
645647
.await?;
646-
let sink = conn.copy_in(&stmt).await?;
647-
let writer = BinaryCopyInWriter::new(
648-
sink,
649-
&[
650-
Type::BYTEA,
651-
Type::INT8,
652-
Type::BYTEA,
653-
Type::BYTEA,
654-
Type::BOOL,
655-
Type::INT8,
656-
],
657-
);
658-
pin_mut!(writer);
659-
while let Some(chunk) = documents.next().await {
660-
for document in chunk {
661-
let params =
662-
document_params(document.ts, document.id, &document.value, document.prev_ts)?;
663-
writer.as_mut().write_raw(params).await?;
648+
649+
'outer: loop {
650+
let sink = conn.copy_in(&stmt).await?;
651+
let writer = BinaryCopyInWriter::new(
652+
sink,
653+
&[
654+
Type::BYTEA,
655+
Type::INT8,
656+
Type::BYTEA,
657+
Type::BYTEA,
658+
Type::BOOL,
659+
Type::INT8,
660+
],
661+
);
662+
pin_mut!(writer);
663+
664+
let mut batch_count = 0;
665+
666+
while let Some(chunk) = documents.next().await {
667+
for document in chunk {
668+
let params = document_params(
669+
document.ts,
670+
document.id,
671+
&document.value,
672+
document.prev_ts,
673+
)?;
674+
writer.as_mut().write_raw(params).await?;
675+
batch_count += 1;
676+
}
677+
678+
if batch_count >= ROWS_PER_COPY_BATCH {
679+
writer.finish().await?;
680+
continue 'outer;
681+
}
664682
}
683+
684+
writer.finish().await?;
685+
break;
665686
}
666-
writer.finish().await?;
687+
667688
Ok(())
668689
}
669690

@@ -682,28 +703,43 @@ impl Persistence for PostgresPersistence {
682703
deleted, table_id, document_id) FROM STDIN BINARY",
683704
)
684705
.await?;
685-
let sink = conn.copy_in(&stmt).await?;
686-
let writer = BinaryCopyInWriter::new(
687-
sink,
688-
&[
689-
Type::BYTEA,
690-
Type::INT8,
691-
Type::BYTEA,
692-
Type::BYTEA,
693-
Type::BYTEA,
694-
Type::BOOL,
695-
Type::BYTEA,
696-
Type::BYTEA,
697-
],
698-
);
699-
pin_mut!(writer);
700-
while let Some(chunk) = indexes.next().await {
701-
for index in chunk {
702-
let params = index_params(&index);
703-
writer.as_mut().write_raw(params).await?;
706+
707+
'outer: loop {
708+
let sink = conn.copy_in(&stmt).await?;
709+
let writer = BinaryCopyInWriter::new(
710+
sink,
711+
&[
712+
Type::BYTEA,
713+
Type::INT8,
714+
Type::BYTEA,
715+
Type::BYTEA,
716+
Type::BYTEA,
717+
Type::BOOL,
718+
Type::BYTEA,
719+
Type::BYTEA,
720+
],
721+
);
722+
pin_mut!(writer);
723+
724+
let mut batch_count = 0;
725+
726+
while let Some(chunk) = indexes.next().await {
727+
for index in chunk {
728+
let params = index_params(&index);
729+
writer.as_mut().write_raw(params).await?;
730+
batch_count += 1;
731+
}
732+
733+
if batch_count >= ROWS_PER_COPY_BATCH {
734+
writer.finish().await?;
735+
continue 'outer;
736+
}
704737
}
738+
739+
writer.finish().await?;
740+
break;
705741
}
706-
writer.finish().await?;
742+
707743
Ok(())
708744
}
709745

0 commit comments

Comments
 (0)