Commit 34b3fca
store: Kill and retry long running prune operations
Uses the same timeout as copying: if a prune operation now takes longer than `GRAPH_STORE_BATCH_TIMEOUT`, it is aborted and retried with a batch size of 1.
Parent: 553bf84
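
In outline, the change is a single retry at the smallest possible batch size once a timeout is detected. A condensed, hypothetical distillation of that policy (the real implementation is the `batch_with_timeout` helper added in `store/postgres/src/relational/prune.rs` below):

```rust
/// Hypothetical distillation of the retry policy, not graph-node code:
/// attempt a batch at the estimated size; if that attempt hits the
/// statement timeout, retry once with a batch size of 1, which is assumed
/// to always finish within the timeout.
fn step_with_retry<T, E>(
    batch_size: i64,
    is_timeout: impl Fn(&E) -> bool,
    mut step: impl FnMut(i64) -> Result<T, E>,
) -> Result<T, E> {
    match step(batch_size) {
        Err(err) if is_timeout(&err) => step(1),
        res => res,
    }
}
```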

3 files changed: 66 additions, 30 deletions

docs/environment-variables.md (4 additions, 4 deletions)

```diff
@@ -226,10 +226,10 @@ those.
   copying or grafting should take. This limits how long transactions for
   such long running operations will be, and therefore helps control bloat
   in other tables. Value is in seconds and defaults to 180s.
-- `GRAPH_STORE_BATCH_TIMEOUT`: How long a batch operation during copying or
-  grafting is allowed to take at most. This is meant to guard against
-  batches that are catastrophically big and should be set to a small
-  multiple of `GRAPH_STORE_BATCH_TARGET_DURATION`, like 10 times that
+- `GRAPH_STORE_BATCH_TIMEOUT`: How long a batch operation during copying,
+  grafting, or pruning is allowed to take at most. This is meant to guard
+  against batches that are catastrophically big and should be set to a
+  small multiple of `GRAPH_STORE_BATCH_TARGET_DURATION`, like 10 times that
   value, and needs to be at least 2 times that value when set. If this
   timeout is hit, the batch size is reset to 1 so we can be sure that
   batches stay below `GRAPH_STORE_BATCH_TARGET_DURATION` and the smaller
```
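
The "at least 2 times" requirement exists because a timeout close to `GRAPH_STORE_BATCH_TARGET_DURATION` would kill even well-sized batches. A hypothetical check of that constraint (names assumed; graph-node's actual env-var parsing may differ):

```rust
use std::time::Duration;

// Hypothetical validation of the documented constraint, not actual
// graph-node code: when GRAPH_STORE_BATCH_TIMEOUT is set, it must be at
// least twice GRAPH_STORE_BATCH_TARGET_DURATION.
fn validate_batch_timeout(
    target_duration: Duration,
    batch_timeout: Option<Duration>,
) -> Result<(), String> {
    match batch_timeout {
        Some(timeout) if timeout < 2 * target_duration => Err(format!(
            "GRAPH_STORE_BATCH_TIMEOUT ({}s) must be at least twice \
             GRAPH_STORE_BATCH_TARGET_DURATION ({}s)",
            timeout.as_secs(),
            target_duration.as_secs()
        )),
        _ => Ok(()),
    }
}
```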

store/postgres/src/copy.rs (2 additions, 2 deletions)

```diff
@@ -65,7 +65,7 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
 const REPLICATION_SLEEP: Duration = Duration::from_secs(10);
 
 lazy_static! {
-    static ref STATEMENT_TIMEOUT: Option<String> = ENV_VARS
+    pub(crate) static ref BATCH_STATEMENT_TIMEOUT: Option<String> = ENV_VARS
         .store
         .batch_timeout
         .map(|duration| format!("set local statement_timeout={}", duration.as_millis()));
@@ -792,7 +792,7 @@ impl CopyTableWorker {
         }
 
         match conn.transaction(|conn| {
-            if let Some(timeout) = STATEMENT_TIMEOUT.as_ref() {
+            if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() {
                 conn.batch_execute(timeout)?;
             }
             self.table.copy_batch(conn)
```
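
The renamed `BATCH_STATEMENT_TIMEOUT` turns the configured duration into a `set local statement_timeout` statement; `set local` expires with the enclosing transaction, which is why it is issued inside `conn.transaction` before each batch. A minimal sketch of just that mapping (the `ENV_VARS` plumbing elided):

```rust
use std::time::Duration;

// Mirrors the lazy_static above: an optional batch timeout becomes a
// `set local statement_timeout=<ms>` statement that Postgres applies only
// for the duration of the enclosing transaction.
fn batch_statement_timeout(batch_timeout: Option<Duration>) -> Option<String> {
    batch_timeout
        .map(|duration| format!("set local statement_timeout={}", duration.as_millis()))
}

fn main() {
    // With a 1800s (30 minute) batch timeout, the generated statement is:
    assert_eq!(
        batch_statement_timeout(Some(Duration::from_secs(1800))).as_deref(),
        Some("set local statement_timeout=1800000")
    );
}
```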

store/postgres/src/relational/prune.rs (60 additions, 24 deletions)

```diff
@@ -18,7 +18,9 @@ use graph::{
 use itertools::Itertools;
 
 use crate::{
-    catalog, deployment,
+    catalog,
+    copy::BATCH_STATEMENT_TIMEOUT,
+    deployment,
     relational::{Table, VID_COLUMN},
     vid_batcher::{VidBatcher, VidRange},
 };
@@ -105,16 +107,15 @@ impl TablePair {
         tracker.start_copy_final(conn, &self.src, range)?;
 
         while !batcher.finished() {
-            let (_, rows) = batcher.step(|start, end| {
-                conn.transaction(|conn| {
-                    // Page through all rows in `src` in batches of `batch_size`
-                    // and copy the ones that are visible to queries at block
-                    // heights between `earliest_block` and `final_block`, but
-                    // whose block_range does not extend past `final_block`
-                    // since they could still be reverted while we copy.
-                    // The conditions on `block_range` are expressed redundantly
-                    // to make more indexes useable
-                    sql_query(format!(
+            let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
+                // Page through all rows in `src` in batches of `batch_size`
+                // and copy the ones that are visible to queries at block
+                // heights between `earliest_block` and `final_block`, but
+                // whose block_range does not extend past `final_block`
+                // since they could still be reverted while we copy.
+                // The conditions on `block_range` are expressed redundantly
+                // to make more indexes useable
+                sql_query(format!(
                 "/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \
                  insert into {dst}({column_list}) \
                  select {column_list} from {src} \
@@ -128,13 +129,12 @@ impl TablePair {
                     dst = self.dst.qualified_name,
                     batch_size = end - start + 1,
                 ))
-                    .bind::<Integer, _>(earliest_block)
-                    .bind::<Integer, _>(final_block)
-                    .bind::<BigInt, _>(start)
-                    .bind::<BigInt, _>(end)
-                    .execute(conn)
-                    .map_err(StoreError::from)
-                })
+                .bind::<Integer, _>(earliest_block)
+                .bind::<Integer, _>(final_block)
+                .bind::<BigInt, _>(start)
+                .bind::<BigInt, _>(end)
+                .execute(conn)
+                .map_err(StoreError::from)
             })?;
             let rows = rows.unwrap_or(0);
             tracker.finish_batch(conn, &self.src, rows as i64, &batcher)?;
@@ -168,14 +168,13 @@ impl TablePair {
         tracker.start_copy_nonfinal(conn, &self.src, range)?;
 
         while !batcher.finished() {
-            let (_, rows) = batcher.step(|start, end| {
+            let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
                 // Page through all the rows in `src` in batches of
                 // `batch_size` that are visible to queries at block heights
                 // starting right after `final_block`. The conditions on
                 // `block_range` are expressed redundantly to make more
                 // indexes useable
-                conn.transaction(|conn| {
-                    sql_query(format!(
+                sql_query(format!(
                 "/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \
                  insert into {dst}({column_list}) \
                  select {column_list} from {src} \
@@ -192,7 +191,6 @@ impl TablePair {
                 .bind::<BigInt, _>(end)
                 .execute(conn)
                 .map_err(StoreError::from)
-                })
             })?;
             let rows = rows.unwrap_or(0);
 
@@ -460,7 +458,8 @@ impl Layout {
 
             tracker.start_delete(conn, table, range, &batcher)?;
             while !batcher.finished() {
-                let (_, rows) = batcher.step(|start, end| {sql_query(format!(
+                let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
+                    sql_query(format!(
                     "/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \
                      delete from {qname} \
                      where coalesce(upper(block_range), 2147483647) <= $1 \
@@ -471,7 +470,8 @@ impl Layout {
                 .bind::<Integer, _>(req.earliest_block)
                 .bind::<BigInt, _>(start)
                 .bind::<BigInt, _>(end)
-                .execute(conn).map_err(StoreError::from)})?;
+                    .execute(conn).map_err(StoreError::from)
+                })?;
                 let rows = rows.unwrap_or(0);
 
                 tracker.finish_batch(conn, table, -(rows as i64), &batcher)?;
@@ -501,6 +501,42 @@ impl Layout {
     }
 }
 
+/// Perform a step with the `batcher`. If that step takes longer than
+/// `BATCH_STATEMENT_TIMEOUT`, kill the query, reset the batch size of
+/// the batcher to 1, and perform a step with that size, which we assume
+/// takes less than `BATCH_STATEMENT_TIMEOUT`.
+///
+/// Doing this serves as a safeguard against very bad batch size estimations
+/// so that batches never take longer than `BATCH_STATEMENT_TIMEOUT`.
+fn batch_with_timeout<F, T>(
+    conn: &mut PgConnection,
+    batcher: &mut VidBatcher,
+    query: F,
+) -> Result<Option<T>, StoreError>
+where
+    F: Fn(&mut PgConnection, i64, i64) -> Result<T, StoreError>,
+{
+    let res = batcher
+        .step(|start, end| {
+            conn.transaction(|conn| {
+                if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() {
+                    conn.batch_execute(timeout)?;
+                }
+                query(conn, start, end)
+            })
+        })
+        .map(|(_, res)| res);
+
+    if !matches!(res, Err(StoreError::StatementTimeout)) {
+        return res;
+    }
+
+    batcher.set_batch_size(1);
+    batcher
+        .step(|start, end| conn.transaction(|conn| query(conn, start, end)))
+        .map(|(_, res)| res)
+}
+
 mod status {
     use std::sync::Arc;
 
```
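The fallback path hinges on the killed query surfacing as `StoreError::StatementTimeout`. Postgres reports a hit `statement_timeout` as SQLSTATE 57014 ("query_canceled") with the message "canceling statement due to statement timeout"; a hedged sketch of how such a mapping could look with diesel (not the actual graph-node `StoreError` conversion):

```rust
use diesel::result::Error as DieselError;

// Hypothetical mapping, not graph-node's real StoreError conversion:
// detect a Postgres statement_timeout cancellation in a diesel error by
// inspecting the database error message.
fn is_statement_timeout(err: &DieselError) -> bool {
    match err {
        DieselError::DatabaseError(_, info) => info
            .message()
            .contains("canceling statement due to statement timeout"),
        _ => false,
    }
}
```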
