Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions crates/storage/db/src/mdbx/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
//! Metrics collection for database operations.
//!
//! This module provides metrics tracking for:
//! - Transaction lifecycle (creation, commit, abort)
//! - Database operations (get, put, delete, clear)
//! - Operation timing and success rates

use std::sync::Arc;
use std::time::Instant;

use katana_metrics::metrics::{Counter, Histogram};
use katana_metrics::Metrics;

/// Metrics for database operations.
///
/// Cheap to clone: all counters and histograms live behind a shared [`Arc`],
/// so clones observe and update the same underlying metrics.
#[derive(Clone, Debug)]
pub struct DbMetrics {
    // Shared interior holding the transaction and operation metric handles.
    inner: Arc<DbMetricsInner>,
}

impl Default for DbMetrics {
    /// Equivalent to [`DbMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl DbMetrics {
    /// Creates a new instance of database metrics.
    ///
    /// Also registers descriptions for the table-level gauges
    /// (`db.table_size`, `db.table_pages`, `db.table_entries`, `db.freelist`)
    /// that are set when database stats are reported.
    pub fn new() -> Self {
        // database tables metrics
        ::metrics::describe_gauge!(
            "db.table_size",
            ::metrics::Unit::Bytes,
            "Total size of the table"
        );
        ::metrics::describe_gauge!(
            "db.table_pages",
            ::metrics::Unit::Count,
            "Number of pages in the table"
        );
        ::metrics::describe_gauge!(
            "db.table_entries",
            ::metrics::Unit::Count,
            "Number of entries in the table"
        );
        ::metrics::describe_gauge!(
            "db.freelist",
            ::metrics::Unit::Bytes,
            "Size of the database freelist"
        );

        Self {
            inner: Arc::new(DbMetricsInner {
                transaction: DbTransactionMetrics::default(),
                operations: DbOperationMetrics::default(),
            }),
        }
    }

    /// Records a read-only transaction creation.
    pub fn record_ro_tx_create(&self) {
        self.inner.transaction.ro_created.increment(1);
    }

    /// Records a read-write transaction creation.
    pub fn record_rw_tx_create(&self) {
        self.inner.transaction.rw_created.increment(1);
    }

    /// Records a transaction commit with timing.
    ///
    /// ## Arguments
    ///
    /// * `duration` - Time taken for the commit operation in seconds
    /// * `success` - Whether the commit operation completed successfully or not.
    pub fn record_tx_commit(&self, duration: f64, success: bool) {
        if success {
            self.inner.transaction.commits_successful.increment(1);
        } else {
            self.inner.transaction.commits_failed.increment(1);
        }

        // The commit duration is recorded regardless of outcome.
        self.inner.transaction.commit_time_seconds.record(duration);
    }

    /// Records a transaction abort.
    pub fn record_tx_abort(&self) {
        self.inner.transaction.aborts.increment(1);
    }

    /// Records a get operation.
    ///
    /// ## Arguments
    ///
    /// * `duration` - Time taken for the get operation in seconds
    /// * `found` - Whether the requested value was found
    pub fn record_get(&self, duration: f64, found: bool) {
        if found {
            self.inner.operations.get_hits.increment(1);
        } else {
            self.inner.operations.get_misses.increment(1);
        }

        // The get duration is recorded for both hits and misses.
        self.inner.operations.get_time_seconds.record(duration);
    }

    /// Records a put operation.
    ///
    /// ## Arguments
    ///
    /// * `duration` - Time taken for the put operation in seconds
    pub fn record_put(&self, duration: f64) {
        self.inner.operations.puts.increment(1);
        self.inner.operations.put_time_seconds.record(duration);
    }

    /// Records a delete operation.
    ///
    /// ## Arguments
    ///
    /// * `duration` - Time taken for the delete operation in seconds
    /// * `deleted` - Whether a value was actually deleted
    pub fn record_delete(&self, duration: f64, deleted: bool) {
        if deleted {
            self.inner.operations.deletes_successful.increment(1);
        } else {
            self.inner.operations.deletes_failed.increment(1);
        }

        // The delete duration is recorded regardless of outcome.
        self.inner.operations.delete_time_seconds.record(duration);
    }

    /// Records a clear operation.
    ///
    /// ## Arguments
    ///
    /// * `duration` - Time taken for the clear operation in seconds
    pub fn record_clear(&self, duration: f64) {
        self.inner.operations.clears.increment(1);
        self.inner.operations.clear_time_seconds.record(duration);
    }
}

/// Shared interior of [`DbMetrics`], held behind an [`Arc`].
#[derive(Debug)]
struct DbMetricsInner {
    /// Transaction lifecycle metrics (create, commit, abort).
    transaction: DbTransactionMetrics,
    /// Per-operation metrics (get, put, delete, clear).
    operations: DbOperationMetrics,
}

/// Metrics for database transactions.
// NOTE(review): the `Metrics` derive appears to turn each field into a metric
// under the "db.transaction" scope, presumably using the field's doc comment
// as its description — confirm against `katana_metrics` before renaming
// fields or editing their doc comments.
#[derive(Metrics, Clone)]
#[metrics(scope = "db.transaction")]
struct DbTransactionMetrics {
    /// Number of read-only transactions created
    ro_created: Counter,
    /// Number of read-write transactions created
    rw_created: Counter,
    /// Number of successful commits
    commits_successful: Counter,
    /// Number of failed commits
    commits_failed: Counter,
    /// Number of transaction aborts
    aborts: Counter,
    /// Time taken to commit a transaction
    commit_time_seconds: Histogram,
}

/// Metrics for database operations.
// NOTE(review): the `Metrics` derive appears to turn each field into a metric
// under the "db.operation" scope, presumably using the field's doc comment as
// its description — confirm against `katana_metrics` before renaming fields
// or editing their doc comments.
#[derive(Metrics, Clone)]
#[metrics(scope = "db.operation")]
struct DbOperationMetrics {
    /// Number of get operations that found a value
    get_hits: Counter,
    /// Number of get operations that didn't find a value
    get_misses: Counter,
    /// Time taken for get operations
    get_time_seconds: Histogram,
    /// Number of put operations
    puts: Counter,
    /// Time taken for put operations
    put_time_seconds: Histogram,
    /// Number of successful delete operations
    deletes_successful: Counter,
    /// Number of failed delete operations
    deletes_failed: Counter,
    /// Time taken for delete operations
    delete_time_seconds: Histogram,
    /// Number of clear operations
    clears: Counter,
    /// Time taken for clear operations
    clear_time_seconds: Histogram,
}

/// Helper for timing database operations.
///
/// Captures an [`Instant`] at construction; [`Self::elapsed`] reports the
/// time since then as fractional seconds, matching the unit expected by the
/// `record_*` methods on `DbMetrics`.
// `Instant` is `Copy`, so the timer can be freely copied; `Debug` aids
// diagnostics, and `Default` satisfies clippy's `new_without_default`.
#[derive(Debug, Clone, Copy)]
pub(super) struct OpTimer {
    // The moment the timed operation started.
    start: Instant,
}

impl OpTimer {
    /// Starts timing an operation.
    pub fn new() -> Self {
        Self { start: Instant::now() }
    }

    /// Returns the elapsed time in seconds.
    pub fn elapsed(&self) -> f64 {
        self.start.elapsed().as_secs_f64()
    }
}

impl Default for OpTimer {
    /// Equivalent to [`OpTimer::new`]; starts timing at creation.
    fn default() -> Self {
        Self::new()
    }
}
58 changes: 39 additions & 19 deletions crates/storage/db/src/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::Arc;
use katana_metrics::metrics::gauge;
pub use libmdbx;
use libmdbx::{DatabaseFlags, EnvironmentFlags, Geometry, PageSize, SyncMode, RO, RW};
use metrics::{describe_gauge, Label};
use tracing::error;

use crate::abstraction::Database;
Expand All @@ -14,9 +13,11 @@ use crate::tables::{TableType, Tables, NUM_TABLES};
use crate::{utils, GIGABYTE, TERABYTE};

pub mod cursor;
pub mod metrics;
pub mod stats;
pub mod tx;

use self::metrics::DbMetrics;
use self::stats::{Stats, TableStat};
use self::tx::Tx;

Expand Down Expand Up @@ -98,8 +99,9 @@ impl DbEnvBuilder {

let env = builder.open(path.as_ref()).map_err(DatabaseError::OpenEnv)?;
let dir = path.as_ref().to_path_buf();
let metrics = DbMetrics::new();

Ok(DbEnv { inner: Arc::new(DbEnvInner { env, dir }) }.with_metrics())
Ok(DbEnv { inner: Arc::new(DbEnvInner { env, dir, metrics }) })
}
}

Expand All @@ -110,17 +112,30 @@ impl Default for DbEnvBuilder {
}

/// Wrapper for `libmdbx-sys` environment.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct DbEnv {
pub(crate) inner: Arc<DbEnvInner>,
}

/// Shared interior of [`DbEnv`], held behind an `Arc`.
// `Debug` is implemented manually below (over `dir` only), so it must NOT be
// derived here — deriving it as well would be a conflicting implementation.
pub(super) struct DbEnvInner {
    /// The handle to the MDBX environment.
    pub(super) env: libmdbx::Environment,
    /// The path where the database environment is stored at.
    pub(super) dir: PathBuf,
    /// Metrics for database operations, shared with every transaction
    /// created from this environment.
    pub(super) metrics: DbMetrics,
}

impl std::fmt::Debug for DbEnv {
    // Manual impl showing only the database directory; the remaining fields
    // are elided via `finish_non_exhaustive` (rendered as `..`).
    // NOTE(review): presumably manual because not all inner fields implement
    // `Debug` — confirm.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DbEnv").field("dir", &self.inner.dir).finish_non_exhaustive()
    }
}

impl std::fmt::Debug for DbEnvInner {
    // Manual impl mirroring `DbEnv`'s: only `dir` is shown, other fields are
    // elided via `finish_non_exhaustive` (rendered as `..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DbEnvInner").field("dir", &self.dir).finish_non_exhaustive()
    }
}

impl DbEnv {
Expand All @@ -146,14 +161,6 @@ impl DbEnv {
pub fn path(&self) -> &Path {
&self.inner.dir
}

pub(super) fn with_metrics(self) -> Self {
describe_gauge!("db.table_size", metrics::Unit::Bytes, "Total size of the table");
describe_gauge!("db.table_pages", metrics::Unit::Count, "Number of pages in the table");
describe_gauge!("db.table_entries", metrics::Unit::Count, "Number of entries in the table");
describe_gauge!("db.freelist", metrics::Unit::Bytes, "Size of the database freelist");
self
}
}

impl Database for DbEnv {
Expand All @@ -163,12 +170,16 @@ impl Database for DbEnv {

#[tracing::instrument(level = "trace", name = "db_txn_ro_create", skip_all)]
fn tx(&self) -> Result<Self::Tx, DatabaseError> {
    // Begin the read-only transaction first so the creation counter is only
    // bumped when the transaction was actually created.
    let tx = self.inner.env.begin_ro_txn().map_err(DatabaseError::CreateROTx)?;
    self.inner.metrics.record_ro_tx_create();
    // Hand the transaction a clone of the shared metrics handle.
    Ok(Tx::new(tx, self.inner.metrics.clone()))
}

#[tracing::instrument(level = "trace", name = "db_txn_rw_create", skip_all)]
fn tx_mut(&self) -> Result<Self::TxMut, DatabaseError> {
    // Begin the read-write transaction first so the creation counter is only
    // bumped when the transaction was actually created.
    let tx = self.inner.env.begin_rw_txn().map_err(DatabaseError::CreateRWTx)?;
    self.inner.metrics.record_rw_tx_create();
    // Hand the transaction a clone of the shared metrics handle.
    Ok(Tx::new(tx, self.inner.metrics.clone()))
}

fn stats(&self) -> Result<Self::Stats, DatabaseError> {
Expand All @@ -195,28 +206,37 @@ impl katana_metrics::Report for DbEnv {
let mut pgsize = 0;

for (table, stat) in stats.table_stats() {
gauge!("db.table_size", vec![Label::new("table", *table)])
gauge!("db.table_size", vec![::metrics::Label::new("table", *table)])
.set(stat.total_size() as f64);

gauge!(
"db.table_pages",
vec![Label::new("table", *table), Label::new("type", "leaf")]
vec![
::metrics::Label::new("table", *table),
::metrics::Label::new("type", "leaf")
]
)
.set(stat.leaf_pages() as f64);

gauge!(
"db.table_pages",
vec![Label::new("table", *table), Label::new("type", "branch")]
vec![
::metrics::Label::new("table", *table),
::metrics::Label::new("type", "branch")
]
)
.set(stat.branch_pages() as f64);

gauge!(
"db.table_pages",
vec![Label::new("table", *table), Label::new("type", "overflow")]
vec![
::metrics::Label::new("table", *table),
::metrics::Label::new("type", "overflow")
]
)
.set(stat.overflow_pages() as f64);

gauge!("db.table_entries", vec![Label::new("table", *table)])
gauge!("db.table_entries", vec![::metrics::Label::new("table", *table)])
.set(stat.entries() as f64);

if pgsize == 0 {
Expand Down
Loading
Loading