Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .tool-versions

This file was deleted.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions docs/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

- Configuration errors:
- [Missing or invalid TLS configuration](#config-missing-or-invalid-tls)
- [Network configuration change requires restart](#config-network-change-requires-restart)

<!-- ---------------------------------------------------------------------------------------------------- -->

<!-- ---------------------------------------------------------------------------------------------------- -->
Expand Down Expand Up @@ -651,3 +653,35 @@ Check that the certificate and private key are valid.


<!-- ---------------------------------------------------------------------------------------------------- -->


## Network configuration change requires restart <a id='config-network-change-requires-restart'></a>

A configuration reload was attempted with network-level changes that require a full restart.

### Error message

```
Network configuration change requires restart
```

### Notes

When receiving a SIGHUP signal, CipherStash Proxy attempts to reload application-level configuration without disrupting active connections. However, certain network-related configuration changes require stopping and restarting the proxy service to take effect.

The following settings require a restart when changed:
- `server.host` - The host address the proxy listens on
- `server.port` - The port the proxy listens on
- `server.require_tls` - TLS requirement setting
- `server.worker_threads` - Number of worker threads
- `tls` - Any TLS certificate or key configuration

### How to fix

1. Stop the CipherStash Proxy service
2. Update the configuration as needed
3. Restart the CipherStash Proxy service

Application-level configuration changes (database, auth, encrypt, log, prometheus, development) can be reloaded without restart using SIGHUP.

<!-- ---------------------------------------------------------------------------------------------------- -->
17 changes: 13 additions & 4 deletions mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ echo
mise --env tcp run postgres:setup
mise --env tls run postgres:setup

mise run test:integration:showcase

echo
echo '###############################################'
echo '# Test: Prometheus'
Expand Down Expand Up @@ -377,10 +379,8 @@ echo '###############################################'
echo '# Test: Showcase'
echo '###############################################'
echo
mise --env tls run proxy:up proxy-tls --extra-args "--detach --wait"
mise --env tls run test:wait_for_postgres_to_quack --port 6432 --max-retries 20 --tls
RUST_BACKTRACE=full cargo run -p showcase
mise --env tls run proxy:down

mise run test:integration:showcase

echo
echo '###############################################'
Expand Down Expand Up @@ -637,6 +637,15 @@ else
fi
"""

# Showcase integration test, extracted into its own task so it can be run
# standalone or from the full integration suite. Brings up the TLS proxy,
# waits until it answers like Postgres, runs the showcase crate, then tears
# the proxy down.
# NOTE(review): if `cargo run -p showcase` fails, `proxy:down` still runs and
# may mask the exit status — confirm mise's fail-fast behavior for multi-line
# run scripts.
[tasks."test:integration:showcase"]
description = "Run Showcase integration test"
run = """
mise --env tls run proxy:up proxy-tls --extra-args "--detach --wait"
mise --env tls run test:wait_for_postgres_to_quack --port 6432 --max-retries 20 --tls
RUST_BACKTRACE=full cargo run -p showcase
mise --env tls run proxy:down
"""

[tasks.release]
description = "Publish release artifacts"
depends = ["release:docker"]
Expand Down
1 change: 1 addition & 0 deletions packages/cipherstash-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "2.0.0"
edition = "2021"

[dependencies]
async-trait = "0.1"
aws-lc-rs = "1.13.3"
bigdecimal = { version = "0.4.6", features = ["serde-json"] }
arc-swap = "1.7.1"
Expand Down
15 changes: 15 additions & 0 deletions packages/cipherstash-proxy/src/config/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ impl DatabaseConfig {
})?;
Ok(name)
}

#[cfg(test)]
/// Builds a `DatabaseConfig` populated with fixture values for unit tests.
///
/// Named `for_testing` rather than implementing `Default`: a `Default` impl
/// would imply there is a sensible production default for a database
/// connection, and there is not.
pub fn for_testing() -> Self {
    Self {
        // Reuse the same defaults production config falls back to.
        host: Self::default_host(),
        port: Self::default_port(),
        name: "test".to_string(),
        username: "test".to_string(),
        password: Protected::new("test".to_string()),
        // No timeout: tests rely on whatever the caller configures.
        connection_timeout: None,
        // TLS verification is off so tests can run against local/self-signed setups.
        with_tls_verification: false,
        config_reload_interval: Self::default_config_reload_interval(),
        schema_reload_interval: Self::default_schema_reload_interval(),
    }
}

///
Expand Down
23 changes: 23 additions & 0 deletions packages/cipherstash-proxy/src/config/tandem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,29 @@ impl TandemConfig {

DEFAULT_THREAD_STACK_SIZE
}

#[cfg(test)]
/// Builds a `TandemConfig` populated with fixture values for unit tests.
///
/// Mirrors `DatabaseConfig::for_testing`: a constructor is used instead of
/// `Default` because there is no sensible production default configuration.
pub fn for_testing() -> Self {
    Self {
        server: ServerConfig::default(),
        database: DatabaseConfig::for_testing(),
        auth: AuthConfig {
            // Placeholder CRN; only needs to satisfy the CRN parser.
            // NOTE(review): assumes this region/format stays parseable — confirm.
            workspace_crn: "crn:ap-southeast-2.aws:IJGECSCWKREECNBS".parse().unwrap(),
            client_access_key: "test".to_string(),
        },
        encrypt: EncryptConfig {
            client_id: "test".to_string(),
            client_key: "test".to_string(),
            // Nil UUID as a deterministic, obviously-fake keyset id.
            default_keyset_id: Some(
                Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
            ),
        },
        // No TLS and no development overrides by default in tests.
        tls: None,
        log: LogConfig::default(),
        prometheus: PrometheusConfig::default(),
        development: None,
    }
}
}

impl PrometheusConfig {
Expand Down
2 changes: 1 addition & 1 deletion packages/cipherstash-proxy/src/config/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{error::TlsConfigError, log::CONFIG};
/// Server TLS Configuration
/// This is listener/inbound connection config
///
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum TlsConfig {
Pem {
Expand Down
3 changes: 3 additions & 0 deletions packages/cipherstash-proxy/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ pub enum ConfigError {
#[error("Expected an Encrypt configuration table")]
MissingEncryptConfigTable,

#[error("Network configuration change requires restart For help visit {}#config-network-change-requires-restart", ERROR_DOC_BASE_URL)]
NetworkConfigurationChangeRequiresRestart,

#[error(transparent)]
Parse(#[from] serde_json::Error),

Expand Down
45 changes: 26 additions & 19 deletions packages/cipherstash-proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use cipherstash_proxy::config::TandemConfig;
use cipherstash_proxy::connect::{self, AsyncStream};
use cipherstash_proxy::error::Error;
use cipherstash_proxy::error::{ConfigError, Error};
use cipherstash_proxy::prometheus::CLIENTS_ACTIVE_CONNECTIONS;
use cipherstash_proxy::proxy::Proxy;
use cipherstash_proxy::{cli, log, postgresql as pg, prometheus, tls, Args};
use clap::Parser;
use metrics::gauge;
use tokio::net::TcpListener;
use tokio::signal::unix::{signal, SignalKind};
use tokio_util::task::TaskTracker;
use tracing::{error, info, warn};
Expand Down Expand Up @@ -55,7 +54,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut proxy = init(config).await;

let mut listener = connect::bind_with_retry(&proxy.config.server).await;
let listener = connect::bind_with_retry(&proxy.config.server).await;
let tracker = TaskTracker::new();

let mut client_id = 0;
Expand All @@ -81,26 +80,24 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
break;
},
_ = sighup() => {
info!(msg = "Received SIGHUP. Reloading configuration");
(listener, proxy) = reload_config(listener, &args, proxy).await;
info!(msg = "Reloaded configuration");
info!(msg = "Received SIGHUP. Reloading application configuration");
proxy = reload_application_config(&proxy.config, &args).await.unwrap_or(proxy);
},
_ = sigterm() => {
info!(msg = "Received SIGTERM");
break;
},
Ok(client_stream) = AsyncStream::accept(&listener) => {

let proxy = proxy.clone();

client_id += 1;

let context = proxy.context(client_id);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proxy is the core orchestrator. We create a new context from the proxy for a particular client.


tracker.spawn(async move {
let proxy = proxy.clone();

gauge!(CLIENTS_ACTIVE_CONNECTIONS).increment(1);

match pg::handler(client_stream, proxy, client_id).await {
match pg::handler(client_stream,context).await {
Ok(_) => (),
Err(err) => {

Expand Down Expand Up @@ -261,25 +258,35 @@ async fn sighup() -> std::io::Result<()> {
Ok(())
}

async fn reload_config(listener: TcpListener, args: &Args, proxy: Proxy) -> (TcpListener, Proxy) {
/// Returns `true` when `new` differs from `current` in any network-level
/// setting that cannot be applied by a SIGHUP reload and therefore requires
/// a full proxy restart (listener address/port, TLS requirement, worker
/// thread count, or TLS certificate/key configuration).
fn has_network_config_changed(current: &TandemConfig, new: &TandemConfig) -> bool {
    let server_changed = current.server.host != new.server.host
        || current.server.port != new.server.port
        || current.server.require_tls != new.server.require_tls
        || current.server.worker_threads != new.server.worker_threads;
    let tls_changed = current.tls != new.tls;
    server_changed || tls_changed
}

async fn reload_application_config(config: &TandemConfig, args: &Args) -> Result<Proxy, Error> {
let new_config = match TandemConfig::load(args) {
Ok(config) => config,
Err(err) => {
warn!(
msg = "Configuration could not be reloaded: {}",
error = err.to_string()
);
return (listener, proxy);
return Err(err);
}
};

let new_proxy = init(new_config).await;
// Check for network config changes that require restart
if has_network_config_changed(config, &new_config) {
let err = ConfigError::NetworkConfigurationChangeRequiresRestart;
warn!(msg = err.to_string());

// Explicit drop needed here to free the network resources before binding if using the same address & port
std::mem::drop(listener);
return Err(err.into());
}

(
connect::bind_with_retry(&new_proxy.config.server).await,
new_proxy,
)
info!(msg = "Configuration reloaded");
let proxy = init(new_config).await;
Ok(proxy)
}
38 changes: 16 additions & 22 deletions packages/cipherstash-proxy/src/postgresql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::prometheus::{
DECRYPTION_ERROR_TOTAL, DECRYPTION_REQUESTS_TOTAL, ROWS_ENCRYPTED_TOTAL,
ROWS_PASSTHROUGH_TOTAL, ROWS_TOTAL, SERVER_BYTES_RECEIVED_TOTAL,
};
use crate::proxy::Proxy;
use crate::proxy::EncryptionService;
use bytes::BytesMut;
use metrics::{counter, histogram};
use std::time::Instant;
Expand Down Expand Up @@ -70,25 +70,25 @@ use tracing::{debug, error, info, warn};
/// - `RowDescription`: Result column metadata (modified for encrypted columns)
/// - `ParameterDescription`: Parameter metadata (modified for encrypted parameters)
/// - `ReadyForQuery`: Session ready state (triggers schema reload if needed)
pub struct Backend<R>
pub struct Backend<R, S>
where
R: AsyncRead + Unpin,
S: EncryptionService,
{
/// Sender for outgoing messages to client
client_sender: Sender,
/// Reader for incoming messages from server
server_reader: R,
/// Encryption service for column decryption
proxy: Proxy,
/// Session context with portal and statement metadata
context: Context,
context: Context<S>,
/// Buffer for batching DataRow messages before decryption
buffer: MessageBuffer,
}

impl<R> Backend<R>
impl<R, S> Backend<R, S>
where
R: AsyncRead + Unpin,
S: EncryptionService,
{
/// Creates a new Backend instance.
///
Expand All @@ -98,12 +98,11 @@ where
/// * `server_reader` - Stream for reading messages from the PostgreSQL server
/// * `encrypt` - Encryption service for handling column decryption
/// * `context` - Session context shared with the frontend
pub fn new(client_sender: Sender, server_reader: R, proxy: Proxy, context: Context) -> Self {
pub fn new(client_sender: Sender, server_reader: R, context: Context<S>) -> Self {
let buffer = MessageBuffer::new();
Backend {
client_sender,
server_reader,
proxy,
context,
buffer,
}
Expand Down Expand Up @@ -150,19 +149,17 @@ where
/// Returns `Ok(())` on successful message processing, or an `Error` if a fatal
/// error occurs that should terminate the connection.
pub async fn rewrite(&mut self) -> Result<(), Error> {
let connection_timeout = self.proxy.config.database.connection_timeout();

let (code, mut bytes) = protocol::read_message(
&mut self.server_reader,
self.context.client_id,
connection_timeout,
self.context.connection_timeout(),
)
.await?;

let sent: u64 = bytes.len() as u64;
counter!(SERVER_BYTES_RECEIVED_TOTAL).increment(sent);

if self.proxy.is_passthrough() {
if self.context.is_passthrough() {
debug!(target: DEVELOPMENT,
client_id = self.context.client_id,
msg = "Passthrough enabled"
Expand Down Expand Up @@ -250,7 +247,7 @@ where
msg = "ReadyForQuery"
);
if self.context.schema_changed() {
self.proxy.reload_schema().await;
self.context.reload_schema().await;
}
}

Expand Down Expand Up @@ -450,16 +447,12 @@ where
);

// Decrypt CipherText -> Plaintext
let plaintexts = self
.proxy
.decrypt(keyset_id, ciphertexts)
.await
.inspect_err(|_| {
counter!(DECRYPTION_ERROR_TOTAL).increment(1);
})?;
let plaintexts = self.context.decrypt(ciphertexts).await.inspect_err(|_| {
counter!(DECRYPTION_ERROR_TOTAL).increment(1);
})?;

// Avoid the iter calculation if we can
if self.proxy.config.prometheus_enabled() {
if self.context.prometheus_enabled() {
let decrypted_count =
plaintexts
.iter()
Expand Down Expand Up @@ -655,9 +648,10 @@ where
}

/// Implementation of PostgreSQL error handling for the Backend component.
impl<R> PostgreSqlErrorHandler for Backend<R>
impl<R, S> PostgreSqlErrorHandler for Backend<R, S>
where
R: AsyncRead + Unpin,
S: EncryptionService,
{
fn client_sender(&mut self) -> &mut Sender {
&mut self.client_sender
Expand Down
Loading