
Commit 4553cc4

Add greedy mempool with configuration options (#433)
Adds a usable memory limit to each query. If P_QUERY_MEMORY_LIMIT is set, it is used as a fixed limit for the greedy memory pool; otherwise 85% of the memory available at startup (the 0.85 fraction in the diff below) is used as the hard limit for the query. When the memory pool quota fills up, DataFusion will try to spill some execution nodes to a temporary directory. This solution is not dynamic: DataFusion does not track the actually available memory while it runs, and the pool size handed to it is a fixed number of bytes. So the bound may have been set high, yet another process has since taken memory and the allocation cannot happen at runtime; in that case DataFusion returns an error instead of triggering an OOM. Conversely, if the bound was set low and more memory becomes available later, the query still cannot use it.
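
To restate that selection logic in one place, here is an illustrative sketch (not code from this commit; the helper name `effective_memory_limit` is made up, and it uses the same sysinfo API as the diff below):

```rust
use sysinfo::{System, SystemExt};

// Illustrative helper mirroring the commit's behaviour: a fixed limit wins,
// otherwise 85% of the memory free at startup becomes the cap.
fn effective_memory_limit(configured_bytes: Option<usize>) -> (usize, f64) {
    match configured_bytes {
        // P_QUERY_MEMORY_LIMIT set: use the configured byte count as-is.
        Some(bytes) => (bytes, 1.0),
        // Unset: snapshot available memory once and allow 85% of it.
        None => {
            let mut system = System::new();
            system.refresh_memory();
            (system.available_memory() as usize, 0.85)
        }
    }
}
```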
1 parent e922b07 commit 4553cc4

File tree: 5 files changed, 47 additions & 16 deletions


server/src/option.rs

Lines changed: 18 additions & 0 deletions
@@ -187,6 +187,9 @@ pub struct Server {
     /// Rows in Parquet Rowgroup
     pub row_group_size: usize,
 
+    /// Query memory limit in bytes
+    pub query_memory_pool_size: Option<usize>,
+
     /// Parquet compression algorithm
     pub parquet_compression: Compression,
 }
@@ -229,6 +232,11 @@ impl FromArgMatches for Server {
             .get_one::<bool>(Self::SEND_ANALYTICS)
             .cloned()
             .expect("default for send analytics");
+        // converts Gib to bytes before assigning
+        self.query_memory_pool_size = m
+            .get_one::<u8>(Self::QUERY_MEM_POOL_SIZE)
+            .cloned()
+            .map(|gib| gib as usize * 1024usize.pow(3));
         self.row_group_size = m
             .get_one::<usize>(Self::ROW_GROUP_SIZE)
             .cloned()
@@ -263,6 +271,7 @@ impl Server {
     pub const PASSWORD: &str = "password";
     pub const CHECK_UPDATE: &str = "check-update";
     pub const SEND_ANALYTICS: &str = "send-analytics";
+    pub const QUERY_MEM_POOL_SIZE: &str = "query-mempool-size";
     pub const ROW_GROUP_SIZE: &str = "row-group-size";
     pub const PARQUET_COMPRESSION_ALGO: &str = "compression-algo";
     pub const DEFAULT_USERNAME: &str = "admin";
@@ -352,6 +361,15 @@ impl Server {
                     .value_parser(value_parser!(bool))
                     .help("Disable/Enable checking for updates"),
             )
+            .arg(
+                Arg::new(Self::QUERY_MEM_POOL_SIZE)
+                    .long(Self::QUERY_MEM_POOL_SIZE)
+                    .env("P_QUERY_MEMORY_LIMIT")
+                    .value_name("Gib")
+                    .required(false)
+                    .value_parser(value_parser!(u8))
+                    .help("Set a fixed memory limit for query"),
+            )
             .arg(
                 Arg::new(Self::ROW_GROUP_SIZE)
                     .long(Self::ROW_GROUP_SIZE)
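
As a quick sanity check on the GiB-to-bytes conversion above (illustrative only, not part of the commit; note the flag is parsed as a `u8`, so the configurable limit tops out at 255 GiB):

```rust
// Same conversion as in the FromArgMatches impl above, pulled out for clarity.
fn gib_to_bytes(gib: u8) -> usize {
    gib as usize * 1024usize.pow(3)
}

fn main() {
    assert_eq!(gib_to_bytes(1), 1_073_741_824);
    // P_QUERY_MEMORY_LIMIT=4 caps each query's pool at 4 GiB.
    assert_eq!(gib_to_bytes(4), 4_294_967_296);
}
```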

server/src/query.rs

Lines changed: 20 additions & 1 deletion
@@ -23,11 +23,14 @@ use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::execution::context::SessionState;
+use datafusion::execution::disk_manager::DiskManagerConfig;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::prelude::*;
 use itertools::Itertools;
 use serde_json::Value;
 use std::path::Path;
 use std::sync::Arc;
+use sysinfo::{System, SystemExt};
 
 use crate::option::CONFIG;
 use crate::storage::ObjectStorageError;
@@ -81,7 +84,23 @@ impl Query {
     // create session context for this query
     fn create_session_context(&self) -> SessionContext {
         let config = SessionConfig::default();
-        let runtime = CONFIG.storage().get_datafusion_runtime();
+        let runtime_config = CONFIG
+            .storage()
+            .get_datafusion_runtime()
+            .with_disk_manager(DiskManagerConfig::NewOs);
+
+        let (pool_size, fraction) = match CONFIG.parseable.query_memory_pool_size {
+            Some(size) => (size, 1.),
+            None => {
+                let mut system = System::new();
+                system.refresh_memory();
+                let available_mem = system.available_memory();
+                (available_mem as usize, 0.85)
+            }
+        };
+
+        let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
+        let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
         let mut state = SessionState::with_config_rt(config, runtime);
 
         if let Some(tag) = &self.filter_tag {
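
For context on the "greedy mempool" in the commit title: in the DataFusion releases this code targets, `with_memory_limit(pool_size, fraction)` is shorthand for installing a `GreedyMemoryPool` capped at `pool_size * fraction` bytes. A hedged sketch of the equivalent explicit configuration (module paths and the `build_runtime` helper are assumptions and may differ across DataFusion versions):

```rust
use std::sync::Arc;

use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::GreedyMemoryPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

// Builds a runtime with an explicit greedy pool; roughly what the
// `with_memory_limit` call in create_session_context resolves to.
fn build_runtime(pool_size: usize, fraction: f64) -> Arc<RuntimeEnv> {
    let cap = (pool_size as f64 * fraction) as usize;
    let config = RuntimeConfig::new()
        .with_memory_pool(Arc::new(GreedyMemoryPool::new(cap)))
        // spill to OS temporary directories when the pool is exhausted
        .with_disk_manager(DiskManagerConfig::NewOs);
    Arc::new(RuntimeEnv::new(config).expect("valid runtime config"))
}
```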

server/src/storage/localfs.rs

Lines changed: 3 additions & 5 deletions
@@ -31,7 +31,7 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
     },
     error::DataFusionError,
-    execution::runtime_env::{RuntimeConfig, RuntimeEnv},
+    execution::runtime_env::RuntimeConfig,
 };
 use fs_extra::file::{move_file, CopyOptions};
 use futures::{stream::FuturesUnordered, TryStreamExt};
@@ -64,10 +64,8 @@ pub struct FSConfig {
 }
 
 impl ObjectStorageProvider for FSConfig {
-    fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv> {
-        let config = RuntimeConfig::new();
-        let runtime = RuntimeEnv::new(config).unwrap();
-        Arc::new(runtime)
+    fn get_datafusion_runtime(&self) -> RuntimeConfig {
+        RuntimeConfig::new()
     }
 
     fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {

server/src/storage/object_storage.rs

Lines changed: 3 additions & 2 deletions
@@ -33,7 +33,8 @@ use arrow_schema::Schema;
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::{
-    datasource::listing::ListingTable, error::DataFusionError, execution::runtime_env::RuntimeEnv,
+    datasource::listing::ListingTable, error::DataFusionError,
+    execution::runtime_env::RuntimeConfig,
 };
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
@@ -48,7 +49,7 @@ const SCHEMA_FILE_NAME: &str = ".schema";
 const ALERT_FILE_NAME: &str = ".alert.json";
 
 pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
-    fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv>;
+    fn get_datafusion_runtime(&self) -> RuntimeConfig;
     fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send>;
     fn get_endpoint(&self) -> String;
     fn register_store_metrics(&self, handler: &PrometheusMetrics);

server/src/storage/s3.rs

Lines changed: 3 additions & 8 deletions
@@ -28,7 +28,7 @@ use datafusion::datasource::object_store::{
     DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
 };
 use datafusion::error::DataFusionError;
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::runtime_env::RuntimeConfig;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
 use object_store::aws::{AmazonS3, AmazonS3Builder, Checksum};
@@ -184,7 +184,7 @@ impl S3Config {
 }
 
 impl ObjectStorageProvider for S3Config {
-    fn get_datafusion_runtime(&self) -> Arc<RuntimeEnv> {
+    fn get_datafusion_runtime(&self) -> RuntimeConfig {
         let s3 = self.get_default_builder().build().unwrap();
 
         // limit objectstore to a concurrent request limit
@@ -194,12 +194,7 @@ impl ObjectStorageProvider for S3Config {
         let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
         object_store_registry.register_store(url.as_ref(), Arc::new(s3));
 
-        let config =
-            RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
-
-        let runtime = RuntimeEnv::new(config).unwrap();
-
-        Arc::new(runtime)
+        RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
     }
 
     fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
