Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 59 additions & 1 deletion core/configs/src/server_config/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,69 @@ use std::str::FromStr;

use configs::ConfigEnv;

#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)]
/// Default capacity of the per-shard inter-shard inbox channel.
///
/// Sized comfortably above the consensus working set, which is roughly
/// `PIPELINE_PREPARE_QUEUE_MAX (= 8) * replica_count * directions`
/// frames in flight per shard, without allowing a runaway producer to
/// eat unbounded memory. Tunable via `[system.sharding] inbox_capacity`
/// in TOML.
///
/// The capacity must also absorb the worst-case cross-shard client
/// Reply burst. Unlike consensus frames, client Replies have no VSR
/// retransmit path: a Reply lost on a full inbox is gone and the client
/// times out. A reasonable lower bound is
/// `max_inflight_client_requests / num_shards` (assuming requests are
/// distributed evenly across owning shards) plus the consensus
/// headroom above.
///
/// Consensus frames and client-reply forwards share this one channel,
/// so the two headrooms are not independent: a consensus burst or
/// retransmit storm can fill the inbox with consensus frames exactly
/// when a client Reply needs the space. A single `inbox_capacity` knob
/// cannot isolate the two frame classes - size it for the sum of both
/// worst cases occurring together. Watch the drop-site `tracing` logs
/// (and, once a per-shard exporter lands, the `frame_drops_total`
/// `{variant="forward_client_send"}` counter) to detect when the bound
/// is too low in production.
pub const DEFAULT_INBOX_CAPACITY: usize = 1024;

/// Maximum permitted per-shard inbox depth.
///
/// The channel is allocated up-front per shard, so a runaway value here
/// OOMs the process at boot. `1 << 20` (~1M frames) is several orders of
/// magnitude above any realistic backpressure target and still fits
/// comfortably in process address space.
pub const INBOX_CAPACITY_MAX: usize = 1 << 20;

// Compile-time guard: `ShardingConfig` validation rejects an
// `inbox_capacity` of 0 or above `INBOX_CAPACITY_MAX`, so the shipped
// default must stay inside that range. Failing the build here is far
// cheaper than a default configuration that refuses to boot.
const _: () = assert!(
    DEFAULT_INBOX_CAPACITY > 0 && DEFAULT_INBOX_CAPACITY <= INBOX_CAPACITY_MAX,
    "DEFAULT_INBOX_CAPACITY must lie within 1..=INBOX_CAPACITY_MAX"
);

/// Serde default provider for `ShardingConfig::inbox_capacity`.
///
/// `#[serde(default = "...")]` takes a function path rather than a
/// constant, hence this thin wrapper around [`DEFAULT_INBOX_CAPACITY`].
const fn default_inbox_capacity() -> usize {
    DEFAULT_INBOX_CAPACITY
}

/// Sharding section of the server configuration: how shards are mapped
/// onto CPUs and how deep each shard's inter-shard inbox may grow.
#[derive(Debug, Deserialize, Serialize, ConfigEnv)]
pub struct ShardingConfig {
    /// CPU allocation policy for shards; falls back to
    /// `CpuAllocation::default()` when the key is absent.
    #[serde(default)]
    #[config_env(leaf)]
    pub cpu_allocation: CpuAllocation,

    /// Capacity of the bounded per-shard inter-shard inbox channel.
    ///
    /// What happens when the inbox is full depends on the frame class:
    /// - consensus frames are dropped and later recovered by VSR
    ///   retransmit;
    /// - cross-shard client Reply frames are dropped for good - there is
    ///   no in-protocol retransmit, so the client never sees the reply.
    ///
    /// Both classes travel over this single channel, which means a
    /// consensus burst can starve client-reply forwards. Size the value
    /// for the worst-case sum of the consensus working set and the peak
    /// client-reply fan-out per shard happening at the same time; the
    /// rationale is spelled out on `DEFAULT_INBOX_CAPACITY`.
    ///
    /// Consumed by `core/server-ng`; the legacy server keeps its own
    /// hard-coded inbox sizing.
    #[serde(default = "default_inbox_capacity")]
    pub inbox_capacity: usize,
}

impl Default for ShardingConfig {
    /// Builds the configuration used when no sharding section is
    /// supplied: the default CPU allocation and an inbox of
    /// `DEFAULT_INBOX_CAPACITY` frames.
    fn default() -> Self {
        let cpu_allocation = CpuAllocation::default();
        let inbox_capacity = DEFAULT_INBOX_CAPACITY;

        Self {
            cpu_allocation,
            inbox_capacity,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Default)]
Expand Down
19 changes: 18 additions & 1 deletion core/configs/src/server_config/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::server::{
DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, TelemetryConfig,
};
use super::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig};
use super::sharding::{CpuAllocation, ShardingConfig};
use super::sharding::{CpuAllocation, INBOX_CAPACITY_MAX, ShardingConfig};
use super::system::SegmentConfig;
use super::system::{CompressionConfig, LoggingConfig, PartitionConfig};
use crate::ConfigurationError;
Expand Down Expand Up @@ -379,6 +379,23 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {

impl Validatable<ConfigurationError> for ShardingConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.inbox_capacity == 0 {
eprintln!(
"Invalid sharding configuration: inbox_capacity must be > 0 (crossfire silently \
rounds 0 to 1, masking config errors)"
);
return Err(ConfigurationError::InvalidConfigurationValue);
}
if self.inbox_capacity > INBOX_CAPACITY_MAX {
eprintln!(
"Invalid sharding configuration: inbox_capacity {} exceeds the {} cap (each \
shard preallocates a channel of this size; oversizing here OOMs the process at \
boot)",
self.inbox_capacity, INBOX_CAPACITY_MAX
);
return Err(ConfigurationError::InvalidConfigurationValue);
}

let available_cpus = available_parallelism()
.map_err(|_| {
eprintln!("Failed to detect available CPU cores");
Expand Down
4 changes: 4 additions & 0 deletions core/consensus/src/metadata_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ mod tests {
) -> Result<(), SendError> {
Ok(())
}

// No-op callback setters for this test-module stub: each callback is
// accepted and dropped immediately, so it is never stored or invoked.
fn set_connection_lost_fn(&self, _f: message_bus::ConnectionLostFn) {}
fn set_replica_forward_fn(&self, _f: message_bus::ReplicaForwardFn) {}
fn set_client_forward_fn(&self, _f: message_bus::ClientForwardFn) {}
}

// Register-retry replay: cached IS the register reply; bytes replayed
Expand Down
8 changes: 8 additions & 0 deletions core/consensus/src/plane_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,10 @@ mod tests {
) -> Result<(), SendError> {
Ok(())
}

// No-op callback setters for this test-module stub: each callback is
// accepted and dropped immediately, so it is never stored or invoked.
fn set_connection_lost_fn(&self, _f: message_bus::ConnectionLostFn) {}
fn set_replica_forward_fn(&self, _f: message_bus::ReplicaForwardFn) {}
fn set_client_forward_fn(&self, _f: message_bus::ClientForwardFn) {}
}

fn prepare_message(op: u64, parent: u128, checksum: u128) -> Message<PrepareHeader> {
Expand Down Expand Up @@ -675,6 +679,10 @@ mod tests {
self.sent.borrow_mut().push((replica, data));
Ok(())
}

// No-op callback setters for this test-module stub: each callback is
// accepted and dropped immediately, so it is never stored or invoked.
fn set_connection_lost_fn(&self, _f: message_bus::ConnectionLostFn) {}
fn set_replica_forward_fn(&self, _f: message_bus::ReplicaForwardFn) {}
fn set_client_forward_fn(&self, _f: message_bus::ClientForwardFn) {}
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion core/journal/src/file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub struct FileStorage {

#[allow(clippy::future_not_send)]
impl FileStorage {
/// Open or create the file at `path`, setting `write_offset` to current file length.
/// Open or create the file at `path` in read-write mode, setting
/// `write_offset` to current file length.
///
/// # Errors
/// Returns an I/O error if the file cannot be opened or created.
Expand Down
Loading
Loading