Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ restate-invoker-impl = { path = "crates/invoker-impl" }
restate-local-cluster-runner = { path = "crates/local-cluster-runner" }
restate-log-server = { path = "crates/log-server" }
restate-log-server-grpc = { path = "crates/log-server-grpc" }
restate-memory = { path = "crates/memory" }
restate-metadata-providers = { path = "crates/metadata-providers" }
restate-metadata-server = { path = "crates/metadata-server" }
restate-metadata-server-grpc = { path = "crates/metadata-server-grpc" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ restate-workspace-hack = { workspace = true }

restate-core-derive = { workspace = true, optional = true }
restate-futures-util = { workspace = true }
restate-memory = { workspace = true }
restate-metadata-store = { workspace = true }
restate-time-util = { workspace = true }
restate-types = { workspace = true }
Expand Down
64 changes: 64 additions & 0 deletions crates/core/src/network/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::sync::{oneshot, watch};
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use restate_memory::EstimatedMemorySize;
use restate_types::GenerationalNodeId;
use restate_types::net::codec::{WireDecode, WireEncode};
use restate_types::net::{ProtocolVersion, Service, UnaryMessage, WatchResponse};
Expand Down Expand Up @@ -255,6 +256,27 @@ impl<S> Incoming<RawSvcRpc<S>> {
}
}

impl EstimatedMemorySize for Incoming<RawRpc> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

impl<S> EstimatedMemorySize for Incoming<RawSvcRpc<S>> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

impl<S> EstimatedMemorySize for Incoming<Rpc<S>> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

impl Incoming<RawRpc> {
pub fn msg_type(&self) -> &str {
&self.inner.msg_type
Expand Down Expand Up @@ -336,6 +358,27 @@ impl<S> Incoming<RawSvcUnary<S>> {
}
}

impl EstimatedMemorySize for Incoming<RawUnary> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

impl<S> EstimatedMemorySize for Incoming<RawSvcUnary<S>> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

impl<S> EstimatedMemorySize for Incoming<Unary<S>> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

impl Incoming<RawUnary> {
pub fn msg_type(&self) -> &str {
&self.inner.msg_type
Expand Down Expand Up @@ -407,6 +450,27 @@ impl Incoming<RawWatch> {
}
}

impl EstimatedMemorySize for Incoming<RawWatch> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

impl<S> EstimatedMemorySize for Incoming<RawSvcWatch<S>> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

impl<S> EstimatedMemorySize for Incoming<Watch<S>> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.inner.payload.estimated_memory_size()
}
}

// A polymorphic incoming subscription request bound to a certain service
impl<S> Incoming<RawSvcWatch<S>> {
/// The sort-code is applicable if the sender specifies a target mailbox for this message.
Expand Down
16 changes: 16 additions & 0 deletions crates/memory/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "restate-memory"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
publish = false

[features]
default = []

[dependencies]
restate-workspace-hack = { workspace = true }

bytes = { workspace = true }
239 changes: 239 additions & 0 deletions crates/memory/src/footprint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

/// A trait for types that can be significant contributors to memory usage.
pub trait EstimatedMemorySize {
/// Estimated number of bytes to be used by this value in memory.
fn estimated_memory_size(&self) -> usize;
}

impl<T: EstimatedMemorySize + ?Sized> EstimatedMemorySize for &T {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize + ?Sized> EstimatedMemorySize for &mut T {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl EstimatedMemorySize for () {
#[inline]
fn estimated_memory_size(&self) -> usize {
0
}
}

impl<T: EstimatedMemorySize> EstimatedMemorySize for Option<T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.as_ref()
.map_or(0, EstimatedMemorySize::estimated_memory_size)
}
}

impl<T: EstimatedMemorySize> EstimatedMemorySize for Vec<T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.iter()
.map(EstimatedMemorySize::estimated_memory_size)
.sum()
}
}

impl<T: EstimatedMemorySize> EstimatedMemorySize for [T] {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.iter()
.map(EstimatedMemorySize::estimated_memory_size)
.sum()
}
}

impl EstimatedMemorySize for String {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.len()
}
}

impl EstimatedMemorySize for bytes::Bytes {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.len()
}
}

impl EstimatedMemorySize for bytes::BytesMut {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.len()
}
}

impl EstimatedMemorySize for [u8] {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.len()
}
}

impl EstimatedMemorySize for str {
#[inline]
fn estimated_memory_size(&self) -> usize {
self.len()
}
}

impl<T: EstimatedMemorySize + ?Sized> EstimatedMemorySize for std::sync::Arc<T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize + ?Sized> EstimatedMemorySize for std::rc::Rc<T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize + ?Sized> EstimatedMemorySize for Box<T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize + ?Sized> EstimatedMemorySize for std::sync::MutexGuard<'_, T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize + ?Sized> EstimatedMemorySize for std::sync::RwLockReadGuard<'_, T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize + ?Sized> EstimatedMemorySize for std::sync::RwLockWriteGuard<'_, T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize> EstimatedMemorySize for std::cell::Ref<'_, T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize> EstimatedMemorySize for std::cell::RefMut<'_, T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self)
}
}

impl<T: EstimatedMemorySize + ToOwned + ?Sized> EstimatedMemorySize for std::borrow::Cow<'_, T> {
#[inline]
fn estimated_memory_size(&self) -> usize {
T::estimated_memory_size(self.as_ref())
}
}

#[cfg(test)]
mod tests {
use super::*;

use bytes::{Bytes, BytesMut};

#[test]
fn test_primitives() {
assert_eq!(().estimated_memory_size(), 0);

let slice: &[u8] = &[1, 2, 3, 4, 5];
assert_eq!(slice.estimated_memory_size(), 5);
}

#[test]
fn test_string_and_bytes() {
assert_eq!("hello".to_string().estimated_memory_size(), 5);
assert_eq!(Bytes::from_static(b"world").estimated_memory_size(), 5);
assert_eq!(BytesMut::from(&b"test"[..]).estimated_memory_size(), 4);
}

#[test]
fn test_option() {
let none: Option<String> = None;
assert_eq!(none.estimated_memory_size(), 0);

let some = Some("hello".to_string());
assert_eq!(some.estimated_memory_size(), 5);
}

#[test]
fn test_vec_and_slice() {
let vec: Vec<String> = vec!["hello".to_string(), "world".to_string()];
assert_eq!(vec.estimated_memory_size(), 10);

let slice: &[String] = &vec;
assert_eq!(slice.estimated_memory_size(), 10);
}

#[test]
#[allow(clippy::needless_borrow, clippy::unnecessary_mut_passed)]
fn test_references() {
// These explicit borrows are intentional - we're testing the
// EstimatedMemorySize implementations for &T and &mut T
let s = "hello".to_string();
assert_eq!((&s).estimated_memory_size(), 5);
assert_eq!((&&s).estimated_memory_size(), 5);

let mut s2 = "world".to_string();
assert_eq!((&mut s2).estimated_memory_size(), 5);
}

#[test]
fn test_smart_pointers() {
use std::borrow::Cow;
use std::rc::Rc;
use std::sync::Arc;

// Box
let boxed = Box::new("hello".to_string());
assert_eq!(boxed.estimated_memory_size(), 5);

// Arc
let arc = Arc::new("world".to_string());
assert_eq!(arc.estimated_memory_size(), 5);

// Rc
let rc = Rc::new("test".to_string());
assert_eq!(rc.estimated_memory_size(), 4);

// Cow
let cow_borrowed: Cow<'_, str> = Cow::Borrowed("borrowed");
assert_eq!(cow_borrowed.estimated_memory_size(), 8);

let cow_owned: Cow<'_, str> = Cow::Owned("owned".to_string());
assert_eq!(cow_owned.estimated_memory_size(), 5);
}
}
Loading
Loading