Skip to content

Commit 5f648ff

Browse files
committed
store: Use VidBatcher to batch pruning queries
1 parent c4844ce commit 5f648ff

File tree

1 file changed

+61
-75
lines changed

1 file changed

+61
-75
lines changed

store/postgres/src/relational/prune.rs

Lines changed: 61 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fmt::Write, sync::Arc, time::Instant};
1+
use std::{fmt::Write, sync::Arc};
22

33
use diesel::{
44
connection::SimpleConnection,
@@ -20,7 +20,7 @@ use itertools::Itertools;
2020
use crate::{
2121
catalog, deployment,
2222
relational::{Table, VID_COLUMN},
23-
vid_batcher::{AdaptiveBatchSize, VidRange},
23+
vid_batcher::{VidBatcher, VidRange},
2424
};
2525

2626
use super::{Catalog, Layout, Namespace};
@@ -86,51 +86,47 @@ impl TablePair {
8686

8787
// Determine the last vid that we need to copy
8888
let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?;
89-
90-
let mut batch_size = AdaptiveBatchSize::new(&self.src);
91-
// The first vid we still need to copy
92-
let mut next_vid = range.min;
93-
while next_vid <= range.max {
94-
let start = Instant::now();
95-
let rows = conn.transaction(|conn| {
96-
// Page through all rows in `src` in batches of `batch_size`
97-
// and copy the ones that are visible to queries at block
98-
// heights between `earliest_block` and `final_block`, but
99-
// whose block_range does not extend past `final_block`
100-
// since they could still be reverted while we copy.
101-
// The conditions on `block_range` are expressed redundantly
102-
// to make more indexes useable
103-
sql_query(format!(
104-
"/* controller=prune,phase=final,start_vid={next_vid},batch_size={batch_size} */ \
89+
let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range)?;
90+
91+
while !batcher.finished() {
92+
let (_, rows) = batcher.step(|start, end| {
93+
conn.transaction(|conn| {
94+
// Page through all rows in `src` in batches of `batch_size`
95+
// and copy the ones that are visible to queries at block
96+
// heights between `earliest_block` and `final_block`, but
97+
// whose block_range does not extend past `final_block`
98+
// since they could still be reverted while we copy.
99+
// The conditions on `block_range` are expressed redundantly
100+
// to make more indexes useable
101+
sql_query(format!(
102+
"/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \
105103
insert into {dst}({column_list}) \
106104
select {column_list} from {src} \
107105
where lower(block_range) <= $2 \
108106
and coalesce(upper(block_range), 2147483647) > $1 \
109107
and coalesce(upper(block_range), 2147483647) <= $2 \
110108
and block_range && int4range($1, $2, '[]') \
111-
and vid >= $3 and vid < $3 + $4 \
109+
and vid >= $3 and vid <= $4 \
112110
order by vid",
113111
src = self.src.qualified_name,
114112
dst = self.dst.qualified_name,
115-
batch_size = batch_size.size,
113+
batch_size = end - start + 1,
116114
))
117-
.bind::<Integer, _>(earliest_block)
118-
.bind::<Integer, _>(final_block)
119-
.bind::<BigInt, _>(next_vid)
120-
.bind::<BigInt, _>(&batch_size)
121-
.execute(conn)
115+
.bind::<Integer, _>(earliest_block)
116+
.bind::<Integer, _>(final_block)
117+
.bind::<BigInt, _>(start)
118+
.bind::<BigInt, _>(end)
119+
.execute(conn)
120+
.map_err(StoreError::from)
121+
})
122122
})?;
123123
cancel.check_cancel()?;
124124

125-
next_vid += batch_size.size;
126-
127-
batch_size.adapt(start.elapsed());
128-
129125
reporter.prune_batch(
130126
self.src.name.as_str(),
131-
rows,
127+
rows.unwrap_or(0),
132128
PrunePhase::CopyFinal,
133-
next_vid > range.max,
129+
batcher.finished(),
134130
);
135131
}
136132
Ok(())
@@ -149,46 +145,41 @@ impl TablePair {
149145

150146
// Determine the last vid that we need to copy
151147
let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?;
148+
let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range)?;
152149

153-
let mut batch_size = AdaptiveBatchSize::new(&self.src);
154-
// The first vid we still need to copy
155-
let mut next_vid = range.min;
156-
while next_vid <= range.max {
157-
let start = Instant::now();
158-
let rows = conn.transaction(|conn| {
150+
while !batcher.finished() {
151+
let (_, rows) = batcher.step(|start, end| {
159152
// Page through all the rows in `src` in batches of
160153
// `batch_size` that are visible to queries at block heights
161-
// starting right after `final_block`.
162-
// The conditions on `block_range` are expressed redundantly
163-
// to make more indexes useable
164-
sql_query(format!(
165-
"/* controller=prune,phase=nonfinal,start_vid={next_vid},batch_size={batch_size} */ \
154+
// starting right after `final_block`. The conditions on
155+
// `block_range` are expressed redundantly to make more
156+
// indexes useable
157+
conn.transaction(|conn| {
158+
sql_query(format!(
159+
"/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \
166160
insert into {dst}({column_list}) \
167161
select {column_list} from {src} \
168162
where coalesce(upper(block_range), 2147483647) > $1 \
169163
and block_range && int4range($1, null) \
170-
and vid >= $2 and vid < $2 + $3 \
164+
and vid >= $2 and vid <= $3 \
171165
order by vid",
172-
dst = self.dst.qualified_name,
173-
src = self.src.qualified_name,
174-
batch_size = batch_size.size
175-
))
176-
.bind::<Integer, _>(final_block)
177-
.bind::<BigInt, _>(next_vid)
178-
.bind::<BigInt, _>(&batch_size)
179-
.execute(conn)
180-
.map_err(StoreError::from)
166+
dst = self.dst.qualified_name,
167+
src = self.src.qualified_name,
168+
batch_size = end - start + 1,
169+
))
170+
.bind::<Integer, _>(final_block)
171+
.bind::<BigInt, _>(start)
172+
.bind::<BigInt, _>(end)
173+
.execute(conn)
174+
.map_err(StoreError::from)
175+
})
181176
})?;
182177

183-
next_vid += batch_size.size;
184-
185-
batch_size.adapt(start.elapsed());
186-
187178
reporter.prune_batch(
188179
self.src.name.as_str(),
189-
rows,
180+
rows.unwrap_or(0),
190181
PrunePhase::CopyNonfinal,
191-
next_vid > range.max,
182+
batcher.finished(),
192183
);
193184
}
194185
Ok(())
@@ -421,32 +412,27 @@ impl Layout {
421412
// Delete all entity versions whose range was closed
422413
// before `req.earliest_block`
423414
let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?;
424-
let mut batch_size = AdaptiveBatchSize::new(&table);
425-
let mut next_vid = range.min;
426-
while next_vid <= range.max {
427-
let start = Instant::now();
428-
let rows = sql_query(format!(
429-
"/* controller=prune,phase=delete,start_vid={next_vid},batch_size={batch_size} */ \
415+
let mut batcher = VidBatcher::load(conn, &self.site.namespace, &table, range)?;
416+
417+
while !batcher.finished() {
418+
let (_, rows) = batcher.step(|start, end| {sql_query(format!(
419+
"/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \
430420
delete from {qname} \
431421
where coalesce(upper(block_range), 2147483647) <= $1 \
432-
and vid >= $2 and vid < $2 + $3",
422+
and vid >= $2 and vid <= $3",
433423
qname = table.qualified_name,
434-
batch_size = batch_size.size
424+
batch_size = end - start + 1
435425
))
436426
.bind::<Integer, _>(req.earliest_block)
437-
.bind::<BigInt, _>(next_vid)
438-
.bind::<BigInt, _>(&batch_size)
439-
.execute(conn)?;
440-
441-
next_vid += batch_size.size;
442-
443-
batch_size.adapt(start.elapsed());
427+
.bind::<BigInt, _>(start)
428+
.bind::<BigInt, _>(end)
429+
.execute(conn).map_err(StoreError::from)})?;
444430

445431
reporter.prune_batch(
446432
table.name.as_str(),
447-
rows as usize,
433+
rows.unwrap_or(0),
448434
PrunePhase::Delete,
449-
next_vid > range.max,
435+
batcher.finished(),
450436
);
451437
}
452438
}

0 commit comments

Comments
 (0)