diff --git a/Cargo.lock b/Cargo.lock
index aec4ad866aff..6a85892b8ffe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1940,6 +1940,7 @@ dependencies = [
"moka",
"nu-ansi-term",
"object-store",
+ "ordered-float 4.6.0",
"plugins",
"prometheus",
"prost 0.13.5",
@@ -10136,6 +10137,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-runtime",
+ "common-stat",
"common-telemetry",
"common-time",
"datafusion",
diff --git a/config/config.md b/config/config.md
index 92f192e6df6a..48427be475c0 100644
--- a/config/config.md
+++ b/config/config.md
@@ -101,6 +101,7 @@
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode. Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine. Default to 0, which means the number of CPU cores. |
+| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join). Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit (unbounded, default behavior). When this limit is reached, queries will fail with ResourceExhausted error. NOTE: This does NOT limit memory used by table scans. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data. - `File`: the data is stored in the local file system. - `S3`: the data is stored in the S3 object storage. - `Gcs`: the data is stored in the Google Cloud Storage. - `Azblob`: the data is stored in the Azure Blob Storage. - `Oss`: the data is stored in the Aliyun OSS. |
@@ -152,6 +153,7 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
+| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions. To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -302,6 +304,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine. Default to 0, which means the number of CPU cores. |
| `query.allow_query_fallback` | Bool | `false` | Whether to allow query fallback when push down optimize fails. Default to false, meaning when push down optimize failed, return error msg |
+| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join). Supports absolute size (e.g., "4GB", "8GB") or percentage of system memory (e.g., "30%"). Setting it to 0 disables the limit (unbounded, default behavior). When this limit is reached, queries will fail with ResourceExhausted error. NOTE: This does NOT limit memory used by table scans (table scans only run on datanodes). |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |
| `datanode.client.connect_timeout` | String | `10s` | -- |
@@ -493,6 +496,7 @@
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL. **It's only used when the provider is `kafka`**.
This option ensures that when Kafka messages are deleted, the system can still successfully replay memtable data without throwing an out-of-range error. However, enabling this option might lead to unexpected data loss, as the system will skip over missing entries instead of treating them as critical errors. |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine. Default to 0, which means the number of CPU cores. |
+| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join). Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit (unbounded, default behavior). When this limit is reached, queries will fail with ResourceExhausted error. NOTE: This does NOT limit memory used by table scans. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data. - `File`: the data is stored in the local file system. - `S3`: the data is stored in the S3 object storage. - `Gcs`: the data is stored in the Google Cloud Storage. - `Azblob`: the data is stored in the Azure Blob Storage. - `Oss`: the data is stored in the Aliyun OSS. |
@@ -546,6 +550,7 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
+| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions. To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -666,5 +671,6 @@
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
| `query` | -- | -- | -- |
| `query.parallelism` | Integer | `1` | Parallelism of the query engine for query sent by flownode. Default to 1, so it won't use too much cpu or memory |
+| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join). Supports absolute size (e.g., "1GB", "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit (unbounded, default behavior). When this limit is reached, queries will fail with ResourceExhausted error. NOTE: This does NOT limit memory used by table scans. |
| `memory` | -- | -- | The memory options. |
| `memory.enable_heap_profiling` | Bool | `true` | Whether to enable heap profiling activation during startup. When enabled, heap profiling will be activated if the `MALLOC_CONF` environment variable is set to "prof:true,prof_active:false". The official image adds this env variable. Default is true. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index e283967680d5..bbf16963ae20 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -256,6 +256,13 @@ overwrite_entry_start_id = false
## Default to 0, which means the number of CPU cores.
parallelism = 0
+## Memory pool size for query execution operators (aggregation, sorting, join).
+## Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit (unbounded, default behavior).
+## When this limit is reached, queries will fail with ResourceExhausted error.
+## NOTE: This does NOT limit memory used by table scans.
+memory_pool_size = "50%"
+
## The data storage options.
[storage]
## The working home directory.
@@ -496,6 +503,11 @@ max_concurrent_scan_files = 384
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
+## Memory limit for table scans across all queries.
+## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit.
+scan_memory_limit = "50%"
+
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index 81ff25f28366..4e44c1ecbb04 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -158,6 +158,13 @@ default_ratio = 1.0
## Default to 1, so it won't use too much cpu or memory
parallelism = 1
+## Memory pool size for query execution operators (aggregation, sorting, join).
+## Supports absolute size (e.g., "1GB", "2GB") or percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit (unbounded, default behavior).
+## When this limit is reached, queries will fail with ResourceExhausted error.
+## NOTE: This does NOT limit memory used by table scans.
+memory_pool_size = "50%"
+
## The memory options.
[memory]
## Whether to enable heap profiling activation during startup.
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index b26d88323e49..4e1ef02e24ad 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -244,6 +244,13 @@ parallelism = 0
## Default to false, meaning when push down optimize failed, return error msg
allow_query_fallback = false
+## Memory pool size for query execution operators (aggregation, sorting, join).
+## Supports absolute size (e.g., "4GB", "8GB") or percentage of system memory (e.g., "30%").
+## Setting it to 0 disables the limit (unbounded, default behavior).
+## When this limit is reached, queries will fail with ResourceExhausted error.
+## NOTE: This does NOT limit memory used by table scans (table scans only run on datanodes).
+memory_pool_size = "50%"
+
## Datanode options.
[datanode]
## Datanode client options.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 5fae0f444fda..41acb4d265e6 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -353,6 +353,13 @@ max_running_procedures = 128
## Default to 0, which means the number of CPU cores.
parallelism = 0
+## Memory pool size for query execution operators (aggregation, sorting, join).
+## Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit (unbounded, default behavior).
+## When this limit is reached, queries will fail with ResourceExhausted error.
+## NOTE: This does NOT limit memory used by table scans.
+memory_pool_size = "50%"
+
## The data storage options.
[storage]
## The working home directory.
@@ -580,6 +587,11 @@ max_concurrent_scan_files = 384
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
+## Memory limit for table scans across all queries.
+## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit.
+scan_memory_limit = "50%"
+
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml
index 8baa1dc50d4e..105b7d19b008 100644
--- a/src/cmd/Cargo.toml
+++ b/src/cmd/Cargo.toml
@@ -96,6 +96,7 @@ tikv-jemallocator = "0.6"
client = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-version.workspace = true
+ordered-float.workspace = true
serde.workspace = true
temp-env = "0.3"
tempfile.workspace = true
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index b92cf9631d46..43d69e6c18ad 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -15,6 +15,7 @@
use std::time::Duration;
use cmd::options::GreptimeOptions;
+use common_base::memory_limit::MemoryLimit;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions};
@@ -73,6 +74,7 @@ fn test_load_datanode_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
+ scan_memory_limit: MemoryLimit::Percentage(0.5),
..Default::default()
}),
RegionEngineConfig::File(FileEngineConfig {}),
@@ -81,6 +83,10 @@ fn test_load_datanode_example_config() {
flush_metadata_region_interval: Duration::from_secs(30),
}),
],
+ query: QueryOptions {
+ memory_pool_size: MemoryLimit::Percentage(0.5),
+ ..Default::default()
+ },
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
@@ -153,6 +159,10 @@ fn test_load_frontend_example_config() {
cors_allowed_origins: vec!["https://example.com".to_string()],
..Default::default()
},
+ query: QueryOptions {
+ memory_pool_size: MemoryLimit::Percentage(0.5),
+ ..Default::default()
+ },
..Default::default()
},
..Default::default()
@@ -240,6 +250,7 @@ fn test_load_flownode_example_config() {
query: QueryOptions {
parallelism: 1,
allow_query_fallback: false,
+ memory_pool_size: MemoryLimit::Percentage(0.5),
},
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
@@ -283,6 +294,7 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
+ scan_memory_limit: MemoryLimit::Percentage(0.5),
..Default::default()
}),
RegionEngineConfig::File(FileEngineConfig {}),
@@ -311,7 +323,10 @@ fn test_load_standalone_example_config() {
cors_allowed_origins: vec!["https://example.com".to_string()],
..Default::default()
},
-
+ query: QueryOptions {
+ memory_pool_size: MemoryLimit::Percentage(0.5),
+ ..Default::default()
+ },
..Default::default()
},
..Default::default()
diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs
index cc5acdbf47af..681efe28d71f 100644
--- a/src/common/base/src/lib.rs
+++ b/src/common/base/src/lib.rs
@@ -15,6 +15,7 @@
pub mod bit_vec;
pub mod bytes;
pub mod cancellation;
+pub mod memory_limit;
pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]
diff --git a/src/common/base/src/memory_limit.rs b/src/common/base/src/memory_limit.rs
new file mode 100644
index 000000000000..ccd3fd6081a7
--- /dev/null
+++ b/src/common/base/src/memory_limit.rs
@@ -0,0 +1,216 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::{self, Display};
+use std::str::FromStr;
+
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
+
+use crate::readable_size::ReadableSize;
+
+/// Memory limit configuration that supports both absolute size and percentage.
+///
+/// Examples:
+/// - Absolute size: "2GB", "4GiB", "512MB"
+/// - Percentage: "50%", "75%"
+/// - Unlimited: "0"
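+///
+/// A minimal parse-and-resolve sketch (illustrative; marked `ignore` so it is not run as a doc test):
+///
+/// ```ignore
+/// use common_base::memory_limit::MemoryLimit;
+///
+/// // "50%" of an 8 GiB machine resolves to 4 GiB.
+/// let limit: MemoryLimit = "50%".parse().unwrap();
+/// assert_eq!(limit.resolve(8 * 1024 * 1024 * 1024), 4 * 1024 * 1024 * 1024);
+///
+/// // "0" disables the limit entirely.
+/// assert!("0".parse::<MemoryLimit>().unwrap().is_unlimited());
+/// ```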
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum MemoryLimit {
+ /// Absolute memory size.
+ Size(ReadableSize),
+ /// Percentage of total system memory (0.0 to 1.0).
+ Percentage(f64),
+}
+
+// Safe to implement Eq because percentage values are always in valid range (0.0-1.0)
+// and won't be NaN or Infinity.
+impl Eq for MemoryLimit {}
+
+impl MemoryLimit {
+ /// Resolve the memory limit to bytes based on total system memory.
+ /// Returns 0 if the limit is disabled (Size(0) or Percentage(0)).
+ pub fn resolve(&self, total_memory_bytes: u64) -> u64 {
+ match self {
+ MemoryLimit::Size(size) => size.as_bytes(),
+ MemoryLimit::Percentage(pct) => {
+ if *pct <= 0.0 {
+ 0
+ } else {
+ (total_memory_bytes as f64 * pct) as u64
+ }
+ }
+ }
+ }
+
+ /// Returns true if this limit is disabled (0 bytes or 0%).
+ pub fn is_unlimited(&self) -> bool {
+ match self {
+ MemoryLimit::Size(size) => size.as_bytes() == 0,
+ MemoryLimit::Percentage(pct) => *pct <= 0.0,
+ }
+ }
+}
+
+impl Default for MemoryLimit {
+ fn default() -> Self {
+ MemoryLimit::Size(ReadableSize(0))
+ }
+}
+
+impl FromStr for MemoryLimit {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let s = s.trim();
+
+ if let Some(pct_str) = s.strip_suffix('%') {
+ let pct = pct_str
+ .trim()
+ .parse::<f64>()
+ .map_err(|e| format!("invalid percentage value '{}': {}", pct_str, e))?;
+
+ if !(0.0..=100.0).contains(&pct) {
+ return Err(format!("percentage must be between 0 and 100, got {}", pct));
+ }
+
+ Ok(MemoryLimit::Percentage(pct / 100.0))
+ } else {
+ let size = ReadableSize::from_str(s)?;
+ Ok(MemoryLimit::Size(size))
+ }
+ }
+}
+
+impl Display for MemoryLimit {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ MemoryLimit::Size(size) => write!(f, "{}", size),
+ MemoryLimit::Percentage(pct) => write!(f, "{}%", pct * 100.0),
+ }
+ }
+}
+
+impl Serialize for MemoryLimit {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&self.to_string())
+ }
+}
+
+impl<'de> Deserialize<'de> for MemoryLimit {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let s = String::deserialize(deserializer)?;
+ MemoryLimit::from_str(&s).map_err(serde::de::Error::custom)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_absolute_size() {
+ assert_eq!(
+ "2GB".parse::().unwrap(),
+ MemoryLimit::Size(ReadableSize(2 * 1024 * 1024 * 1024))
+ );
+ assert_eq!(
+ "512MB".parse::().unwrap(),
+ MemoryLimit::Size(ReadableSize(512 * 1024 * 1024))
+ );
+ assert_eq!(
+ "0".parse::().unwrap(),
+ MemoryLimit::Size(ReadableSize(0))
+ );
+ }
+
+ #[test]
+ fn test_parse_percentage() {
+ assert_eq!(
+ "50%".parse::().unwrap(),
+ MemoryLimit::Percentage(0.5)
+ );
+ assert_eq!(
+ "75%".parse::().unwrap(),
+ MemoryLimit::Percentage(0.75)
+ );
+ assert_eq!(
+ "0%".parse::().unwrap(),
+ MemoryLimit::Percentage(0.0)
+ );
+ }
+
+ #[test]
+ fn test_parse_invalid() {
+ assert!("150%".parse::().is_err());
+ assert!("-10%".parse::().is_err());
+ assert!("invalid".parse::().is_err());
+ }
+
+ #[test]
+ fn test_resolve() {
+ let total = 8 * 1024 * 1024 * 1024; // 8GB
+
+ assert_eq!(
+ MemoryLimit::Size(ReadableSize(2 * 1024 * 1024 * 1024)).resolve(total),
+ 2 * 1024 * 1024 * 1024
+ );
+ assert_eq!(
+ MemoryLimit::Percentage(0.5).resolve(total),
+ 4 * 1024 * 1024 * 1024
+ );
+ assert_eq!(MemoryLimit::Size(ReadableSize(0)).resolve(total), 0);
+ assert_eq!(MemoryLimit::Percentage(0.0).resolve(total), 0);
+ }
+
+ #[test]
+ fn test_is_unlimited() {
+ assert!(MemoryLimit::Size(ReadableSize(0)).is_unlimited());
+ assert!(MemoryLimit::Percentage(0.0).is_unlimited());
+ assert!(!MemoryLimit::Size(ReadableSize(1024)).is_unlimited());
+ assert!(!MemoryLimit::Percentage(0.5).is_unlimited());
+ }
+
+ #[test]
+ fn test_parse_100_percent() {
+ assert_eq!(
+ "100%".parse::().unwrap(),
+ MemoryLimit::Percentage(1.0)
+ );
+ }
+
+ #[test]
+ fn test_parse_decimal_percentage() {
+ assert_eq!(
+ "20.5%".parse::().unwrap(),
+ MemoryLimit::Percentage(0.205)
+ );
+ }
+
+ #[test]
+ fn test_display_integer_percentage() {
+ assert_eq!(MemoryLimit::Percentage(0.2).to_string(), "20%");
+ assert_eq!(MemoryLimit::Percentage(0.5).to_string(), "50%");
+ }
+
+ #[test]
+ fn test_display_decimal_percentage() {
+ assert_eq!(MemoryLimit::Percentage(0.205).to_string(), "20.5%");
+ }
+}
diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs
index e07d152d2dc4..88d79d570e24 100644
--- a/src/common/recordbatch/src/error.rs
+++ b/src/common/recordbatch/src/error.rs
@@ -15,6 +15,7 @@
//! Error of record batch.
use std::any::Any;
+use common_base::readable_size::ReadableSize;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
@@ -193,6 +194,28 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
+
+ #[snafu(display(
+ "Exceeded memory limit: {} requested, {} used globally ({}%), {} used by this stream (privileged: {}), effective limit: {} ({}%), hard limit: {}",
+ ReadableSize(*requested as u64),
+ ReadableSize(*global_used as u64),
+ if *limit > 0 { global_used * 100 / limit } else { 0 },
+ ReadableSize(*stream_used as u64),
+ is_privileged,
+ ReadableSize(*effective_limit as u64),
+ if *limit > 0 { effective_limit * 100 / limit } else { 0 },
+ ReadableSize(*limit as u64)
+ ))]
+ ExceedMemoryLimit {
+ requested: usize,
+ global_used: usize,
+ stream_used: usize,
+ is_privileged: bool,
+ effective_limit: usize,
+ limit: usize,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
impl ErrorExt for Error {
@@ -229,6 +252,8 @@ impl ErrorExt for Error {
Error::StreamTimeout { .. } => StatusCode::Cancelled,
Error::StreamCancelled { .. } => StatusCode::Cancelled,
+
+ Error::ExceedMemoryLimit { .. } => StatusCode::RuntimeResourcesExhausted,
}
}
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index 7ae4a419d6fa..8cca133dd026 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -21,8 +21,10 @@ pub mod filter;
mod recordbatch;
pub mod util;
+use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
@@ -406,6 +408,393 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStream
}
}
+/// Memory permit for a stream, providing privileged access or rate limiting.
+///
+/// The permit tracks whether this stream has privileged Top-K status.
+/// When dropped, it automatically releases any privileged slot it holds.
+pub struct MemoryPermit {
+ tracker: QueryMemoryTracker,
+ is_privileged: AtomicBool,
+}
+
+impl MemoryPermit {
+ /// Check if this permit currently has privileged status.
+ pub fn is_privileged(&self) -> bool {
+ self.is_privileged.load(Ordering::Acquire)
+ }
+
+ /// Ensure this permit has privileged status by acquiring a slot if available.
+ /// Returns true if privileged (either already privileged or just acquired privilege).
+ fn ensure_privileged(&self) -> bool {
+ if self.is_privileged.load(Ordering::Acquire) {
+ return true;
+ }
+
+ // Try to claim a privileged slot
+ self.tracker
+ .privileged_count
+ .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
+ if count < self.tracker.privileged_slots {
+ Some(count + 1)
+ } else {
+ None
+ }
+ })
+ .map(|_| {
+ self.is_privileged.store(true, Ordering::Release);
+ true
+ })
+ .unwrap_or(false)
+ }
+
+ /// Track additional memory usage with this permit.
+ /// Returns error if limit is exceeded.
+ ///
+ /// # Arguments
+ /// * `additional` - Additional memory size to track in bytes
+ /// * `stream_tracked` - Total memory already tracked by this stream
+ ///
+ /// # Behavior
+ /// - Privileged streams: Can push global memory usage up to full limit
+ /// - Standard-tier streams: Can push global memory usage up to limit * standard_tier_memory_fraction (default: 0.7)
+ /// - Standard-tier streams automatically attempt to acquire privilege if slots become available
+ /// - The configured limit is absolute hard limit - no stream can exceed it
+ pub fn track(&self, additional: usize, stream_tracked: usize) -> Result<()> {
+ // Ensure privileged status if possible
+ let is_privileged = self.ensure_privileged();
+
+ self.tracker
+ .track_internal(additional, is_privileged, stream_tracked)
+ }
+
+ /// Release tracked memory.
+ ///
+ /// # Arguments
+ /// * `amount` - Amount of memory to release in bytes
+ pub fn release(&self, amount: usize) {
+ self.tracker.release(amount);
+ }
+}
+
+impl Drop for MemoryPermit {
+ fn drop(&mut self) {
+ // Release privileged slot if we had one
+ if self.is_privileged.load(Ordering::Acquire) {
+ self.tracker
+ .privileged_count
+ .fetch_sub(1, Ordering::Release);
+ }
+ }
+}
+
+/// Memory tracker for RecordBatch streams. Clone to share the same limit across queries.
+///
+/// Implements a two-tier memory allocation strategy:
+/// - **Privileged tier**: First N streams (default: 20) can use up to the full memory limit
+/// - **Standard tier**: Remaining streams are restricted to a fraction of the limit (default: 70%)
+/// - Privilege is granted on a first-come-first-served basis
+/// - The configured limit is an absolute hard cap - no stream can exceed it
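+///
+/// A minimal usage sketch (illustrative only; error handling elided, marked `ignore`):
+///
+/// ```ignore
+/// // 1 GiB hard limit shared by every stream registered on this tracker.
+/// let tracker = QueryMemoryTracker::new(1024 * 1024 * 1024);
+/// // One permit per query; the first `privileged_slots` permits become privileged.
+/// let permit = tracker.register_permit();
+/// // Track a 4 MiB batch; the second argument is what this stream already holds.
+/// permit.track(4 * 1024 * 1024, 0)?;
+/// // Release once the batch is no longer buffered.
+/// permit.release(4 * 1024 * 1024);
+/// ```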
+#[derive(Clone)]
+pub struct QueryMemoryTracker {
+ current: Arc<AtomicUsize>,
+ limit: usize,
+ standard_tier_memory_fraction: f64,
+ privileged_count: Arc<AtomicUsize>,
+ privileged_slots: usize,
+ on_update: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ on_reject: Option<Arc<dyn Fn() + Send + Sync>>,
+}
+
+impl fmt::Debug for QueryMemoryTracker {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("QueryMemoryTracker")
+ .field("current", &self.current.load(Ordering::Acquire))
+ .field("limit", &self.limit)
+ .field(
+ "standard_tier_memory_fraction",
+ &self.standard_tier_memory_fraction,
+ )
+ .field(
+ "privileged_count",
+ &self.privileged_count.load(Ordering::Acquire),
+ )
+ .field("privileged_slots", &self.privileged_slots)
+ .field("on_update", &self.on_update.is_some())
+ .field("on_reject", &self.on_reject.is_some())
+ .finish()
+ }
+}
+
+impl QueryMemoryTracker {
+ /// Default maximum number of streams that can get privileged memory access.
+ /// Privileged streams can use up to the full memory limit.
+ /// Privilege is granted on a first-come-first-served basis.
+ pub const DEFAULT_PRIVILEGED_SLOTS: usize = 20;
+ /// Default memory fraction available to standard-tier (non-privileged) streams.
+ /// Standard-tier streams can only use up to `limit * this_fraction`.
+ /// Value range: [0.0, 1.0].
+ pub const DEFAULT_STANDARD_TIER_MEMORY_FRACTION: f64 = 0.7;
+
+ /// Create a new memory tracker with the given limit (in bytes).
+ ///
+ /// Uses default configuration:
+ /// - Privileged slots: 20
+ /// - Standard-tier memory fraction: 0.7 (70%)
+ ///
+ /// # Arguments
+ /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
+ pub fn new(limit: usize) -> Self {
+ Self::with_config(
+ limit,
+ Self::DEFAULT_PRIVILEGED_SLOTS,
+ Self::DEFAULT_STANDARD_TIER_MEMORY_FRACTION,
+ )
+ }
+
+ /// Create a new memory tracker with custom privileged slots limit.
+ pub fn with_privileged_slots(limit: usize, privileged_slots: usize) -> Self {
+ Self::with_config(
+ limit,
+ privileged_slots,
+ Self::DEFAULT_STANDARD_TIER_MEMORY_FRACTION,
+ )
+ }
+
+ /// Create a new memory tracker with full configuration.
+ ///
+ /// # Arguments
+ /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
+ /// * `privileged_slots` - Maximum number of streams that can get privileged status.
+ /// * `standard_tier_memory_fraction` - Memory fraction for standard-tier streams (range: [0.0, 1.0]).
+ ///
+ /// # Panics
+ /// Panics if `standard_tier_memory_fraction` is not in the range [0.0, 1.0].
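+ ///
+ /// # Example
+ ///
+ /// An illustrative construction (the values here are arbitrary, not recommended defaults):
+ ///
+ /// ```ignore
+ /// // 2 GiB hard limit, 32 privileged slots, standard tier capped at 60% of the limit.
+ /// let tracker = QueryMemoryTracker::with_config(2 * 1024 * 1024 * 1024, 32, 0.6);
+ /// ```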
+ pub fn with_config(
+ limit: usize,
+ privileged_slots: usize,
+ standard_tier_memory_fraction: f64,
+ ) -> Self {
+ assert!(
+ (0.0..=1.0).contains(&standard_tier_memory_fraction),
+ "standard_tier_memory_fraction must be in [0.0, 1.0], got {}",
+ standard_tier_memory_fraction
+ );
+
+ Self {
+ current: Arc::new(AtomicUsize::new(0)),
+ limit,
+ standard_tier_memory_fraction,
+ privileged_count: Arc::new(AtomicUsize::new(0)),
+ privileged_slots,
+ on_update: None,
+ on_reject: None,
+ }
+ }
+
+ /// Register a new permit for memory tracking.
+ /// The first `privileged_slots` permits get privileged status automatically.
+ /// The returned permit can be shared across multiple streams of the same query.
+ pub fn register_permit(&self) -> MemoryPermit {
+ // Try to claim a privileged slot
+ let is_privileged = self
+ .privileged_count
+ .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
+ if count < self.privileged_slots {
+ Some(count + 1)
+ } else {
+ None
+ }
+ })
+ .is_ok();
+
+ MemoryPermit {
+ tracker: self.clone(),
+ is_privileged: AtomicBool::new(is_privileged),
+ }
+ }
+
+ /// Set a callback to be called whenever the usage changes successfully.
+ /// The callback receives the new total usage in bytes.
+ ///
+ /// # Note
+ /// The callback is called after both successful `track()` and `release()` operations.
+ /// It is called even when `limit == 0` (unlimited mode) to track actual usage.
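+ ///
+ /// # Example
+ ///
+ /// An illustrative hook-up; `MEMORY_USAGE_GAUGE` is a hypothetical metric, not part of this crate:
+ ///
+ /// ```ignore
+ /// let tracker = QueryMemoryTracker::new(limit)
+ ///     // Export the new total usage whenever tracking or releasing succeeds.
+ ///     .with_update_callback(|used| MEMORY_USAGE_GAUGE.set(used as i64));
+ /// ```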
+ pub fn with_update_callback<F>(mut self, callback: F) -> Self
+ where
+ F: Fn(usize) + Send + Sync + 'static,
+ {
+ self.on_update = Some(Arc::new(callback));
+ self
+ }
+
+ /// Set a callback to be called when memory allocation is rejected.
+ ///
+ /// # Note
+ /// This is only called when `track()` fails due to exceeding the limit.
+ /// It is never called when `limit == 0` (unlimited mode).
+ pub fn with_reject_callback<F>(mut self, callback: F) -> Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.on_reject = Some(Arc::new(callback));
+ self
+ }
+
+ /// Get the current memory usage in bytes.
+ pub fn current(&self) -> usize {
+ self.current.load(Ordering::Acquire)
+ }
+
+ /// Internal method to track additional memory usage.
+ ///
+ /// Called by `MemoryPermit::track()`. Use `MemoryPermit::track()` instead of calling this directly.
+ fn track_internal(
+ &self,
+ additional: usize,
+ is_privileged: bool,
+ stream_tracked: usize,
+ ) -> Result<()> {
+ // Calculate effective global limit based on stream privilege
+ // Privileged streams: can push global usage up to full limit
+ // Standard-tier streams: can only push global usage up to fraction of limit
+ let effective_limit = if is_privileged {
+ self.limit
+ } else {
+ (self.limit as f64 * self.standard_tier_memory_fraction) as usize
+ };
+
+ let mut new_total = 0;
+ let result = self
+ .current
+ .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
+ new_total = current.saturating_add(additional);
+
+ if self.limit == 0 {
+ // Unlimited mode
+ return Some(new_total);
+ }
+
+ // Check if new global total exceeds effective limit
+ // The configured limit is absolute hard limit - no stream can exceed it
+ if new_total <= effective_limit {
+ Some(new_total)
+ } else {
+ None
+ }
+ });
+
+ match result {
+ Ok(_) => {
+ if let Some(callback) = &self.on_update {
+ callback(new_total);
+ }
+ Ok(())
+ }
+ Err(current) => {
+ if let Some(callback) = &self.on_reject {
+ callback();
+ }
+ error::ExceedMemoryLimitSnafu {
+ requested: additional,
+ global_used: current,
+ stream_used: stream_tracked,
+ is_privileged,
+ effective_limit,
+ limit: self.limit,
+ }
+ .fail()
+ }
+ }
+ }
+
+ /// Release tracked memory.
+ ///
+ /// # Arguments
+ /// * `amount` - Amount of memory to release in bytes
+ pub fn release(&self, amount: usize) {
+ if let Ok(old_value) =
+ self.current
+ .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
+ Some(current.saturating_sub(amount))
+ })
+ && let Some(callback) = &self.on_update
+ {
+ callback(old_value.saturating_sub(amount));
+ }
+ }
+}
+
+/// A wrapper stream that tracks memory usage of RecordBatches.
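+///
+/// Illustrative wiring sketch (assumes an existing `SendableRecordBatchStream` named `stream`
+/// and a `QueryMemoryTracker` named `tracker`; marked `ignore`):
+///
+/// ```ignore
+/// // Streams of the same query share one permit.
+/// let permit = Arc::new(tracker.register_permit());
+/// let tracked = MemoryTrackedStream::new(stream, permit);
+/// ```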
+pub struct MemoryTrackedStream {
+ inner: SendableRecordBatchStream,
+ permit: Arc<MemoryPermit>,
+ // Total tracked size, released when stream drops.
+ total_tracked: usize,
+}
+
+impl MemoryTrackedStream {
+ pub fn new(inner: SendableRecordBatchStream, permit: Arc<MemoryPermit>) -> Self {
+ Self {
+ inner,
+ permit,
+ total_tracked: 0,
+ }
+ }
+}
+
+impl Stream for MemoryTrackedStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {