Skip to content

Commit 8749f20

Browse files
committed
store: Move batching logic for copies into separate struct
1 parent f2d5e44 commit 8749f20

File tree

3 files changed

+441
-87
lines changed

3 files changed

+441
-87
lines changed

store/postgres/src/catalog.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,3 +912,31 @@ fn has_minmax_multi_ops(conn: &mut PgConnection) -> Result<bool, StoreError> {
912912

913913
Ok(sql_query(QUERY).get_result::<Ops>(conn)?.has_ops)
914914
}
915+
916+
pub(crate) fn histogram_bounds(
917+
conn: &mut PgConnection,
918+
namespace: &Namespace,
919+
table: &SqlName,
920+
column: &str,
921+
) -> Result<Vec<i64>, StoreError> {
922+
const QUERY: &str = "select histogram_bounds::text::int8[] bounds \
923+
from pg_stats \
924+
where schemaname = $1 \
925+
and tablename = $2 \
926+
and attname = $3";
927+
928+
#[derive(Queryable, QueryableByName)]
929+
struct Bounds {
930+
#[diesel(sql_type = Array<BigInt>)]
931+
bounds: Vec<i64>,
932+
}
933+
934+
sql_query(QUERY)
935+
.bind::<Text, _>(namespace.as_str())
936+
.bind::<Text, _>(table.as_str())
937+
.bind::<Text, _>(column)
938+
.get_result::<Bounds>(conn)
939+
.optional()
940+
.map(|bounds| bounds.map(|b| b.bounds).unwrap_or_default())
941+
.map_err(StoreError::from)
942+
}

store/postgres/src/copy.rs

Lines changed: 50 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ use diesel::{
2222
dsl::sql,
2323
insert_into,
2424
r2d2::{ConnectionManager, PooledConnection},
25-
select, sql_query,
26-
sql_types::{BigInt, Integer},
27-
update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl,
28-
RunQueryDsl,
25+
select, sql_query, update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection,
26+
QueryDsl, RunQueryDsl,
2927
};
3028
use graph::{
3129
constraint_violation,
@@ -39,7 +37,7 @@ use crate::{
3937
dynds::DataSourcesTable,
4038
primary::{DeploymentId, Site},
4139
relational::index::IndexList,
42-
vid_batcher::AdaptiveBatchSize,
40+
vid_batcher::{VidBatcher, VidRange},
4341
};
4442
use crate::{connection_pool::ConnectionPool, relational::Layout};
4543
use crate::{relational::Table, relational_queries as rq};
@@ -202,6 +200,7 @@ impl CopyState {
202200
TableState::init(
203201
conn,
204202
dst.site.clone(),
203+
&src,
205204
src_table.clone(),
206205
dst_table.clone(),
207206
&target_block,
@@ -217,9 +216,9 @@ impl CopyState {
217216
(
218217
cts::entity_type.eq(table.batch.dst.object.as_str()),
219218
cts::dst.eq(dst.site.id),
220-
cts::next_vid.eq(table.batch.next_vid),
221-
cts::target_vid.eq(table.batch.target_vid),
222-
cts::batch_size.eq(table.batch.batch_size.size),
219+
cts::next_vid.eq(table.batch.next_vid()),
220+
cts::target_vid.eq(table.batch.target_vid()),
221+
cts::batch_size.eq(table.batch.batch_size()),
223222
)
224223
})
225224
.collect::<Vec<_>>();
@@ -298,49 +297,42 @@ pub(crate) fn source(
298297
pub(crate) struct BatchCopy {
299298
src: Arc<Table>,
300299
dst: Arc<Table>,
301-
/// The `vid` of the next entity version that we will copy
302-
next_vid: i64,
303-
/// The last `vid` that should be copied
304-
target_vid: i64,
305-
batch_size: AdaptiveBatchSize,
300+
batcher: VidBatcher,
306301
}
307302

308303
impl BatchCopy {
309-
pub fn new(src: Arc<Table>, dst: Arc<Table>, first_vid: i64, last_vid: i64) -> Self {
310-
let batch_size = AdaptiveBatchSize::new(&dst);
311-
312-
Self {
313-
src,
314-
dst,
315-
next_vid: first_vid,
316-
target_vid: last_vid,
317-
batch_size,
318-
}
304+
pub fn new(batcher: VidBatcher, src: Arc<Table>, dst: Arc<Table>) -> Self {
305+
Self { src, dst, batcher }
319306
}
320307

321308
/// Copy one batch of entities and update internal state so that the
322309
/// next call to `run` will copy the next batch
323310
pub fn run(&mut self, conn: &mut PgConnection) -> Result<Duration, StoreError> {
324-
let start = Instant::now();
325-
326-
// Copy all versions with next_vid <= vid <= next_vid + batch_size - 1,
327-
// but do not go over target_vid
328-
let last_vid = (self.next_vid + self.batch_size.size - 1).min(self.target_vid);
329-
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, self.next_vid, last_vid)?
330-
.execute(conn)?;
311+
let (duration, _) = self.batcher.step(|start, end| {
312+
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
313+
.execute(conn)?;
314+
Ok(())
315+
})?;
331316

332-
let duration = start.elapsed();
317+
Ok(duration)
318+
}
333319

334-
// remember how far we got
335-
self.next_vid = last_vid + 1;
320+
pub fn finished(&self) -> bool {
321+
self.batcher.finished()
322+
}
336323

337-
self.batch_size.adapt(duration);
324+
/// The first `vid` that has not been copied yet
325+
pub fn next_vid(&self) -> i64 {
326+
self.batcher.next_vid()
327+
}
338328

339-
Ok(duration)
329+
/// The last `vid` that should be copied
330+
pub fn target_vid(&self) -> i64 {
331+
self.batcher.target_vid()
340332
}
341333

342-
pub fn finished(&self) -> bool {
343-
self.next_vid > self.target_vid
334+
pub fn batch_size(&self) -> i64 {
335+
self.batcher.batch_size() as i64
344336
}
345337
}
346338

@@ -354,38 +346,15 @@ impl TableState {
354346
fn init(
355347
conn: &mut PgConnection,
356348
dst_site: Arc<Site>,
349+
src_layout: &Layout,
357350
src: Arc<Table>,
358351
dst: Arc<Table>,
359352
target_block: &BlockPtr,
360353
) -> Result<Self, StoreError> {
361-
#[derive(QueryableByName)]
362-
struct VidRange {
363-
#[diesel(sql_type = BigInt)]
364-
min_vid: i64,
365-
#[diesel(sql_type = BigInt)]
366-
max_vid: i64,
367-
}
368-
369-
let max_block_clause = if src.immutable {
370-
"block$ <= $1"
371-
} else {
372-
"lower(block_range) <= $1"
373-
};
374-
let (next_vid, target_vid) = sql_query(format!(
375-
"select coalesce(min(vid), 0) as min_vid, \
376-
coalesce(max(vid), -1) as max_vid \
377-
from {} where {}",
378-
src.qualified_name.as_str(),
379-
max_block_clause
380-
))
381-
.bind::<Integer, _>(&target_block.number)
382-
.load::<VidRange>(conn)?
383-
.first()
384-
.map(|v| (v.min_vid, v.max_vid))
385-
.unwrap_or((0, -1));
386-
354+
let vid_range = VidRange::for_copy(conn, &src, target_block)?;
355+
let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?;
387356
Ok(Self {
388-
batch: BatchCopy::new(src, dst, next_vid, target_vid),
357+
batch: BatchCopy::new(batcher, src, dst),
389358
dst_site,
390359
duration_ms: 0,
391360
})
@@ -451,10 +420,14 @@ impl TableState {
451420
);
452421
match (src, dst) {
453422
(Ok(src), Ok(dst)) => {
454-
let mut batch = BatchCopy::new(src, dst, current_vid, target_vid);
455-
let batch_size = AdaptiveBatchSize { size };
456-
457-
batch.batch_size = batch_size;
423+
let batcher = VidBatcher::load(
424+
conn,
425+
&src_layout.site.namespace,
426+
&src,
427+
VidRange::new(current_vid, target_vid),
428+
)?
429+
.with_batch_size(size as usize);
430+
let batch = BatchCopy::new(batcher, src, dst);
458431

459432
Ok(TableState {
460433
batch,
@@ -493,8 +466,8 @@ impl TableState {
493466
.set(cts::started_at.eq(sql("now()")))
494467
.execute(conn)?;
495468
let values = (
496-
cts::next_vid.eq(self.batch.next_vid),
497-
cts::batch_size.eq(self.batch.batch_size.size),
469+
cts::next_vid.eq(self.batch.next_vid()),
470+
cts::batch_size.eq(self.batch.batch_size()),
498471
cts::duration_ms.eq(self.duration_ms),
499472
);
500473
update(
@@ -566,12 +539,12 @@ impl<'a> CopyProgress<'a> {
566539
let target_vid: i64 = state
567540
.tables
568541
.iter()
569-
.map(|table| table.batch.target_vid)
542+
.map(|table| table.batch.target_vid())
570543
.sum();
571544
let current_vid = state
572545
.tables
573546
.iter()
574-
.map(|table| table.batch.next_vid.min(table.batch.target_vid))
547+
.map(|table| table.batch.next_vid())
575548
.sum();
576549
Self {
577550
logger,
@@ -609,18 +582,18 @@ impl<'a> CopyProgress<'a> {
609582
info!(
610583
self.logger,
611584
"Copied {:.2}% of `{}` entities ({}/{} entity versions), {:.2}% of overall data",
612-
Self::progress_pct(batch.next_vid, batch.target_vid),
585+
Self::progress_pct(batch.next_vid(), batch.target_vid()),
613586
batch.dst.object,
614-
batch.next_vid,
615-
batch.target_vid,
616-
Self::progress_pct(self.current_vid + batch.next_vid, self.target_vid)
587+
batch.next_vid(),
588+
batch.target_vid(),
589+
Self::progress_pct(self.current_vid + batch.next_vid(), self.target_vid)
617590
);
618591
self.last_log = Instant::now();
619592
}
620593
}
621594

622595
fn table_finished(&mut self, batch: &BatchCopy) {
623-
self.current_vid += batch.next_vid;
596+
self.current_vid += batch.next_vid();
624597
}
625598

626599
fn finished(&self) {

0 commit comments

Comments
 (0)