Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.max_total_body_memory` | String | Unset | Maximum total memory for all concurrent HTTP request bodies.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not validate strings. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_total_message_memory` | String | Unset | Maximum total memory for all concurrent gRPC request messages.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.<br/>The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.<br/>Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
Expand Down Expand Up @@ -235,13 +237,15 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.max_total_body_memory` | String | Unset | Maximum total memory for all concurrent HTTP request bodies.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not validate strings. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_total_message_memory` | String | Unset | Maximum total memory for all concurrent gRPC request messages.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Defaults to `arrow_ipc` |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.<br/>The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.<br/>Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
Expand Down
8 changes: 8 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ timeout = "0s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## Maximum total memory for all concurrent HTTP request bodies.
## Set to 0 to disable the limit. Default: "0" (unlimited)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better set a reasonable default, like 100MB, for better out-of-the-box user experience

## @toml2docs:none-default
#+ max_total_body_memory = "1GB"
## HTTP CORS support, it's turned on by default
## This allows browser to access http APIs without CORS restrictions
enable_cors = true
Expand All @@ -54,6 +58,10 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Maximum total memory for all concurrent gRPC request messages.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_message_memory = "1GB"
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
Expand Down
8 changes: 8 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ timeout = "0s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## Maximum total memory for all concurrent HTTP request bodies.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_body_memory = "1GB"
## HTTP CORS support, it's turned on by default
## This allows browser to access http APIs without CORS restrictions
enable_cors = true
Expand All @@ -56,6 +60,10 @@ prom_validation_mode = "strict"
bind_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Maximum total memory for all concurrent gRPC request messages.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_message_memory = "1GB"
## The maximum connection age for gRPC connection.
## The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
## Refer to https://grpc.io/docs/guides/keepalive/ for more details.
Expand Down
1 change: 1 addition & 0 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ impl<'a> FlownodeServiceBuilder<'a> {
let config = GrpcServerConfig {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
max_total_message_memory: opts.grpc.max_total_message_memory.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
max_connection_age: opts.grpc.max_connection_age,
};
Expand Down
14 changes: 14 additions & 0 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ pub enum Error {
location: Location,
},

/// A request was refused because of the limit on concurrent request memory.
///
/// NOTE(review): presumably produced when `RequestMemoryLimiter::try_acquire`
/// cannot reserve `request_size` — confirm against the limiter implementation.
#[snafu(display(
"Too many concurrent large requests, limit: {}, request size: {} bytes",
limit,
request_size
))]
TooManyConcurrentRequests {
/// The configured limit shown in the message (presumably in bytes — TODO confirm).
limit: usize,
/// Size of the rejected request; the display message labels it as bytes.
request_size: usize,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid query: {}", reason))]
InvalidQuery {
reason: String,
Expand Down Expand Up @@ -729,6 +741,8 @@ impl ErrorExt for Error {

InvalidUtf8Value { .. } | InvalidHeaderValue { .. } => StatusCode::InvalidArguments,

TooManyConcurrentRequests { .. } => StatusCode::RuntimeResourcesExhausted,

ParsePromQL { source, .. } => source.status_code(),
Other { source, .. } => source.status_code(),

Expand Down
16 changes: 16 additions & 0 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod database;
pub mod flight;
pub mod frontend_grpc_handler;
pub mod greptime_handler;
pub mod memory_limit;
pub mod prom_query_gateway;
pub mod region_server;

Expand Down Expand Up @@ -51,6 +52,7 @@ use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, T
use crate::metrics::MetricsMiddlewareLayer;
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::request_limiter::RequestMemoryLimiter;
use crate::server::Server;
use crate::tls::TlsOption;

Expand All @@ -67,6 +69,8 @@ pub struct GrpcOptions {
pub max_recv_message_size: ReadableSize,
/// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
/// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
pub max_total_message_memory: ReadableSize,
/// Compression mode in Arrow Flight service.
pub flight_compression: FlightCompression,
pub runtime_size: usize,
Expand Down Expand Up @@ -116,6 +120,7 @@ impl GrpcOptions {
GrpcServerConfig {
max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
max_send_message_size: self.max_send_message_size.as_bytes() as usize,
max_total_message_memory: self.max_total_message_memory.as_bytes() as usize,
tls: self.tls.clone(),
max_connection_age: self.max_connection_age,
}
Expand All @@ -134,6 +139,7 @@ impl Default for GrpcOptions {
server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
max_total_message_memory: ReadableSize(0),
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
Expand All @@ -153,6 +159,7 @@ impl GrpcOptions {
server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
max_total_message_memory: ReadableSize(0),
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
Expand Down Expand Up @@ -217,6 +224,7 @@ pub struct GrpcServer {
bind_addr: Option<SocketAddr>,
name: Option<String>,
config: GrpcServerConfig,
memory_limiter: RequestMemoryLimiter,
}

/// Grpc Server configuration
Expand All @@ -226,6 +234,8 @@ pub struct GrpcServerConfig {
pub max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
/// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
pub max_total_message_memory: usize,
pub tls: TlsOption,
/// Maximum time that a channel may exist.
/// Useful when the server wants to control the reconnection of its clients.
Expand All @@ -238,6 +248,7 @@ impl Default for GrpcServerConfig {
Self {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
max_total_message_memory: 0,
tls: TlsOption::default(),
max_connection_age: None,
}
Expand Down Expand Up @@ -277,6 +288,11 @@ impl GrpcServer {
}
Ok(())
}

/// Returns a reference to the request memory limiter held by this server.
///
/// Exposed so callers can monitor current memory usage.
pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
&self.memory_limiter
}
}

pub struct HealthCheckHandler;
Expand Down
21 changes: 20 additions & 1 deletion src/servers/src/grpc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::grpc::{GrpcServer, GrpcServerConfig};
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::request_limiter::RequestMemoryLimiter;
use crate::tls::TlsOption;

/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
Expand All @@ -57,7 +58,17 @@ macro_rules! add_service {
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

$builder.routes_builder_mut().add_service(service_builder);
// Apply memory limiter layer
use $crate::grpc::memory_limit::MemoryLimiterExtensionLayer;
let service_with_limiter = $crate::tower::ServiceBuilder::new()
.layer(MemoryLimiterExtensionLayer::new(
$builder.memory_limiter().clone(),
))
.service(service_builder);

$builder
.routes_builder_mut()
.add_service(service_with_limiter);
};
}

Expand All @@ -73,17 +84,20 @@ pub struct GrpcServerBuilder {
HeaderInterceptor,
>,
>,
memory_limiter: RequestMemoryLimiter,
}

impl GrpcServerBuilder {
/// Creates a builder from the server `config` and the `runtime` to use.
///
/// A [`RequestMemoryLimiter`] is created up front from
/// `config.max_total_message_memory`; every other optional part of the
/// builder (name, TLS config, OTel Arrow service) starts out unset.
pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
    // Copy the budget out before `config` is moved into the builder.
    let total_message_budget = config.max_total_message_memory;

    Self {
        memory_limiter: RequestMemoryLimiter::new(total_message_budget),
        name: None,
        config,
        runtime,
        routes_builder: RoutesBuilder::default(),
        tls_config: None,
        otel_arrow_service: None,
    }
}

Expand All @@ -95,6 +109,10 @@ impl GrpcServerBuilder {
&self.runtime
}

/// Returns a reference to the request memory limiter owned by this builder.
///
/// The `add_service!` macro calls `memory_limiter()` on its builder argument
/// and clones the result into a `MemoryLimiterExtensionLayer` for the service
/// being registered.
pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
&self.memory_limiter
}

/// Sets (or clears, with `None`) the server name and returns the builder
/// so calls can be chained.
pub fn name(mut self, name: Option<String>) -> Self {
    self.name = name;
    self
}
Expand Down Expand Up @@ -198,6 +216,7 @@ impl GrpcServerBuilder {
bind_addr: None,
name: self.name,
config: self.config,
memory_limiter: self.memory_limiter,
}
}
}
46 changes: 46 additions & 0 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::{debug, warn};
use futures::StreamExt;
use prost::Message;
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::{TonicResult, cancellation};
use crate::hint_headers;
use crate::metrics::{METRIC_GRPC_MEMORY_USAGE_BYTES, METRIC_GRPC_REQUESTS_REJECTED_TOTAL};
use crate::request_limiter::RequestMemoryLimiter;

pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
Expand All @@ -48,6 +51,27 @@ impl GreptimeDatabase for DatabaseService {
"GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
remote_addr, hints
);

let _guard = request
.extensions()
.get::<RequestMemoryLimiter>()
.filter(|limiter| limiter.is_enabled())
.and_then(|limiter| {
let message_size = request.get_ref().encoded_len();
limiter
.try_acquire(message_size)
.map(|guard| {
guard.inspect(|g| {
METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
})
})
.inspect_err(|_| {
METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
})
.transpose()
})
.transpose()?;

let handler = self.handler.clone();
let request_future = async move {
let request = request.into_inner();
Expand Down Expand Up @@ -94,13 +118,35 @@ impl GreptimeDatabase for DatabaseService {
"GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
remote_addr, hints
);

let limiter = request.extensions().get::<RequestMemoryLimiter>().cloned();

let handler = self.handler.clone();
let request_future = async move {
let mut affected_rows = 0;

let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;

let _guard = limiter
.as_ref()
.filter(|limiter| limiter.is_enabled())
.and_then(|limiter| {
let message_size = request.encoded_len();
limiter
.try_acquire(message_size)
.map(|guard| {
guard.inspect(|g| {
METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
})
})
.inspect_err(|_| {
METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
})
.transpose()
})
.transpose()?;
let output = handler.handle_request(request, hints.clone()).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
Expand Down
Loading
Loading