Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
688 changes: 566 additions & 122 deletions crates/corro-agent/src/agent/handlers.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/reaper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ mod tests {

async fn setup_test_db(path: std::path::PathBuf) -> eyre::Result<SplitPool> {
let write_sema = Arc::new(Semaphore::new(1));
let pool = SplitPool::create(path, write_sema).await?;
let pool = SplitPool::create(path, write_sema, -1048576).await?;

// Create test table
let conn = pool.write_priority().await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

let write_sema = Arc::new(Semaphore::new(1));

let pool = SplitPool::create(&conf.db.path, write_sema.clone()).await?;
let pool = SplitPool::create(&conf.db.path, write_sema.clone(), conf.db.cache_size_kib).await?;

let clock = Arc::new(
uhlc::HLCBuilder::default()
Expand Down
32 changes: 29 additions & 3 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,10 @@ pub async fn process_multiple_changes(
warn!("process_multiple_changes: removing duplicates took too long - {elapsed:?}");
}

// Deduplicate
let phase1_t = start.elapsed();
let t2 = Instant::now();

let mut conn = agent.pool().write_normal().await?;

let changesets = block_in_place(|| {
Expand Down Expand Up @@ -1094,18 +1098,40 @@ pub async fn process_multiple_changes(
Ok::<_, ChangeError>(changesets)
})?;

// block_in place db + bookie stuff
let phase2_t = t2.elapsed();
let t3 = Instant::now();

let mut change_chunk_size = 0;

for (_actor_id, changeset, db_version, _src) in changesets {
for (_actor_id, changeset, _db_version, _src) in &changesets {
change_chunk_size += changeset.len();
match_changes(agent.subs_manager(), &changeset, db_version);
match_changes(agent.updates_manager(), &changeset, db_version);
}
tokio::spawn(async move {
for (_actor_id, changeset, db_version, _src) in changesets {
match_changes(agent.subs_manager(), &changeset, db_version);
match_changes(agent.updates_manager(), &changeset, db_version);
}
});

histogram!("corro.agent.changes.processing.time.seconds", "source" => "remote")
.record(start.elapsed());
histogram!("corro.agent.changes.processing.chunk_size").record(change_chunk_size as f64);

// Broadcast changes
let phase3_t = t3.elapsed();

let total = phase1_t + phase2_t + phase3_t;
info!(
"preprocess: {} ({} of total), db_stuff: {} ({} of total), match_changes: {} ({} of total)",
phase1_t.as_micros(),
phase1_t.as_secs_f64() / total.as_secs_f64(),
phase2_t.as_micros(),
phase2_t.as_secs_f64() / total.as_secs_f64(),
phase3_t.as_micros(),
phase3_t.as_secs_f64() / total.as_secs_f64()
);

Ok(())
}

Expand Down
16 changes: 13 additions & 3 deletions crates/corro-types/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ pub struct SplitPool(Arc<SplitPoolInner>);
struct SplitPoolInner {
path: PathBuf,
write_sema: Arc<Semaphore>,
cache_size_kib: i64,

read: SqlitePool,
write: SqlitePool,
Expand Down Expand Up @@ -511,10 +512,11 @@ impl SplitPool {
pub async fn create<P: AsRef<Path>>(
path: P,
write_sema: Arc<Semaphore>,
cache_size_kib: i64,
) -> Result<Self, SplitPoolCreateError> {
let rw_pool = sqlite_pool::Config::new(path.as_ref())
.max_size(1)
.create_pool_transform(rusqlite_to_crsqlite_write)?;
.create_pool_transform(move |conn| rusqlite_to_crsqlite_write(conn, cache_size_kib))?;

debug!("built RW pool");

Expand All @@ -527,12 +529,19 @@ impl SplitPool {
Ok(Self::new(
path.as_ref().to_owned(),
write_sema,
cache_size_kib,
ro_pool,
rw_pool,
))
}

fn new(path: PathBuf, write_sema: Arc<Semaphore>, read: SqlitePool, write: SqlitePool) -> Self {
fn new(
path: PathBuf,
write_sema: Arc<Semaphore>,
cache_size_kib: i64,
read: SqlitePool,
write: SqlitePool,
) -> Self {
let (priority_tx, mut priority_rx) = bounded(256, "priority");
let (normal_tx, mut normal_rx) = bounded(512, "normal");
let (low_tx, mut low_rx) = bounded(1024, "low");
Expand All @@ -554,6 +563,7 @@ impl SplitPool {
Self(Arc::new(SplitPoolInner {
path,
write_sema,
cache_size_kib,
read,
write,
priority_tx,
Expand Down Expand Up @@ -603,7 +613,7 @@ impl SplitPool {
#[tracing::instrument(skip(self), level = "debug")]
pub fn client_dedicated(&self) -> rusqlite::Result<CrConn> {
let conn = rusqlite::Connection::open(&self.0.path)?;
let cr_conn = rusqlite_to_crsqlite_write(conn)?;
let cr_conn = rusqlite_to_crsqlite_write(conn, self.0.cache_size_kib)?;
trace_heavy_queries(cr_conn.conn())?;
Ok(cr_conn)
}
Expand Down
59 changes: 49 additions & 10 deletions crates/corro-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,26 @@ pub const DEFAULT_MAX_SYNC_BACKOFF: u32 = 2;
#[cfg(not(test))]
pub const DEFAULT_MAX_SYNC_BACKOFF: u32 = 15;

const fn default_apply_queue() -> usize {
const fn default_apply_batch_min() -> usize {
100
}

/// Default step base for apply-queue batch sizing (used as the base of the
/// power-of-two batch-size formula configured via `apply_queue_step_base`).
const fn default_apply_batch_step() -> usize { 500 }

/// Default upper bound on how many changes are applied in a single
/// transaction (`apply_queue_max_batch_size`).
const fn default_apply_batch_max() -> usize { 16_000 }

/// Default queue-fill ratio (0.0–1.0) at which a batch is spawned
/// immediately instead of waiting out the apply-queue timeout.
const fn default_batch_threshold_ratio() -> f64 { 0.9 }

/// Default SQLite `PRAGMA cache_size` value. A negative value tells SQLite
/// to interpret the size in KiB, so -1_048_576 KiB == 1 GiB of page cache.
const fn default_cache_size_kib() -> i64 { -1_048_576 }

/// Default reaper interval: 3600 — presumably seconds (i.e. one hour);
/// NOTE(review): unit not visible here, confirm at the use site.
const fn default_reaper_interval() -> usize { 3600 }
Expand Down Expand Up @@ -129,6 +145,11 @@ pub struct DbConfig {
pub schema_paths: Vec<Utf8PathBuf>,
#[serde(default)]
pub subscriptions_path: Option<Utf8PathBuf>,
/// SQLite page cache size in KiB for writes (negative value).
/// Default: -1048576 (1 GiB). Larger values improve write performance but use more RAM.
/// WARNING: Setting this too low (<100 MiB) can severely degrade performance.
#[serde(default = "default_cache_size_kib")]
pub cache_size_kib: i64,
}

impl DbConfig {
Expand Down Expand Up @@ -227,20 +248,34 @@ pub struct PerfConfig {
pub bcast_channel_len: usize,
#[serde(default = "default_small_channel")]
pub foca_channel_len: usize,
#[serde(default = "default_apply_timeout")]
pub apply_queue_timeout: usize,
#[serde(default = "default_apply_queue")]
pub apply_queue_len: usize,
#[serde(default = "default_wal_threshold")]
pub wal_threshold_mb: usize,
#[serde(default = "default_processing_queue")]
pub processing_queue_len: usize,
#[serde(default = "default_sql_tx_timeout")]
pub sql_tx_timeout: usize,
#[serde(default = "default_min_sync_backoff")]
pub min_sync_backoff: u32,
#[serde(default = "default_max_sync_backoff")]
pub max_sync_backoff: u32,
// How many unapplied changesets corrosion will buffer before starting to drop them
#[serde(default = "default_processing_queue")]
pub processing_queue_len: usize,
// How many ms corrosion will wait before proceeding to apply a batch of changes
// We wait either for apply_queue_timeout or until at least apply_queue_min_batch_size changes accumulate
#[serde(default = "default_apply_timeout")]
pub apply_queue_timeout: usize,
// Minimum amount of changes corrosion will try to apply at once in the same transaction
#[serde(default = "default_apply_batch_min")]
pub apply_queue_min_batch_size: usize,
// batch_size = clamp(min_batch_size, step_base * 2 ** floor(log2(x/step_base)), max_batch_size)
#[serde(default = "default_apply_batch_step")]
pub apply_queue_step_base: usize,
// Maximum amount of changes corrosion will try to apply at once in the same transaction
#[serde(default = "default_apply_batch_max")]
pub apply_queue_max_batch_size: usize,
// Threshold ratio (0.0-1.0) for immediate batch spawning when queue reaches this fraction of batch size
// It's used to decide whether to wait for more changes for apply_queue_timeout ms or spawn a batch immediately
#[serde(default = "default_batch_threshold_ratio")]
pub apply_queue_batch_threshold_ratio: f64,
}

impl Default for PerfConfig {
Expand All @@ -255,13 +290,16 @@ impl Default for PerfConfig {
clearbuf_channel_len: default_mid_channel(),
bcast_channel_len: default_mid_channel(),
foca_channel_len: default_small_channel(),
apply_queue_timeout: default_apply_timeout(),
apply_queue_len: default_apply_queue(),
wal_threshold_mb: default_wal_threshold(),
processing_queue_len: default_processing_queue(),
sql_tx_timeout: default_sql_tx_timeout(),
min_sync_backoff: default_min_sync_backoff(),
max_sync_backoff: default_max_sync_backoff(),
processing_queue_len: default_processing_queue(),
apply_queue_timeout: default_apply_timeout(),
apply_queue_min_batch_size: default_apply_batch_min(),
apply_queue_step_base: default_apply_batch_step(),
apply_queue_max_batch_size: default_apply_batch_max(),
apply_queue_batch_threshold_ratio: default_batch_threshold_ratio(),
}
}
}
Expand Down Expand Up @@ -446,6 +484,7 @@ impl ConfigBuilder {
path: db_path,
schema_paths: self.schema_paths,
subscriptions_path: None,
cache_size_kib: default_cache_size_kib(),
},
api: ApiConfig {
bind_addr: self.api_addr,
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2524,7 +2524,7 @@ mod tests {
let subscriptions_path: Utf8PathBuf =
tmpdir.path().join("subs").display().to_string().into();

let pool = SplitPool::create(db_path, Arc::new(Semaphore::new(1))).await?;
let pool = SplitPool::create(db_path, Arc::new(Semaphore::new(1)), -1048576).await?;
let clock = Arc::new(uhlc::HLC::default());

{
Expand Down Expand Up @@ -2644,7 +2644,7 @@ mod tests {
let subscriptions_path: Utf8PathBuf =
tmpdir.path().join("subs").display().to_string().into();

let pool = SplitPool::create(&db_path, Arc::new(Semaphore::new(1)))
let pool = SplitPool::create(&db_path, Arc::new(Semaphore::new(1)), -1048576)
.await
.unwrap();
let mut conn = pool.write_priority().await.unwrap();
Expand Down
14 changes: 12 additions & 2 deletions crates/corro-types/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,19 @@ static CRSQL_EXT_DIR: Lazy<TempDir> = Lazy::new(|| {
dir
});

pub fn rusqlite_to_crsqlite_write(conn: rusqlite::Connection) -> rusqlite::Result<CrConn> {
pub fn rusqlite_to_crsqlite_write(
conn: rusqlite::Connection,
cache_size_kib: i64,
) -> rusqlite::Result<CrConn> {
let conn = rusqlite_to_crsqlite(conn)?;
conn.execute_batch("PRAGMA cache_size = -32000;")?;
conn.execute_batch(&format!(
"
PRAGMA cache_size = {};
PRAGMA temp_store = MEMORY;
PRAGMA cache_spill = FALSE;
",
cache_size_kib
))?;

Ok(conn)
}
Expand Down
Loading