Skip to content

Commit 63a8989

Browse files
authored
Merge pull request #431 from superfly/gorbak/burstable-change-processing
Burstable change processing
2 parents f06b591 + 1b5afd5 commit 63a8989

File tree

8 files changed

+648
-144
lines changed

8 files changed

+648
-144
lines changed

crates/corro-agent/src/agent/handlers.rs

Lines changed: 563 additions & 122 deletions
Large diffs are not rendered by default.

crates/corro-agent/src/agent/reaper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ mod tests {
218218

219219
async fn setup_test_db(path: std::path::PathBuf) -> eyre::Result<SplitPool> {
220220
let write_sema = Arc::new(Semaphore::new(1));
221-
let pool = SplitPool::create(path, write_sema).await?;
221+
let pool = SplitPool::create(path, write_sema, -1048576).await?;
222222

223223
// Create test table
224224
let conn = pool.write_priority().await?;

crates/corro-agent/src/agent/setup.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
9393

9494
let write_sema = Arc::new(Semaphore::new(1));
9595

96-
let pool = SplitPool::create(&conf.db.path, write_sema.clone()).await?;
96+
let pool = SplitPool::create(&conf.db.path, write_sema.clone(), conf.db.cache_size_kib).await?;
9797

9898
let clock = Arc::new(
9999
uhlc::HLCBuilder::default()

crates/corro-agent/src/agent/util.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,11 +1096,15 @@ pub async fn process_multiple_changes(
10961096

10971097
let mut change_chunk_size = 0;
10981098

1099-
for (_actor_id, changeset, db_version, _src) in changesets {
1099+
for (_actor_id, changeset, _db_version, _src) in &changesets {
11001100
change_chunk_size += changeset.len();
1101-
match_changes(agent.subs_manager(), &changeset, db_version);
1102-
match_changes(agent.updates_manager(), &changeset, db_version);
11031101
}
1102+
tokio::spawn(async move {
1103+
for (_actor_id, changeset, db_version, _src) in changesets {
1104+
match_changes(agent.subs_manager(), &changeset, db_version);
1105+
match_changes(agent.updates_manager(), &changeset, db_version);
1106+
}
1107+
});
11041108

11051109
histogram!("corro.agent.changes.processing.time.seconds", "source" => "remote")
11061110
.record(start.elapsed());

crates/corro-types/src/agent.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,7 @@ pub struct SplitPool(Arc<SplitPoolInner>);
458458
struct SplitPoolInner {
459459
path: PathBuf,
460460
write_sema: Arc<Semaphore>,
461+
cache_size_kib: i64,
461462

462463
read: SqlitePool,
463464
write: SqlitePool,
@@ -511,10 +512,11 @@ impl SplitPool {
511512
pub async fn create<P: AsRef<Path>>(
512513
path: P,
513514
write_sema: Arc<Semaphore>,
515+
cache_size_kib: i64,
514516
) -> Result<Self, SplitPoolCreateError> {
515517
let rw_pool = sqlite_pool::Config::new(path.as_ref())
516518
.max_size(1)
517-
.create_pool_transform(rusqlite_to_crsqlite_write)?;
519+
.create_pool_transform(move |conn| rusqlite_to_crsqlite_write(conn, cache_size_kib))?;
518520

519521
debug!("built RW pool");
520522

@@ -527,12 +529,19 @@ impl SplitPool {
527529
Ok(Self::new(
528530
path.as_ref().to_owned(),
529531
write_sema,
532+
cache_size_kib,
530533
ro_pool,
531534
rw_pool,
532535
))
533536
}
534537

535-
fn new(path: PathBuf, write_sema: Arc<Semaphore>, read: SqlitePool, write: SqlitePool) -> Self {
538+
fn new(
539+
path: PathBuf,
540+
write_sema: Arc<Semaphore>,
541+
cache_size_kib: i64,
542+
read: SqlitePool,
543+
write: SqlitePool,
544+
) -> Self {
536545
let (priority_tx, mut priority_rx) = bounded(256, "priority");
537546
let (normal_tx, mut normal_rx) = bounded(512, "normal");
538547
let (low_tx, mut low_rx) = bounded(1024, "low");
@@ -554,6 +563,7 @@ impl SplitPool {
554563
Self(Arc::new(SplitPoolInner {
555564
path,
556565
write_sema,
566+
cache_size_kib,
557567
read,
558568
write,
559569
priority_tx,
@@ -603,7 +613,7 @@ impl SplitPool {
603613
#[tracing::instrument(skip(self), level = "debug")]
604614
pub fn client_dedicated(&self) -> rusqlite::Result<CrConn> {
605615
let conn = rusqlite::Connection::open(&self.0.path)?;
606-
let cr_conn = rusqlite_to_crsqlite_write(conn)?;
616+
let cr_conn = rusqlite_to_crsqlite_write(conn, self.0.cache_size_kib)?;
607617
trace_heavy_queries(cr_conn.conn())?;
608618
Ok(cr_conn)
609619
}

crates/corro-types/src/config.rs

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,26 @@ pub const DEFAULT_MAX_SYNC_BACKOFF: u32 = 2;
1414
#[cfg(not(test))]
1515
pub const DEFAULT_MAX_SYNC_BACKOFF: u32 = 15;
1616

17-
const fn default_apply_queue() -> usize {
17+
const fn default_apply_batch_min() -> usize {
1818
100
1919
}
2020

21+
const fn default_apply_batch_step() -> usize {
22+
500
23+
}
24+
25+
const fn default_apply_batch_max() -> usize {
26+
16_000
27+
}
28+
29+
const fn default_batch_threshold_ratio() -> f64 {
30+
0.9
31+
}
32+
33+
const fn default_cache_size_kib() -> i64 {
34+
-1048576 // 1 GiB; SQLite reads a negative cache_size as KiB rather than a page count
35+
}
36+
2137
const fn default_reaper_interval() -> usize {
2238
3600
2339
}
@@ -129,6 +145,11 @@ pub struct DbConfig {
129145
pub schema_paths: Vec<Utf8PathBuf>,
130146
#[serde(default)]
131147
pub subscriptions_path: Option<Utf8PathBuf>,
148+
/// SQLite page cache size for write connections, given as a negative number of KiB (SQLite reads a positive value as a page count instead).
149+
/// Default: -1048576 (1 GiB). A larger cache (more negative value) improves write performance but uses more RAM.
150+
/// WARNING: Making the cache too small (under 100 MB) can severely degrade performance.
151+
#[serde(default = "default_cache_size_kib")]
152+
pub cache_size_kib: i64,
132153
}
133154

134155
impl DbConfig {
@@ -227,20 +248,34 @@ pub struct PerfConfig {
227248
pub bcast_channel_len: usize,
228249
#[serde(default = "default_small_channel")]
229250
pub foca_channel_len: usize,
230-
#[serde(default = "default_apply_timeout")]
231-
pub apply_queue_timeout: usize,
232-
#[serde(default = "default_apply_queue")]
233-
pub apply_queue_len: usize,
234251
#[serde(default = "default_wal_threshold")]
235252
pub wal_threshold_mb: usize,
236-
#[serde(default = "default_processing_queue")]
237-
pub processing_queue_len: usize,
238253
#[serde(default = "default_sql_tx_timeout")]
239254
pub sql_tx_timeout: usize,
240255
#[serde(default = "default_min_sync_backoff")]
241256
pub min_sync_backoff: u32,
242257
#[serde(default = "default_max_sync_backoff")]
243258
pub max_sync_backoff: u32,
259+
// How many unapplied changesets corrosion will buffer before starting to drop them
260+
#[serde(default = "default_processing_queue")]
261+
pub processing_queue_len: usize,
262+
// How many ms corrosion will wait before proceeding to apply a batch of changes
263+
// We wait either for apply_queue_timeout or until at least apply_queue_min_batch_size changes accumulate
264+
#[serde(default = "default_apply_timeout")]
265+
pub apply_queue_timeout: usize,
266+
// Minimum number of changes corrosion will try to apply at once in the same transaction
267+
#[serde(default = "default_apply_batch_min")]
268+
pub apply_queue_min_batch_size: usize,
269+
// batch_size = clamp(min_batch_size, step_base * 2 ** floor(log2(x/step_base)), max_batch_size)
270+
#[serde(default = "default_apply_batch_step")]
271+
pub apply_queue_step_base: usize,
272+
// Maximum number of changes corrosion will try to apply at once in the same transaction
273+
#[serde(default = "default_apply_batch_max")]
274+
pub apply_queue_max_batch_size: usize,
275+
// Threshold ratio (0.0-1.0) for immediate batch spawning when queue reaches this fraction of batch size
276+
// It's used to decide whether to wait for more changes for apply_queue_timeout ms or spawn a batch immediately
277+
#[serde(default = "default_batch_threshold_ratio")]
278+
pub apply_queue_batch_threshold_ratio: f64,
244279
}
245280

246281
impl Default for PerfConfig {
@@ -255,13 +290,16 @@ impl Default for PerfConfig {
255290
clearbuf_channel_len: default_mid_channel(),
256291
bcast_channel_len: default_mid_channel(),
257292
foca_channel_len: default_small_channel(),
258-
apply_queue_timeout: default_apply_timeout(),
259-
apply_queue_len: default_apply_queue(),
260293
wal_threshold_mb: default_wal_threshold(),
261-
processing_queue_len: default_processing_queue(),
262294
sql_tx_timeout: default_sql_tx_timeout(),
263295
min_sync_backoff: default_min_sync_backoff(),
264296
max_sync_backoff: default_max_sync_backoff(),
297+
processing_queue_len: default_processing_queue(),
298+
apply_queue_timeout: default_apply_timeout(),
299+
apply_queue_min_batch_size: default_apply_batch_min(),
300+
apply_queue_step_base: default_apply_batch_step(),
301+
apply_queue_max_batch_size: default_apply_batch_max(),
302+
apply_queue_batch_threshold_ratio: default_batch_threshold_ratio(),
265303
}
266304
}
267305
}
@@ -446,6 +484,7 @@ impl ConfigBuilder {
446484
path: db_path,
447485
schema_paths: self.schema_paths,
448486
subscriptions_path: None,
487+
cache_size_kib: default_cache_size_kib(),
449488
},
450489
api: ApiConfig {
451490
bind_addr: self.api_addr,

crates/corro-types/src/pubsub.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2524,7 +2524,7 @@ mod tests {
25242524
let subscriptions_path: Utf8PathBuf =
25252525
tmpdir.path().join("subs").display().to_string().into();
25262526

2527-
let pool = SplitPool::create(db_path, Arc::new(Semaphore::new(1))).await?;
2527+
let pool = SplitPool::create(db_path, Arc::new(Semaphore::new(1)), -1048576).await?;
25282528
let clock = Arc::new(uhlc::HLC::default());
25292529

25302530
{
@@ -2644,7 +2644,7 @@ mod tests {
26442644
let subscriptions_path: Utf8PathBuf =
26452645
tmpdir.path().join("subs").display().to_string().into();
26462646

2647-
let pool = SplitPool::create(&db_path, Arc::new(Semaphore::new(1)))
2647+
let pool = SplitPool::create(&db_path, Arc::new(Semaphore::new(1)), -1048576)
26482648
.await
26492649
.unwrap();
26502650
let mut conn = pool.write_priority().await.unwrap();

crates/corro-types/src/sqlite.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,19 @@ static CRSQL_EXT_DIR: Lazy<TempDir> = Lazy::new(|| {
181181
dir
182182
});
183183

184-
pub fn rusqlite_to_crsqlite_write(conn: rusqlite::Connection) -> rusqlite::Result<CrConn> {
184+
pub fn rusqlite_to_crsqlite_write(
185+
conn: rusqlite::Connection,
186+
cache_size_kib: i64,
187+
) -> rusqlite::Result<CrConn> {
185188
let conn = rusqlite_to_crsqlite(conn)?;
186-
conn.execute_batch("PRAGMA cache_size = -32000;")?;
189+
conn.execute_batch(&format!(
190+
"
191+
PRAGMA cache_size = {};
192+
PRAGMA temp_store = MEMORY;
193+
PRAGMA cache_spill = FALSE;
194+
",
195+
cache_size_kib
196+
))?;
187197

188198
Ok(conn)
189199
}

0 commit comments

Comments
 (0)