From c0a2cc1eafaded52f1e22f2c79d0c6b55d1eeea7 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Fri, 6 Feb 2026 22:13:51 +0000 Subject: [PATCH] Add restate-memory crate with EstimatedMemorySize trait Introduce a new restate-memory crate containing memory management utilities. The crate provides the EstimatedMemorySize trait for types that can be significant contributors to memory usage. Key features: - EstimatedMemorySize trait with blanket impls for references (&T, &mut T) - Impls for common types: String, Bytes, BytesMut, [u8], str, Vec<T>, [T] - Impls for smart pointers: Arc<T>, Rc<T>, Box<T>, Cow<'_, T> - Impls for lock guards: MutexGuard, RwLockReadGuard, RwLockWriteGuard, Ref, RefMut - Impl for Option<T> The trait is re-exported through restate-types::memory for convenience. Also implements EstimatedMemorySize for Record, Payloads, Store, and PolyBytes in restate-types. --- Cargo.lock | 9 ++ Cargo.toml | 1 + crates/core/Cargo.toml | 1 + crates/core/src/network/incoming.rs | 64 ++++++++ crates/memory/Cargo.toml | 16 ++ crates/memory/src/footprint.rs | 239 ++++++++++++++++++++++++++++ crates/memory/src/lib.rs | 14 ++ crates/types/Cargo.toml | 1 + crates/types/src/lib.rs | 5 + crates/types/src/logs/record.rs | 9 ++ crates/types/src/net/log_server.rs | 14 ++ crates/types/src/storage.rs | 11 +- 12 files changed, 382 insertions(+), 2 deletions(-) create mode 100644 crates/memory/Cargo.toml create mode 100644 crates/memory/src/footprint.rs create mode 100644 crates/memory/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 71c38f801a..c5300aa46e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7551,6 +7551,14 @@ dependencies = [ "tonic-prost-build", ] +[[package]] +name = "restate-memory" +version = "1.6.1-dev" +dependencies = [ + "bytes", + "restate-workspace-hack", +] + [[package]] name = "restate-metadata-providers" version = "1.6.1-dev" @@ -8221,6 +8229,7 @@ dependencies = [ "restate-clock", "restate-encoding", "restate-errors", + "restate-memory", "restate-serde-util", 
"restate-test-util", "restate-time-util", diff --git a/Cargo.toml b/Cargo.toml index 4e2595f380..c94a490787 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ restate-invoker-impl = { path = "crates/invoker-impl" } restate-local-cluster-runner = { path = "crates/local-cluster-runner" } restate-log-server = { path = "crates/log-server" } restate-log-server-grpc = { path = "crates/log-server-grpc" } +restate-memory = { path = "crates/memory" } restate-metadata-providers = { path = "crates/metadata-providers" } restate-metadata-server = { path = "crates/metadata-server" } restate-metadata-server-grpc = { path = "crates/metadata-server-grpc" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 024cb48b1e..dd9a9a8f7c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -22,6 +22,7 @@ restate-workspace-hack = { workspace = true } restate-core-derive = { workspace = true, optional = true } restate-futures-util = { workspace = true } +restate-memory = { workspace = true } restate-metadata-store = { workspace = true } restate-time-util = { workspace = true } restate-types = { workspace = true } diff --git a/crates/core/src/network/incoming.rs b/crates/core/src/network/incoming.rs index 9af3adf7be..d9a8bf365c 100644 --- a/crates/core/src/network/incoming.rs +++ b/crates/core/src/network/incoming.rs @@ -16,6 +16,7 @@ use tokio::sync::{oneshot, watch}; use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; +use restate_memory::EstimatedMemorySize; use restate_types::GenerationalNodeId; use restate_types::net::codec::{WireDecode, WireEncode}; use restate_types::net::{ProtocolVersion, Service, UnaryMessage, WatchResponse}; @@ -255,6 +256,27 @@ impl Incoming> { } } +impl EstimatedMemorySize for Incoming { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.inner.payload.estimated_memory_size() + } +} + +impl EstimatedMemorySize for Incoming> { + #[inline] + fn estimated_memory_size(&self) -> usize { + 
self.inner.payload.estimated_memory_size() + } +} + +impl EstimatedMemorySize for Incoming> { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.inner.payload.estimated_memory_size() + } +} + impl Incoming { pub fn msg_type(&self) -> &str { &self.inner.msg_type @@ -336,6 +358,27 @@ impl Incoming> { } } +impl EstimatedMemorySize for Incoming { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.inner.payload.estimated_memory_size() + } +} + +impl EstimatedMemorySize for Incoming> { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.inner.payload.estimated_memory_size() + } +} + +impl EstimatedMemorySize for Incoming> { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.inner.payload.estimated_memory_size() + } +} + impl Incoming { pub fn msg_type(&self) -> &str { &self.inner.msg_type @@ -407,6 +450,27 @@ impl Incoming { } } +impl EstimatedMemorySize for Incoming { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.inner.payload.estimated_memory_size() + } +} + +impl EstimatedMemorySize for Incoming> { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.inner.payload.estimated_memory_size() + } +} + +impl EstimatedMemorySize for Incoming> { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.inner.payload.estimated_memory_size() + } +} + // A polymorphic incoming subscription request bound to a certain service impl Incoming> { /// The sort-code is applicable if the sender specifies a target mailbox for this message. 
diff --git a/crates/memory/Cargo.toml b/crates/memory/Cargo.toml new file mode 100644 index 0000000000..bb31df7191 --- /dev/null +++ b/crates/memory/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "restate-memory" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[features] +default = [] + +[dependencies] +restate-workspace-hack = { workspace = true } + +bytes = { workspace = true } diff --git a/crates/memory/src/footprint.rs b/crates/memory/src/footprint.rs new file mode 100644 index 0000000000..a5ba8c1eb3 --- /dev/null +++ b/crates/memory/src/footprint.rs @@ -0,0 +1,239 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +/// A trait for types that can be significant contributors to memory usage. +pub trait EstimatedMemorySize { + /// Estimated number of bytes to be used by this value in memory. 
+ fn estimated_memory_size(&self) -> usize; +} + +impl EstimatedMemorySize for &T { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for &mut T { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for () { + #[inline] + fn estimated_memory_size(&self) -> usize { + 0 + } +} + +impl EstimatedMemorySize for Option { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.as_ref() + .map_or(0, EstimatedMemorySize::estimated_memory_size) + } +} + +impl EstimatedMemorySize for Vec { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.iter() + .map(EstimatedMemorySize::estimated_memory_size) + .sum() + } +} + +impl EstimatedMemorySize for [T] { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.iter() + .map(EstimatedMemorySize::estimated_memory_size) + .sum() + } +} + +impl EstimatedMemorySize for String { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.len() + } +} + +impl EstimatedMemorySize for bytes::Bytes { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.len() + } +} + +impl EstimatedMemorySize for bytes::BytesMut { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.len() + } +} + +impl EstimatedMemorySize for [u8] { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.len() + } +} + +impl EstimatedMemorySize for str { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.len() + } +} + +impl EstimatedMemorySize for std::sync::Arc { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for std::rc::Rc { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for Box { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl 
EstimatedMemorySize for std::sync::MutexGuard<'_, T> { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for std::sync::RwLockReadGuard<'_, T> { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for std::sync::RwLockWriteGuard<'_, T> { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for std::cell::Ref<'_, T> { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for std::cell::RefMut<'_, T> { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self) + } +} + +impl EstimatedMemorySize for std::borrow::Cow<'_, T> { + #[inline] + fn estimated_memory_size(&self) -> usize { + T::estimated_memory_size(self.as_ref()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use bytes::{Bytes, BytesMut}; + + #[test] + fn test_primitives() { + assert_eq!(().estimated_memory_size(), 0); + + let slice: &[u8] = &[1, 2, 3, 4, 5]; + assert_eq!(slice.estimated_memory_size(), 5); + } + + #[test] + fn test_string_and_bytes() { + assert_eq!("hello".to_string().estimated_memory_size(), 5); + assert_eq!(Bytes::from_static(b"world").estimated_memory_size(), 5); + assert_eq!(BytesMut::from(&b"test"[..]).estimated_memory_size(), 4); + } + + #[test] + fn test_option() { + let none: Option = None; + assert_eq!(none.estimated_memory_size(), 0); + + let some = Some("hello".to_string()); + assert_eq!(some.estimated_memory_size(), 5); + } + + #[test] + fn test_vec_and_slice() { + let vec: Vec = vec!["hello".to_string(), "world".to_string()]; + assert_eq!(vec.estimated_memory_size(), 10); + + let slice: &[String] = &vec; + assert_eq!(slice.estimated_memory_size(), 10); + } + + #[test] + #[allow(clippy::needless_borrow, clippy::unnecessary_mut_passed)] + fn 
test_references() { + // These explicit borrows are intentional - we're testing the + // EstimatedMemorySize implementations for &T and &mut T + let s = "hello".to_string(); + assert_eq!((&s).estimated_memory_size(), 5); + assert_eq!((&&s).estimated_memory_size(), 5); + + let mut s2 = "world".to_string(); + assert_eq!((&mut s2).estimated_memory_size(), 5); + } + + #[test] + fn test_smart_pointers() { + use std::borrow::Cow; + use std::rc::Rc; + use std::sync::Arc; + + // Box + let boxed = Box::new("hello".to_string()); + assert_eq!(boxed.estimated_memory_size(), 5); + + // Arc + let arc = Arc::new("world".to_string()); + assert_eq!(arc.estimated_memory_size(), 5); + + // Rc + let rc = Rc::new("test".to_string()); + assert_eq!(rc.estimated_memory_size(), 4); + + // Cow + let cow_borrowed: Cow<'_, str> = Cow::Borrowed("borrowed"); + assert_eq!(cow_borrowed.estimated_memory_size(), 8); + + let cow_owned: Cow<'_, str> = Cow::Owned("owned".to_string()); + assert_eq!(cow_owned.estimated_memory_size(), 5); + } +} diff --git a/crates/memory/src/lib.rs b/crates/memory/src/lib.rs new file mode 100644 index 0000000000..cfa2ebaa9c --- /dev/null +++ b/crates/memory/src/lib.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Memory management utilities for Restate. 
+mod footprint; + +pub use footprint::*; diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index d9278e7d22..d9d6c0acbe 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -26,6 +26,7 @@ restate-base64-util = { workspace = true } restate-clock = { workspace = true, features = ["prost-types", "jiff", "hlc"] } restate-encoding = { workspace = true } restate-errors = { workspace = true } +restate-memory = { workspace = true } restate-serde-util = { workspace = true } restate-test-util = { workspace = true, optional = true } restate-time-util = { workspace = true, features = ["serde", "serde_with"] } diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index 64c2a26ab9..c921b4accb 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -68,6 +68,11 @@ pub mod clock { pub use restate_clock::*; } +// Re-export restate-memory crate for memory management utilities. +pub mod memory { + pub use restate_memory::*; +} + // Re-export metrics' SharedString (Space-efficient Cow + RefCounted variant) pub type SharedString = metrics::SharedString; diff --git a/crates/types/src/logs/record.rs b/crates/types/src/logs/record.rs index aa4571c100..9a8d5267ab 100644 --- a/crates/types/src/logs/record.rs +++ b/crates/types/src/logs/record.rs @@ -11,7 +11,9 @@ use std::sync::Arc; use bytes::BytesMut; + use restate_encoding::NetSerde; +use restate_memory::EstimatedMemorySize; use crate::storage::{PolyBytes, StorageCodec, StorageDecode, StorageDecodeError, StorageEncode}; use crate::time::NanosSinceEpoch; @@ -132,6 +134,13 @@ impl Record { } } +impl EstimatedMemorySize for Record { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.body.estimated_memory_size() + } +} + impl MatchKeyQuery for Record { fn matches_key_query(&self, query: &KeyFilter) -> bool { self.keys.matches_key_query(query) diff --git a/crates/types/src/net/log_server.rs b/crates/types/src/net/log_server.rs index c05990cb82..2ca2cb9aff 100644 --- 
a/crates/types/src/net/log_server.rs +++ b/crates/types/src/net/log_server.rs @@ -15,6 +15,7 @@ use bitflags::bitflags; use prost_dto::{FromProst, IntoProst}; use restate_encoding::{ArcedSlice, BilrostNewType, NetSerde}; +use restate_memory::EstimatedMemorySize; use super::{RpcResponse, ServiceTag}; use crate::GenerationalNodeId; @@ -310,6 +311,13 @@ impl From> for Payloads { } } +impl EstimatedMemorySize for Payloads { + #[inline] + fn estimated_memory_size(&self) -> usize { + self.0.estimated_memory_size() + } +} + /// Store one or more records on a log-server #[derive(Debug, Clone, bilrost::Message, NetSerde)] pub struct Store { @@ -356,6 +364,12 @@ impl Store { } } +impl EstimatedMemorySize for Store { + fn estimated_memory_size(&self) -> usize { + self.payloads.estimated_memory_size() + } +} + impl WireEncode for Store { fn encode_to_bytes( &self, diff --git a/crates/types/src/storage.rs b/crates/types/src/storage.rs index 6d835e976a..266d945df4 100644 --- a/crates/types/src/storage.rs +++ b/crates/types/src/storage.rs @@ -19,6 +19,7 @@ use chrono::Utc; use downcast_rs::{DowncastSync, impl_downcast}; use restate_encoding::{BilrostAs, NetSerde}; +use restate_memory::EstimatedMemorySize; use crate::errors::GenericError; use crate::journal_v2::raw::{RawEntry, RawEntryError, TryFromEntry}; @@ -221,8 +222,8 @@ impl Default for PolyBytes { // implement NetSerde for PolyBytes manually impl NetSerde for PolyBytes {} -impl PolyBytes { - pub fn estimated_encode_size(&self) -> usize { +impl EstimatedMemorySize for PolyBytes { + fn estimated_memory_size(&self) -> usize { match self { PolyBytes::Bytes(bytes) => bytes.len(), PolyBytes::Both(_, bytes) => bytes.len(), @@ -234,6 +235,12 @@ impl PolyBytes { } } } +} + +impl PolyBytes { + pub fn estimated_encode_size(&self) -> usize { + self.estimated_memory_size() + } #[tracing::instrument(skip_all)] pub fn encode_to_bytes(&self, scratch: &mut BytesMut) -> Result {