Skip to content

Commit c4844ce

Browse files
committed
store: Use VidRange for pruning
1 parent d63782c commit c4844ce

File tree

2 files changed: +41 -55 lines changed

store/postgres/src/relational/prune.rs

Lines changed: 13 additions & 52 deletions
Original file line number | Diff line number | Diff line change
@@ -20,48 +20,11 @@ use itertools::Itertools;
2020
use crate::{
2121
catalog, deployment,
2222
relational::{Table, VID_COLUMN},
23-
vid_batcher::AdaptiveBatchSize,
23+
vid_batcher::{AdaptiveBatchSize, VidRange},
2424
};
2525

2626
use super::{Catalog, Layout, Namespace};
2727

28-
// Additions to `Table` that are useful for pruning
29-
impl Table {
30-
/// Return the first and last vid of any entity that is visible in the
31-
/// block range from `first_block` (inclusive) to `last_block`
32-
/// (exclusive)
33-
fn vid_range(
34-
&self,
35-
conn: &mut PgConnection,
36-
first_block: BlockNumber,
37-
last_block: BlockNumber,
38-
) -> Result<(i64, i64), StoreError> {
39-
#[derive(QueryableByName)]
40-
struct VidRange {
41-
#[diesel(sql_type = BigInt)]
42-
min_vid: i64,
43-
#[diesel(sql_type = BigInt)]
44-
max_vid: i64,
45-
}
46-
47-
// Determine the last vid that we need to copy
48-
let VidRange { min_vid, max_vid } = sql_query(format!(
49-
"/* controller=prune,first={first_block},last={last_block} */ \
50-
select coalesce(min(vid), 0) as min_vid, \
51-
coalesce(max(vid), -1) as max_vid from {src} \
52-
where lower(block_range) <= $2 \
53-
and coalesce(upper(block_range), 2147483647) > $1 \
54-
and coalesce(upper(block_range), 2147483647) <= $2 \
55-
and block_range && int4range($1, $2)",
56-
src = self.qualified_name,
57-
))
58-
.bind::<Integer, _>(first_block)
59-
.bind::<Integer, _>(last_block)
60-
.get_result::<VidRange>(conn)?;
61-
Ok((min_vid, max_vid))
62-
}
63-
}
64-
6528
/// Utility to copy relevant data out of a source table and into a new
6629
/// destination table and replace the source table with the destination
6730
/// table
@@ -122,12 +85,12 @@ impl TablePair {
12285
let column_list = self.column_list();
12386

12487
// Determine the last vid that we need to copy
125-
let (min_vid, max_vid) = self.src.vid_range(conn, earliest_block, final_block)?;
88+
let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?;
12689

12790
let mut batch_size = AdaptiveBatchSize::new(&self.src);
12891
// The first vid we still need to copy
129-
let mut next_vid = min_vid;
130-
while next_vid <= max_vid {
92+
let mut next_vid = range.min;
93+
while next_vid <= range.max {
13194
let start = Instant::now();
13295
let rows = conn.transaction(|conn| {
13396
// Page through all rows in `src` in batches of `batch_size`
@@ -167,7 +130,7 @@ impl TablePair {
167130
self.src.name.as_str(),
168131
rows,
169132
PrunePhase::CopyFinal,
170-
next_vid > max_vid,
133+
next_vid > range.max,
171134
);
172135
}
173136
Ok(())
@@ -185,14 +148,12 @@ impl TablePair {
185148
let column_list = self.column_list();
186149

187150
// Determine the last vid that we need to copy
188-
let (min_vid, max_vid) = self
189-
.src
190-
.vid_range(conn, final_block + 1, BLOCK_NUMBER_MAX)?;
151+
let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?;
191152

192153
let mut batch_size = AdaptiveBatchSize::new(&self.src);
193154
// The first vid we still need to copy
194-
let mut next_vid = min_vid;
195-
while next_vid <= max_vid {
155+
let mut next_vid = range.min;
156+
while next_vid <= range.max {
196157
let start = Instant::now();
197158
let rows = conn.transaction(|conn| {
198159
// Page through all the rows in `src` in batches of
@@ -227,7 +188,7 @@ impl TablePair {
227188
self.src.name.as_str(),
228189
rows,
229190
PrunePhase::CopyNonfinal,
230-
next_vid > max_vid,
191+
next_vid > range.max,
231192
);
232193
}
233194
Ok(())
@@ -459,10 +420,10 @@ impl Layout {
459420
PruningStrategy::Delete => {
460421
// Delete all entity versions whose range was closed
461422
// before `req.earliest_block`
462-
let (min_vid, max_vid) = table.vid_range(conn, 0, req.earliest_block)?;
423+
let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?;
463424
let mut batch_size = AdaptiveBatchSize::new(&table);
464-
let mut next_vid = min_vid;
465-
while next_vid <= max_vid {
425+
let mut next_vid = range.min;
426+
while next_vid <= range.max {
466427
let start = Instant::now();
467428
let rows = sql_query(format!(
468429
"/* controller=prune,phase=delete,start_vid={next_vid},batch_size={batch_size} */ \
@@ -485,7 +446,7 @@ impl Layout {
485446
table.name.as_str(),
486447
rows as usize,
487448
PrunePhase::Delete,
488-
next_vid > max_vid,
449+
next_vid > range.max,
489450
);
490451
}
491452
}

store/postgres/src/vid_batcher.rs

Lines changed: 28 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ use diesel::{
1010
};
1111
use graph::{
1212
env::ENV_VARS,
13-
prelude::{BlockPtr, StoreError},
13+
prelude::{BlockNumber, BlockPtr, StoreError},
1414
util::ogive::Ogive,
1515
};
1616

@@ -251,9 +251,9 @@ impl VidBatcher {
251251
#[derive(Copy, Clone, QueryableByName)]
252252
pub(crate) struct VidRange {
253253
#[diesel(sql_type = BigInt, column_name = "min_vid")]
254-
min: i64,
254+
pub min: i64,
255255
#[diesel(sql_type = BigInt, column_name = "max_vid")]
256-
max: i64,
256+
pub max: i64,
257257
}
258258

259259
const EMPTY_VID_RANGE: VidRange = VidRange { max: -1, min: 0 };
@@ -300,6 +300,31 @@ impl VidRange {
300300
.unwrap_or(EMPTY_VID_RANGE);
301301
Ok(vid_range)
302302
}
303+
304+
/// Return the first and last vid of any entity that is visible in the
305+
/// block range from `first_block` (inclusive) to `last_block`
306+
/// (exclusive)
///
/// The result is computed with a single aggregate query against
/// `src.qualified_name`. A row is counted when its `block_range`
/// satisfies all of:
///
/// * `lower(block_range) <= last_block`
/// * `upper(block_range)` (an open upper bound is treated as
///   `2147483647`) is `> first_block` and `<= last_block`
/// * `block_range` overlaps `int4range(first_block, last_block)`
///
/// NOTE(review): because of the `<= last_block` condition on the upper
/// bound, rows whose range is still open are only counted when
/// `last_block` is `2147483647`; confirm that this is the intended
/// meaning of "visible" here.
///
/// When no row matches, the query yields `min = 0` and `max = -1`, i.e.
/// the same values as `EMPTY_VID_RANGE`. Since `min > max` in that case,
/// callers that loop `while next_vid <= range.max` starting from
/// `range.min` do no work.
///
/// # Errors
///
/// Any error from running the query is converted with
/// `StoreError::from` and returned.
307+
pub fn for_prune(
308+
conn: &mut PgConnection,
309+
src: &Table,
310+
first_block: BlockNumber,
311+
last_block: BlockNumber,
312+
) -> Result<Self, StoreError> {
// The leading SQL comment tags the query with the block bounds. The
// `coalesce` calls supply the empty-range sentinel (min 0, max -1)
// because `min`/`max` over zero rows are NULL.
313+
sql_query(format!(
314+
"/* controller=prune,first={first_block},last={last_block} */ \
315+
select coalesce(min(vid), 0) as min_vid, \
316+
coalesce(max(vid), -1) as max_vid from {src} \
317+
where lower(block_range) <= $2 \
318+
and coalesce(upper(block_range), 2147483647) > $1 \
319+
and coalesce(upper(block_range), 2147483647) <= $2 \
320+
and block_range && int4range($1, $2)",
321+
src = src.qualified_name,
322+
))
// Binds are positional: `first_block` becomes `$1`, `last_block` `$2`.
323+
.bind::<Integer, _>(first_block)
324+
.bind::<Integer, _>(last_block)
// The `min_vid`/`max_vid` columns are mapped onto `VidRange` through
// its `QueryableByName` derive.
325+
.get_result::<VidRange>(conn)
326+
.map_err(StoreError::from)
327+
}
303328
}
304329

305330
#[cfg(test)]

0 commit comments

Comments
 (0)