
Commit f2d5e44

store: Move AdaptiveBatchSize to its own module
1 parent: a0860d4

File tree

store/postgres/src/copy.rs
store/postgres/src/lib.rs
store/postgres/src/relational/prune.rs
store/postgres/src/vid_batcher.rs

4 files changed: +69 −61 lines


store/postgres/src/copy.rs

Lines changed: 3 additions & 58 deletions
```diff
@@ -19,21 +19,17 @@ use std::{
 };
 
 use diesel::{
-    deserialize::FromSql,
     dsl::sql,
     insert_into,
-    pg::Pg,
     r2d2::{ConnectionManager, PooledConnection},
-    select,
-    serialize::{Output, ToSql},
-    sql_query,
+    select, sql_query,
     sql_types::{BigInt, Integer},
     update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl,
     RunQueryDsl,
 };
 use graph::{
     constraint_violation,
-    prelude::{info, o, warn, BlockNumber, BlockPtr, Logger, StoreError, ENV_VARS},
+    prelude::{info, o, warn, BlockNumber, BlockPtr, Logger, StoreError},
     schema::EntityType,
 };
 use itertools::Itertools;
@@ -43,17 +39,11 @@ use crate::{
     dynds::DataSourcesTable,
     primary::{DeploymentId, Site},
     relational::index::IndexList,
+    vid_batcher::AdaptiveBatchSize,
 };
 use crate::{connection_pool::ConnectionPool, relational::Layout};
 use crate::{relational::Table, relational_queries as rq};
 
-/// The initial batch size for tables that do not have an array column
-const INITIAL_BATCH_SIZE: i64 = 10_000;
-/// The initial batch size for tables that do have an array column; those
-/// arrays can be large and large arrays will slow down copying a lot. We
-/// therefore tread lightly in that case
-const INITIAL_BATCH_SIZE_LIST: i64 = 100;
-
 const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);
 
 /// If replicas are lagging by more than this, the copying code will pause
@@ -299,51 +289,6 @@ pub(crate) fn source(
         .map_err(StoreError::from)
 }
 
-/// Track the desired size of a batch in such a way that doing the next
-/// batch gets close to TARGET_DURATION for the time it takes to copy one
-/// batch, but don't step up the size by more than 2x at once
-#[derive(Debug, Queryable)]
-pub(crate) struct AdaptiveBatchSize {
-    pub size: i64,
-}
-
-impl AdaptiveBatchSize {
-    pub fn new(table: &Table) -> Self {
-        let size = if table.columns.iter().any(|col| col.is_list()) {
-            INITIAL_BATCH_SIZE_LIST
-        } else {
-            INITIAL_BATCH_SIZE
-        };
-
-        Self { size }
-    }
-
-    // adjust batch size by trying to extrapolate in such a way that we
-    // get close to TARGET_DURATION for the time it takes to copy one
-    // batch, but don't step up batch_size by more than 2x at once
-    pub fn adapt(&mut self, duration: Duration) {
-        // Avoid division by zero
-        let duration = duration.as_millis().max(1);
-        let new_batch_size = self.size as f64
-            * ENV_VARS.store.batch_target_duration.as_millis() as f64
-            / duration as f64;
-        self.size = (2 * self.size).min(new_batch_size.round() as i64);
-    }
-}
-
-impl ToSql<BigInt, Pg> for AdaptiveBatchSize {
-    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result {
-        <i64 as ToSql<BigInt, Pg>>::to_sql(&self.size, out)
-    }
-}
-
-impl FromSql<BigInt, Pg> for AdaptiveBatchSize {
-    fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
-        let size = <i64 as FromSql<BigInt, Pg>>::from_sql(bytes)?;
-        Ok(AdaptiveBatchSize { size })
-    }
-}
-
 /// A helper to copy entities from one table to another in batches that are
 /// small enough to not interfere with the rest of the operations happening
 /// in the database. The `src` and `dst` table must have the same structure
```

store/postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -36,6 +36,7 @@ mod store;
 mod store_events;
 mod subgraph_store;
 pub mod transaction_receipt;
+mod vid_batcher;
 mod writable;
 
 pub mod graphman;
```

store/postgres/src/relational/prune.rs

Lines changed: 2 additions & 3 deletions
```diff
@@ -18,10 +18,9 @@ use graph::{
 use itertools::Itertools;
 
 use crate::{
-    catalog,
-    copy::AdaptiveBatchSize,
-    deployment,
+    catalog, deployment,
     relational::{Table, VID_COLUMN},
+    vid_batcher::AdaptiveBatchSize,
 };
 
 use super::{Catalog, Layout, Namespace};
```

store/postgres/src/vid_batcher.rs

Lines changed: 63 additions & 0 deletions
```diff
@@ -0,0 +1,63 @@
+use std::time::Duration;
+
+use diesel::{
+    deserialize::FromSql,
+    pg::Pg,
+    serialize::{Output, ToSql},
+    sql_types::BigInt,
+};
+use graph::env::ENV_VARS;
+
+use crate::relational::Table;
+
+/// The initial batch size for tables that do not have an array column
+const INITIAL_BATCH_SIZE: i64 = 10_000;
+/// The initial batch size for tables that do have an array column; those
+/// arrays can be large and large arrays will slow down copying a lot. We
+/// therefore tread lightly in that case
+const INITIAL_BATCH_SIZE_LIST: i64 = 100;
+
+/// Track the desired size of a batch in such a way that doing the next
+/// batch gets close to TARGET_DURATION for the time it takes to copy one
+/// batch, but don't step up the size by more than 2x at once
+#[derive(Debug, Queryable)]
+pub(crate) struct AdaptiveBatchSize {
+    pub size: i64,
+}
+
+impl AdaptiveBatchSize {
+    pub fn new(table: &Table) -> Self {
+        let size = if table.columns.iter().any(|col| col.is_list()) {
+            INITIAL_BATCH_SIZE_LIST
+        } else {
+            INITIAL_BATCH_SIZE
+        };
+
+        Self { size }
+    }
+
+    // adjust batch size by trying to extrapolate in such a way that we
+    // get close to TARGET_DURATION for the time it takes to copy one
+    // batch, but don't step up batch_size by more than 2x at once
+    pub fn adapt(&mut self, duration: Duration) {
+        // Avoid division by zero
+        let duration = duration.as_millis().max(1);
+        let new_batch_size = self.size as f64
+            * ENV_VARS.store.batch_target_duration.as_millis() as f64
+            / duration as f64;
+        self.size = (2 * self.size).min(new_batch_size.round() as i64);
+    }
+}
+
+impl ToSql<BigInt, Pg> for AdaptiveBatchSize {
+    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result {
+        <i64 as ToSql<BigInt, Pg>>::to_sql(&self.size, out)
+    }
+}
+
+impl FromSql<BigInt, Pg> for AdaptiveBatchSize {
+    fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
+        let size = <i64 as FromSql<BigInt, Pg>>::from_sql(bytes)?;
+        Ok(AdaptiveBatchSize { size })
+    }
+}
```
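To make the adaptation rule concrete: `adapt` extrapolates the next batch size as `size * target / duration` and then caps the step at twice the current size. Below is a minimal standalone sketch of that arithmetic; the 180-second target is an assumption for illustration, since the real value is read from `ENV_VARS.store.batch_target_duration` at runtime.

```rust
use std::time::Duration;

/// Assumed batch target duration for this sketch; the actual value
/// comes from ENV_VARS.store.batch_target_duration at runtime.
const TARGET: Duration = Duration::from_secs(180);

/// The same arithmetic as AdaptiveBatchSize::adapt, as a pure function.
fn next_size(size: i64, last_batch: Duration) -> i64 {
    // Avoid division by zero for very fast batches
    let millis = last_batch.as_millis().max(1);
    // Extrapolate linearly toward the target duration ...
    let extrapolated = size as f64 * TARGET.as_millis() as f64 / millis as f64;
    // ... but never more than double the batch size in one step
    (2 * size).min(extrapolated.round() as i64)
}

fn main() {
    // A 10_000-row batch that finished in 60s extrapolates to 30_000 rows,
    // but the 2x cap limits the next batch to 20_000.
    assert_eq!(next_size(10_000, Duration::from_secs(60)), 20_000);
    // A batch that overshot the target shrinks proportionally in one step:
    // 90_000 * 180 / 360 = 45_000.
    assert_eq!(next_size(90_000, Duration::from_secs(360)), 45_000);
}
```

So the size can at most double after a fast batch, while a slow batch shrinks it proportionally in a single step.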
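The `ToSql`/`FromSql` impls let an `AdaptiveBatchSize` be bound and read directly as a Postgres `bigint`, rather than unwrapping the inner `i64` at every call site. A hedged sketch of such a binding follows; the table names and query text are hypothetical, not from this commit.

```rust
use diesel::{sql_query, sql_types::BigInt, PgConnection, RunQueryDsl};

use crate::vid_batcher::AdaptiveBatchSize; // crate-internal, per this commit

/// Hypothetical helper: copy at most one batch of rows. The `dst`/`src`
/// tables and the query are illustrative only.
fn copy_one_batch(
    conn: &mut PgConnection,
    batch_size: &AdaptiveBatchSize,
) -> diesel::QueryResult<usize> {
    // The ToSql<BigInt, Pg> impl lets the struct itself be bound as $1.
    sql_query("insert into dst select * from src order by vid limit $1")
        .bind::<BigInt, _>(batch_size)
        .execute(conn)
}
```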
