Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.max_total_body_memory` | String | Unset | Maximum total memory for all concurrent HTTP request bodies.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_total_message_memory` | String | Unset | Maximum total memory for all concurrent gRPC request messages.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.<br/>The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.<br/>Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
Expand Down Expand Up @@ -235,13 +237,15 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.max_total_body_memory` | String | Unset | Maximum total memory for all concurrent HTTP request bodies.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_total_message_memory` | String | Unset | Maximum total memory for all concurrent gRPC request messages.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.<br/>The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.<br/>Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
Expand Down
8 changes: 8 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ timeout = "0s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## Maximum total memory for all concurrent HTTP request bodies.
## Set to 0 to disable the limit. Default: "0" (unlimited)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better set a reasonable default, like 100MB, for better out-of-the-box user experience

## @toml2docs:none-default
#+ max_total_body_memory = "1GB"
## HTTP CORS support, it's turned on by default
## This allows browser to access http APIs without CORS restrictions
enable_cors = true
Expand All @@ -54,6 +58,10 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Maximum total memory for all concurrent gRPC request messages.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_message_memory = "1GB"
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
Expand Down
8 changes: 8 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ timeout = "0s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## Maximum total memory for all concurrent HTTP request bodies.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_body_memory = "1GB"
## HTTP CORS support, it's turned on by default
## This allows browser to access http APIs without CORS restrictions
enable_cors = true
Expand All @@ -56,6 +60,10 @@ prom_validation_mode = "strict"
bind_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Maximum total memory for all concurrent gRPC request messages.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_message_memory = "1GB"
## The maximum connection age for gRPC connection.
## The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
## Refer to https://grpc.io/docs/guides/keepalive/ for more details.
Expand Down
1 change: 1 addition & 0 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ impl<'a> FlownodeServiceBuilder<'a> {
let config = GrpcServerConfig {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
max_total_message_memory: opts.grpc.max_total_message_memory.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
max_connection_age: opts.grpc.max_connection_age,
};
Expand Down
14 changes: 14 additions & 0 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"Too many concurrent large requests, limit: {}, request size: {} bytes",
limit,
request_size
))]
TooManyConcurrentRequests {
limit: usize,
request_size: usize,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid query: {}", reason))]
InvalidQuery {
reason: String,
Expand Down Expand Up @@ -729,6 +741,8 @@ impl ErrorExt for Error {

InvalidUtf8Value { .. } | InvalidHeaderValue { .. } => StatusCode::InvalidArguments,

TooManyConcurrentRequests { .. } => StatusCode::RuntimeResourcesExhausted,

ParsePromQL { source, .. } => source.status_code(),
Other { source, .. } => source.status_code(),

Expand Down
16 changes: 16 additions & 0 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod database;
pub mod flight;
pub mod frontend_grpc_handler;
pub mod greptime_handler;
pub mod memory_limit;
pub mod prom_query_gateway;
pub mod region_server;

Expand Down Expand Up @@ -51,6 +52,7 @@ use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, T
use crate::metrics::MetricsMiddlewareLayer;
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::request_limiter::RequestMemoryLimiter;
use crate::server::Server;
use crate::tls::TlsOption;

Expand All @@ -67,6 +69,8 @@ pub struct GrpcOptions {
pub max_recv_message_size: ReadableSize,
/// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
/// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
pub max_total_message_memory: ReadableSize,
/// Compression mode in Arrow Flight service.
pub flight_compression: FlightCompression,
pub runtime_size: usize,
Expand Down Expand Up @@ -116,6 +120,7 @@ impl GrpcOptions {
GrpcServerConfig {
max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
max_send_message_size: self.max_send_message_size.as_bytes() as usize,
max_total_message_memory: self.max_total_message_memory.as_bytes() as usize,
tls: self.tls.clone(),
max_connection_age: self.max_connection_age,
}
Expand All @@ -134,6 +139,7 @@ impl Default for GrpcOptions {
server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
max_total_message_memory: ReadableSize(0),
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
Expand All @@ -153,6 +159,7 @@ impl GrpcOptions {
server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
max_total_message_memory: ReadableSize(0),
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
Expand Down Expand Up @@ -217,6 +224,7 @@ pub struct GrpcServer {
bind_addr: Option<SocketAddr>,
name: Option<String>,
config: GrpcServerConfig,
memory_limiter: RequestMemoryLimiter,
}

/// Grpc Server configuration
Expand All @@ -226,6 +234,8 @@ pub struct GrpcServerConfig {
pub max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
/// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
pub max_total_message_memory: usize,
pub tls: TlsOption,
/// Maximum time that a channel may exist.
/// Useful when the server wants to control the reconnection of its clients.
Expand All @@ -238,6 +248,7 @@ impl Default for GrpcServerConfig {
Self {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
max_total_message_memory: 0,
tls: TlsOption::default(),
max_connection_age: None,
}
Expand Down Expand Up @@ -277,6 +288,11 @@ impl GrpcServer {
}
Ok(())
}

/// Get the memory limiter for monitoring current memory usage
pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
&self.memory_limiter
}
}

pub struct HealthCheckHandler;
Expand Down
21 changes: 20 additions & 1 deletion src/servers/src/grpc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::grpc::{GrpcServer, GrpcServerConfig};
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::request_limiter::RequestMemoryLimiter;
use crate::tls::TlsOption;

/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
Expand All @@ -57,7 +58,17 @@ macro_rules! add_service {
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

$builder.routes_builder_mut().add_service(service_builder);
// Apply memory limiter layer
use $crate::grpc::memory_limit::MemoryLimiterExtensionLayer;
let service_with_limiter = $crate::tower::ServiceBuilder::new()
.layer(MemoryLimiterExtensionLayer::new(
$builder.memory_limiter().clone(),
))
.service(service_builder);

$builder
.routes_builder_mut()
.add_service(service_with_limiter);
};
}

Expand All @@ -73,17 +84,20 @@ pub struct GrpcServerBuilder {
HeaderInterceptor,
>,
>,
memory_limiter: RequestMemoryLimiter,
}

impl GrpcServerBuilder {
pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
let memory_limiter = RequestMemoryLimiter::new(config.max_total_message_memory);
Self {
name: None,
config,
runtime,
routes_builder: RoutesBuilder::default(),
tls_config: None,
otel_arrow_service: None,
memory_limiter,
}
}

Expand All @@ -95,6 +109,10 @@ impl GrpcServerBuilder {
&self.runtime
}

pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
&self.memory_limiter
}

pub fn name(self, name: Option<String>) -> Self {
Self { name, ..self }
}
Expand Down Expand Up @@ -198,6 +216,7 @@ impl GrpcServerBuilder {
bind_addr: None,
name: self.name,
config: self.config,
memory_limiter: self.memory_limiter,
}
}
}
25 changes: 25 additions & 0 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::{debug, warn};
use futures::StreamExt;
use prost::Message;
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::{TonicResult, cancellation};
use crate::hint_headers;
use crate::request_limiter::RequestMemoryLimiter;

pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
Expand All @@ -48,6 +50,17 @@ impl GreptimeDatabase for DatabaseService {
"GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
remote_addr, hints
);

let _guard = request
.extensions()
.get::<RequestMemoryLimiter>()
.filter(|limiter| limiter.is_enabled())
.and_then(|limiter| {
let message_size = request.get_ref().encoded_len();
limiter.try_acquire(message_size).transpose()
})
.transpose()?;

let handler = self.handler.clone();
let request_future = async move {
let request = request.into_inner();
Expand Down Expand Up @@ -94,13 +107,25 @@ impl GreptimeDatabase for DatabaseService {
"GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
remote_addr, hints
);

let limiter = request.extensions().get::<RequestMemoryLimiter>().cloned();

let handler = self.handler.clone();
let request_future = async move {
let mut affected_rows = 0;

let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;

let _guard = limiter
.as_ref()
.filter(|limiter| limiter.is_enabled())
.and_then(|limiter| {
let message_size = request.encoded_len();
limiter.try_acquire(message_size).transpose()
})
.transpose()?;
let output = handler.handle_request(request, hints.clone()).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
Expand Down
Loading
Loading