Commit d55fec3

goffrie authored and Convex, Inc. committed
Use UNNEST to insert into postgres tables (#39403)
GitOrigin-RevId: 7e6f5666e61866a91898d587c5dee9d3fbe608ce
1 parent 99a14bc commit d55fec3
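
The idea behind the change: the old path inserted rows in fixed-size VALUES chunks (a dedicated 8-row prepared statement per table, plus a one-row statement for the remainder) and pipelined the chunks concurrently. The new path transposes a batch into one array parameter per column and sends a single statement; Postgres's multi-argument UNNEST zips the arrays back into rows server-side, so the parameter count stays constant no matter how many rows are inserted. A minimal sketch of the pattern against plain tokio_postgres (the table and column names here are illustrative, not the ones in this patch):

    use tokio_postgres::{Client, Error};

    // Insert any number of (id, ts) rows using exactly two parameters:
    // one BYTEA[] and one BIGINT[], zipped back into rows by UNNEST.
    async fn insert_all(client: &Client, rows: &[(Vec<u8>, i64)]) -> Result<u64, Error> {
        // Transpose the row-major input into one Vec per column.
        let ids: Vec<&[u8]> = rows.iter().map(|(id, _)| id.as_slice()).collect();
        let tss: Vec<i64> = rows.iter().map(|(_, ts)| *ts).collect();
        let stmt = client
            .prepare(
                "INSERT INTO documents (id, ts) \
                 SELECT * FROM UNNEST($1::BYTEA[], $2::BIGINT[])",
            )
            .await?;
        // One statement covers every batch size: no chunking, no remainder,
        // no per-chunk futures to juggle.
        client.execute(&stmt, &[&ids, &tss]).await
    }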

File tree

2 files changed: +82 -192 lines changed

crates/postgres/src/lib.rs

Lines changed: 76 additions & 161 deletions
@@ -9,6 +9,7 @@ mod metrics;
 mod tests;
 
 use std::{
+    array,
     cmp,
     collections::{
         BTreeMap,
@@ -98,13 +99,11 @@ use fastrace::{
 use futures::{
     future::{
         self,
-        Either,
     },
     pin_mut,
     stream::{
         self,
         BoxStream,
-        FuturesUnordered,
         StreamExt,
         TryStreamExt,
    },
@@ -397,116 +396,56 @@ impl Persistence for PostgresPersistence {
         self.lease
             .transact(move |tx| {
                 async move {
-                    let (insert_document, insert_document_chunk, insert_index, insert_index_chunk) =
-                        try_join!(
-                            match conflict_strategy {
-                                ConflictStrategy::Error => tx.prepare_cached(INSERT_DOCUMENT),
-                                ConflictStrategy::Overwrite =>
-                                    tx.prepare_cached(INSERT_OVERWRITE_DOCUMENT),
-                            },
-                            match conflict_strategy {
-                                ConflictStrategy::Error => tx.prepare_cached(INSERT_DOCUMENT_CHUNK),
-                                ConflictStrategy::Overwrite =>
-                                    tx.prepare_cached(INSERT_OVERWRITE_DOCUMENT_CHUNK),
-                            },
-                            match conflict_strategy {
-                                ConflictStrategy::Error => tx.prepare_cached(INSERT_INDEX),
-                                ConflictStrategy::Overwrite =>
-                                    tx.prepare_cached(INSERT_OVERWRITE_INDEX),
-                            },
-                            match conflict_strategy {
-                                ConflictStrategy::Error => tx.prepare_cached(INSERT_INDEX_CHUNK),
-                                ConflictStrategy::Overwrite =>
-                                    tx.prepare_cached(INSERT_OVERWRITE_INDEX_CHUNK),
-                            },
-                        )?;
-
-                    // Since the documents and indexes already in memory, blast it into the
-                    // Postgres connection as fast as we can with
-                    // unbounded query pipelining. If we hadn't fully
-                    // "hydrated" the inputs to this part of the system already, we
-                    // could use a bounded pipeline here to allow backpressure from Postgres
-                    // (or any intermediate component like TCP, tokio, etc.) to flow up to
-                    // our inputs. But in this case, we just want to get `documents` done as
-                    // quickly as possible.
-                    {
-                        // Use a `FuturesUnordered` to fork off concurrent work.
-                        let mut futures = FuturesUnordered::new();
-
-                        // First, process all of the full document chunks, forking off
-                        // insertions to our `FuturesUnordered` set
-                        // as we encounter them.
-                        let mut document_chunks = documents.chunks_exact(CHUNK_SIZE);
-                        for chunk in &mut document_chunks {
-                            let mut params = Vec::with_capacity(chunk.len() * NUM_DOCUMENT_PARAMS);
-                            for update in chunk {
-                                params.extend(document_params(
-                                    update.ts,
-                                    update.id,
-                                    &update.value,
-                                    update.prev_ts,
-                                )?);
-                            }
-                            let future = async {
-                                let timer = metrics::insert_document_chunk_timer();
-                                tx.execute_raw(&insert_document_chunk, params).await?;
-                                timer.finish();
-                                Ok::<_, anyhow::Error>(())
-                            };
-                            futures.push(Either::Left(Either::Left(future)));
-                        }
+                    let (insert_documents, insert_indexes) = try_join!(
+                        match conflict_strategy {
+                            ConflictStrategy::Error => tx.prepare_cached(INSERT_DOCUMENT),
+                            ConflictStrategy::Overwrite =>
+                                tx.prepare_cached(INSERT_OVERWRITE_DOCUMENT),
+                        },
+                        match conflict_strategy {
+                            ConflictStrategy::Error => tx.prepare_cached(INSERT_INDEX),
+                            ConflictStrategy::Overwrite =>
+                                tx.prepare_cached(INSERT_OVERWRITE_INDEX),
+                        },
+                    )?;
 
-                        // After we've inserted all the full document chunks, drain the
-                        // remainder.
-                        for update in document_chunks.remainder() {
-                            let params = document_params(
+                    let insert_docs = async {
+                        if documents.is_empty() {
+                            return Ok(0);
+                        }
+                        let mut doc_params: [Vec<Param>; NUM_DOCUMENT_PARAMS] =
+                            array::from_fn(|_| Vec::with_capacity(documents.len()));
+                        for update in &documents {
+                            for (vec, param) in doc_params.iter_mut().zip(document_params(
                                 update.ts,
                                 update.id,
                                 &update.value,
                                 update.prev_ts,
-                            )?;
-                            let future = async {
-                                let timer = metrics::insert_one_document_timer();
-                                tx.execute_raw(&insert_document, params).await?;
-                                timer.finish();
-                                Ok::<_, anyhow::Error>(())
-                            };
-                            futures.push(Either::Left(Either::Right(future)));
+                            )?) {
+                                vec.push(param);
+                            }
                         }
+                        tx.execute_raw(&insert_documents, doc_params).await
+                    };
 
-                        let index_vec = indexes.into_iter().collect_vec();
-                        let mut index_chunks = index_vec.chunks_exact(CHUNK_SIZE);
-                        for chunk in &mut index_chunks {
-                            let mut params = Vec::with_capacity(chunk.len() * NUM_INDEX_PARAMS);
-                            for update in chunk {
-                                params.extend(index_params(update));
+                    let insert_idxs = async {
+                        if indexes.is_empty() {
+                            return Ok(0);
+                        }
+                        let mut idx_params: [Vec<Param>; NUM_INDEX_PARAMS] =
+                            array::from_fn(|_| Vec::with_capacity(indexes.len()));
+                        for update in &indexes {
+                            for (vec, param) in idx_params.iter_mut().zip(index_params(update)) {
+                                vec.push(param);
                             }
-                            let future = async {
-                                let timer = metrics::insert_index_chunk_timer();
-                                tx.execute_raw(&insert_index_chunk, params).await?;
-                                timer.finish();
-                                Ok::<_, anyhow::Error>(())
-                            };
-                            futures.push(Either::Right(Either::Left(future)));
                         }
+                        tx.execute_raw(&insert_indexes, idx_params).await
+                    };
 
-                        // After we've inserted all the full index chunks, drain the remainder.
-                        for update in index_chunks.remainder() {
-                            let params = index_params(update);
-                            let future = async {
-                                let timer = metrics::insert_one_index_timer();
-                                tx.execute_raw(&insert_index, params).await?;
-                                timer.finish();
-                                Ok::<_, anyhow::Error>(())
-                            };
-                            futures.push(Either::Right(Either::Right(future)));
-                        }
+                    let timer = metrics::insert_timer();
+                    try_join!(insert_docs, insert_idxs)?;
+                    timer.finish();
 
-                        // Wait on all of the futures in our `FuturesUnordered` to finish.
-                        while let Some(result) = futures.next().await {
-                            result?;
-                        }
-                    }
                     Ok(())
                 }
                 .boxed()
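
The transposition above relies on `array::from_fn` to build one Vec per column and a `zip` to route each of a row's parameters into its column vector. The same pattern in isolation, as a sketch assuming each row yields a fixed-size array of parameters (the `transpose` helper is hypothetical, not part of the patch):

    use std::array;

    // Turn row-major data ([T; N] per row) into the column-major form
    // ([Vec<T>; N]) that an UNNEST-based statement takes as parameters.
    fn transpose<T, const N: usize>(rows: Vec<[T; N]>) -> [Vec<T>; N] {
        let mut cols: [Vec<T>; N] = array::from_fn(|_| Vec::with_capacity(rows.len()));
        for row in rows {
            // `[T; N]` iterates by value, so each parameter moves into its column.
            for (col, param) in cols.iter_mut().zip(row) {
                col.push(param);
            }
        }
        cols
    }

    // transpose(vec![[1, 2], [3, 4]]) == [vec![1, 3], vec![2, 4]]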
@@ -1953,40 +1892,26 @@ SELECT id, ts, table_id, json_value, deleted, prev_ts
 
 const INSERT_DOCUMENT: &str = r#"INSERT INTO @db_name.documents
     (id, ts, table_id, json_value, deleted, prev_ts)
-VALUES ($1, $2, $3, $4, $5, $6)
+SELECT * FROM UNNEST(
+    $1::BYTEA[],
+    $2::BIGINT[],
+    $3::BYTEA[],
+    $4::BYTEA[],
+    $5::BOOLEAN[],
+    $6::BIGINT[]
+)
 "#;
 
 const INSERT_OVERWRITE_DOCUMENT: &str = r#"INSERT INTO @db_name.documents
     (id, ts, table_id, json_value, deleted, prev_ts)
-VALUES ($1, $2, $3, $4, $5, $6)
-ON CONFLICT (id, ts, table_id) DO UPDATE
-SET deleted = excluded.deleted, json_value = excluded.json_value
-"#;
-
-const INSERT_DOCUMENT_CHUNK: &str = r#"INSERT INTO @db_name.documents
-    (id, ts, table_id, json_value, deleted, prev_ts)
-VALUES
-    ($1, $2, $3, $4, $5, $6),
-    ($7, $8, $9, $10, $11, $12),
-    ($13, $14, $15, $16, $17, $18),
-    ($19, $20, $21, $22, $23, $24),
-    ($25, $26, $27, $28, $29, $30),
-    ($31, $32, $33, $34, $35, $36),
-    ($37, $38, $39, $40, $41, $42),
-    ($43, $44, $45, $46, $47, $48)
-"#;
-
-const INSERT_OVERWRITE_DOCUMENT_CHUNK: &str = r#"INSERT INTO @db_name.documents
-    (id, ts, table_id, json_value, deleted, prev_ts)
-VALUES
-    ($1, $2, $3, $4, $5, $6),
-    ($7, $8, $9, $10, $11, $12),
-    ($13, $14, $15, $16, $17, $18),
-    ($19, $20, $21, $22, $23, $24),
-    ($25, $26, $27, $28, $29, $30),
-    ($31, $32, $33, $34, $35, $36),
-    ($37, $38, $39, $40, $41, $42),
-    ($43, $44, $45, $46, $47, $48)
+SELECT * FROM UNNEST(
+    $1::BYTEA[],
+    $2::BIGINT[],
+    $3::BYTEA[],
+    $4::BYTEA[],
+    $5::BOOLEAN[],
+    $6::BIGINT[]
+)
 ON CONFLICT (id, ts, table_id) DO UPDATE
 SET deleted = excluded.deleted, json_value = excluded.json_value
 "#;
@@ -2011,7 +1936,16 @@ SELECT
 
 const INSERT_INDEX: &str = r#"INSERT INTO @db_name.indexes
     (index_id, ts, key_prefix, key_suffix, key_sha256, deleted, table_id, document_id)
-VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+SELECT * FROM UNNEST(
+    $1::BYTEA[],
+    $2::BIGINT[],
+    $3::BYTEA[],
+    $4::BYTEA[],
+    $5::BYTEA[],
+    $6::BOOLEAN[],
+    $7::BYTEA[],
+    $8::BYTEA[]
+)
 "#;
 
 // Note that on conflict, there's no need to update any of the columns that are
@@ -2020,7 +1954,16 @@ const INSERT_INDEX: &str = r#"INSERT INTO @db_name.indexes
 // Only the fields that could have actually changed need to be updated.
 const INSERT_OVERWRITE_INDEX: &str = r#"INSERT INTO @db_name.indexes
     (index_id, ts, key_prefix, key_suffix, key_sha256, deleted, table_id, document_id)
-VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+SELECT * FROM UNNEST(
+    $1::BYTEA[],
+    $2::BIGINT[],
+    $3::BYTEA[],
+    $4::BYTEA[],
+    $5::BYTEA[],
+    $6::BOOLEAN[],
+    $7::BYTEA[],
+    $8::BYTEA[]
+)
 ON CONFLICT ON CONSTRAINT indexes_pkey DO UPDATE
 SET deleted = excluded.deleted, table_id = excluded.table_id, document_id = excluded.document_id
 "#;
@@ -2045,34 +1988,6 @@ DELETE FROM @db_name.documents WHERE
 (table_id = $1 AND id = $2 AND ts <= $3)
 "#;
 
-const INSERT_INDEX_CHUNK: &str = r#"INSERT INTO @db_name.indexes
-    (index_id, ts, key_prefix, key_suffix, key_sha256, deleted, table_id, document_id)
-VALUES
-    ($1, $2, $3, $4, $5, $6, $7, $8),
-    ($9, $10, $11, $12, $13, $14, $15, $16),
-    ($17, $18, $19, $20, $21, $22, $23, $24),
-    ($25, $26, $27, $28, $29, $30, $31, $32),
-    ($33, $34, $35, $36, $37, $38, $39, $40),
-    ($41, $42, $43, $44, $45, $46, $47, $48),
-    ($49, $50, $51, $52, $53, $54, $55, $56),
-    ($57, $58, $59, $60, $61, $62, $63, $64)
-"#;
-
-const INSERT_OVERWRITE_INDEX_CHUNK: &str = r#"INSERT INTO @db_name.indexes
-    (index_id, ts, key_prefix, key_suffix, key_sha256, deleted, table_id, document_id)
-VALUES
-    ($1, $2, $3, $4, $5, $6, $7, $8),
-    ($9, $10, $11, $12, $13, $14, $15, $16),
-    ($17, $18, $19, $20, $21, $22, $23, $24),
-    ($25, $26, $27, $28, $29, $30, $31, $32),
-    ($33, $34, $35, $36, $37, $38, $39, $40),
-    ($41, $42, $43, $44, $45, $46, $47, $48),
-    ($49, $50, $51, $52, $53, $54, $55, $56),
-    ($57, $58, $59, $60, $61, $62, $63, $64)
-ON CONFLICT ON CONSTRAINT indexes_pkey DO UPDATE
-SET deleted = excluded.deleted, table_id = excluded.table_id, document_id = excluded.document_id
-"#;
-
 const DELETE_INDEX_CHUNK: &str = r#"
 /*+
     Set(enable_seqscan OFF)
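
A note on the UNNEST statements above: multi-argument UNNEST zips its array arguments positionally, so N equal-length arrays expand into one N-column row per index, and ON CONFLICT then applies per expanded row just as it would for a VALUES list. (Arrays of unequal length are padded with NULLs, which is why the Rust side pushes exactly one value per column for every row.) A throwaway check of the zipping behavior, assuming a connected tokio_postgres::Client:

    async fn unnest_zips_rows(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
        // Two parallel arrays of length 2 come back as two 2-column rows.
        let rows = client
            .query(
                "SELECT * FROM UNNEST($1::BIGINT[], $2::BOOLEAN[])",
                &[&vec![1i64, 2], &vec![true, false]],
            )
            .await?;
        assert_eq!(rows.len(), 2);
        let (ts, deleted): (i64, bool) = (rows[0].get(0), rows[0].get(1));
        assert_eq!((ts, deleted), (1, true));
        Ok(())
    }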

crates/postgres/src/metrics.rs

Lines changed: 6 additions & 31 deletions
@@ -248,40 +248,15 @@ pub fn retention_validate_timer() -> StatusTimer {
     StatusTimer::new(&POSTGRES_RETENTION_VALIDATE_SECONDS)
 }
 
+// This can't really be split between documents and indexes because of
+// pipelining
 register_convex_histogram!(
-    POSTGRES_INSERT_CHUNK_SECONDS,
-    "Time to insert a chunk of documents",
+    POSTGRES_INSERT_SECONDS,
+    "Time to insert documents & indexes",
     &STATUS_LABEL
 );
-pub fn insert_document_chunk_timer() -> StatusTimer {
-    StatusTimer::new(&POSTGRES_INSERT_CHUNK_SECONDS)
-}
-
-register_convex_histogram!(
-    POSTGRES_INSERT_ONE_SECONDS,
-    "Time to insert one document",
-    &STATUS_LABEL
-);
-pub fn insert_one_document_timer() -> StatusTimer {
-    StatusTimer::new(&POSTGRES_INSERT_ONE_SECONDS)
-}
-
-register_convex_histogram!(
-    POSTGRES_INSERT_INDEX_CHUNK_SECONDS,
-    "Time to insert an index chunk",
-    &STATUS_LABEL
-);
-pub fn insert_index_chunk_timer() -> StatusTimer {
-    StatusTimer::new(&POSTGRES_INSERT_INDEX_CHUNK_SECONDS)
-}
-
-register_convex_histogram!(
-    POSTGRES_INSERT_ONE_INDEX_SECONDS,
-    "Time to insert one index",
-    &STATUS_LABEL
-);
-pub fn insert_one_index_timer() -> StatusTimer {
-    StatusTimer::new(&POSTGRES_INSERT_ONE_INDEX_SECONDS)
+pub fn insert_timer() -> StatusTimer {
+    StatusTimer::new(&POSTGRES_INSERT_SECONDS)
 }
 
 register_convex_histogram!(
