Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
af1f22e
feat: request time encryption
infiniteregrets Mar 18, 2026
0798e16
.
infiniteregrets Mar 18, 2026
f44c37b
.
infiniteregrets Mar 18, 2026
d6cb24b
.
infiniteregrets Mar 20, 2026
e11c23f
.
infiniteregrets Mar 20, 2026
17360b4
.
infiniteregrets Mar 20, 2026
553cf51
.
infiniteregrets Mar 22, 2026
080b82d
.
infiniteregrets Mar 23, 2026
cbe7ae8
.
infiniteregrets Mar 23, 2026
a065eab
.
infiniteregrets Mar 23, 2026
8f755a8
.
infiniteregrets Mar 23, 2026
94eb362
.
infiniteregrets Mar 23, 2026
9955bbb
.
infiniteregrets Mar 23, 2026
176e347
.
infiniteregrets Mar 23, 2026
e2a59a8
.
infiniteregrets Mar 23, 2026
0c40ff8
.
infiniteregrets Mar 23, 2026
8f90bff
.
infiniteregrets Mar 23, 2026
a815b31
..
infiniteregrets Mar 23, 2026
43fdc54
.
infiniteregrets Mar 23, 2026
41ed54f
.
infiniteregrets Mar 23, 2026
620ffb4
Merge branch 'main' into m/encryption
infiniteregrets Mar 24, 2026
7ebd7c5
.
infiniteregrets Mar 24, 2026
327c709
.
infiniteregrets Mar 24, 2026
4a42619
refactor(encryption): use stream_id as AAD, move encryption to handle…
infiniteregrets Mar 26, 2026
425e0e1
chore: remove backend-level encryption tests
infiniteregrets Mar 26, 2026
1dcf22b
chore: simplify encryption context
infiniteregrets Mar 26, 2026
6bad4c7
fix: restore Debug derive on AppendPermit
infiniteregrets Mar 26, 2026
edd9088
fix(cli): error when --encryption-algorithm is provided without a key
infiniteregrets Mar 26, 2026
5f917c1
,
infiniteregrets Mar 26, 2026
82dd6b4
fix(cli): add encryption support to tail command
infiniteregrets Mar 26, 2026
84eadc4
refactor(cli): remove --encryption-algorithm from read/tail commands
infiniteregrets Mar 26, 2026
80072b2
refactor(cli): enforce --encryption-algorithm requires key via clap g…
infiniteregrets Mar 26, 2026
a46841e
better
infiniteregrets Mar 27, 2026
820efcb
.
infiniteregrets Mar 27, 2026
dac2431
refactor: make encryption algorithm optional in directives
infiniteregrets Mar 27, 2026
ec37e4e
.
infiniteregrets Mar 27, 2026
4f7b9b1
chore: deduplicate resolve_encryption/resolve_decryption, rename test…
infiniteregrets Mar 27, 2026
b2390b7
Merge branch 'main' into m/encryption
infiniteregrets Mar 30, 2026
2967e10
sbcr
infiniteregrets Mar 30, 2026
ef57351
.
infiniteregrets Mar 30, 2026
961cd75
.
infiniteregrets Mar 30, 2026
36d8109
sort
infiniteregrets Mar 31, 2026
f79d655
Merge branch 'main' into m/encryption
infiniteregrets Mar 31, 2026
9fdaf22
.
infiniteregrets Mar 31, 2026
4f89d78
.
infiniteregrets Mar 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 195 additions & 80 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ repository = "https://github.com/s2-streamstore/s2"
homepage = "https://s2.dev"

[workspace.dependencies]
aegis = "0.9"
aes-gcm = "0.10"
async-stream = "0.3"
async-trait = "0.1"
aws-config = "1"
Expand All @@ -31,6 +33,7 @@ enumset = "1.1"
eyre = "0.6"
flate2 = "1.1"
futures = "0.3"
hex = "0.4"
http = "1.4"
humantime = "2.3"
indexmap = "2.13"
Expand All @@ -55,6 +58,7 @@ s2-common = { path = "common", version = "0.30" }
s2-lite = { path = "lite", version = "0.29" }
s2-sdk = { path = "sdk", version = "0.26" }
schemars = "1.2"
secrecy = "0.10.3"
serde = "1.0"
serde_json = "1.0"
slatedb = "0.11.2"
Expand Down
57 changes: 56 additions & 1 deletion cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
parse_records_output_source,
},
types::{
AccessTokenMatcher, BasinConfig, BasinMatcher, Interval, Operation,
AccessTokenMatcher, BasinConfig, BasinMatcher, EncryptionAlgorithm, Interval, Operation,
PermittedOperationGroups, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri,
StorageClass, StreamConfig, StreamMatcher,
},
Expand Down Expand Up @@ -464,6 +464,55 @@ pub struct AppendArgs {
/// How long to wait for more records before flushing a batch.
#[arg(long, default_value = "5ms")]
pub linger: humantime::Duration,

#[command(flatten)]
pub encryption: EncryptionArgs,
}

#[derive(Args, Debug, Clone, Default)]
#[group(multiple = true)]
pub struct EncryptionArgs {
/// Base64-encoded 32-byte encryption key. Alternatively, set S2_ENCRYPTION_KEY env var.
#[arg(
long,
env = "S2_ENCRYPTION_KEY",
hide_env_values = true,
group = "encryption_key_source",
requires = "encryption_algorithm"
)]
pub encryption_key: Option<String>,

/// Read encryption key from file.
#[arg(
long,
conflicts_with = "encryption_key",
group = "encryption_key_source",
requires = "encryption_algorithm"
)]
pub encryption_key_file: Option<PathBuf>,

/// Encryption algorithm.
#[arg(long, value_enum, requires = "encryption_key_source")]
pub encryption_algorithm: Option<EncryptionAlgorithm>,

/// Attest client-side encryption.
#[arg(long, conflicts_with_all = ["encryption_key", "encryption_key_file", "encryption_algorithm"])]
pub encryption_attest: bool,
}

#[derive(Args, Debug, Clone, Default)]
pub struct DecryptionArgs {
/// Base64-encoded 32-byte decryption key. Alternatively, set S2_ENCRYPTION_KEY env var.
#[arg(
long = "encryption-key",
env = "S2_ENCRYPTION_KEY",
hide_env_values = true
)]
pub encryption_key: Option<String>,

/// Read decryption key from file.
#[arg(long = "encryption-key-file", conflicts_with = "encryption_key")]
pub encryption_key_file: Option<PathBuf>,
}

#[derive(Args, Debug)]
Expand Down Expand Up @@ -515,6 +564,9 @@ pub struct ReadArgs {
/// Use "-" to write to stdout.
#[arg(short = 'o', long, value_parser = parse_records_output_source, default_value = "-")]
pub output: RecordsOut,

#[command(flatten)]
pub encryption: DecryptionArgs,
}

#[derive(Args, Debug)]
Expand All @@ -539,6 +591,9 @@ pub struct TailArgs {
/// Use "-" to write to stdout.
#[arg(short = 'o', long, value_parser = parse_records_output_source, default_value = "-")]
pub output: RecordsOut,

#[command(flatten)]
pub encryption: DecryptionArgs,
}

#[derive(Args, Debug)]
Expand Down
4 changes: 4 additions & 0 deletions cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub enum CliError {
))]
OperationWithTokenSource(OpKind, #[source] S2Error, TokenSource),

#[error("Invalid encryption key: {0}")]
#[diagnostic(help("Key must be a base64-encoded 32-byte value."))]
InvalidEncryptionKey(String),

#[error("S2 Lite server error: {0}")]
#[diagnostic(help("{}", HELP))]
LiteServer(String),
Expand Down
86 changes: 82 additions & 4 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use record_format::{
use s2_sdk::{
S2,
types::{
AppendRetryPolicy, CreateStreamInput, DeleteOnEmptyConfig, DeleteStreamInput, MeteredBytes,
Metric, RetentionPolicy, RetryConfig, StreamConfig as SdkStreamConfig, StreamName,
TimestampingConfig, TimestampingMode,
AppendRetryPolicy, CreateStreamInput, DeleteOnEmptyConfig, DeleteStreamInput,
EncryptionConfig, MeteredBytes, Metric, RetentionPolicy, RetryConfig,
StreamConfig as SdkStreamConfig, StreamName, TimestampingConfig, TimestampingMode,
},
};
use strum::VariantNames;
Expand Down Expand Up @@ -151,7 +151,10 @@ async fn run() -> Result<(), CliError> {
}

let cli_config = load_cli_config()?;
let sdk_config = sdk_config(&cli_config)?;
let mut sdk_config = sdk_config(&cli_config)?;
if let Some(enc) = resolve_command_encryption(&command)? {
sdk_config = sdk_config.with_encryption(enc);
}
let s2 = S2::new(sdk_config.clone()).map_err(CliError::SdkInit)?;
let token_source = access_token_source(&cli_config);
let result: Result<(), CliError> = (async {
Expand Down Expand Up @@ -796,3 +799,78 @@ fn print_metrics(metrics: &[Metric]) {
}
}
}

fn resolve_command_encryption(command: &Command) -> Result<Option<EncryptionConfig>, CliError> {
match command {
Command::Append(a) => resolve_encryption(&a.encryption),
Command::Read(a) => resolve_decryption(&a.encryption),
Command::Tail(a) => resolve_decryption(&a.encryption),
_ => Ok(None),
}
}

fn resolve_key(
key: &Option<String>,
key_file: &Option<std::path::PathBuf>,
) -> Result<Option<String>, CliError> {
match (key, key_file) {
(Some(k), _) => Ok(Some(k.clone())),
(_, Some(path)) => {
let contents = std::fs::read_to_string(path).map_err(|e| {
CliError::InvalidEncryptionKey(format!("cannot read key file: {e}"))
})?;
Ok(Some(
contents.lines().next().unwrap_or("").trim().to_owned(),
))
}
_ => Ok(None),
}
}

fn validate_base64_key(key: &str) -> Result<(), CliError> {
use base64ct::{Base64, Encoding};
let bytes = Base64::decode_vec(key)
.map_err(|_| CliError::InvalidEncryptionKey("key is not valid base64".to_owned()))?;
if bytes.len() != 32 {
return Err(CliError::InvalidEncryptionKey(format!(
"key must be exactly 32 bytes, got {} bytes",
bytes.len()
)));
}
Ok(())
}

fn resolve_encryption(args: &cli::EncryptionArgs) -> Result<Option<EncryptionConfig>, CliError> {
resolve_encryption_config(
args.encryption_attest,
&args.encryption_key,
&args.encryption_key_file,
args.encryption_algorithm.map(Into::into),
)
}

fn resolve_decryption(args: &cli::DecryptionArgs) -> Result<Option<EncryptionConfig>, CliError> {
resolve_encryption_config(false, &args.encryption_key, &args.encryption_key_file, None)
}

fn resolve_encryption_config(
attest: bool,
key: &Option<String>,
key_file: &Option<std::path::PathBuf>,
alg: Option<s2_sdk::types::EncryptionAlgorithm>,
) -> Result<Option<EncryptionConfig>, CliError> {
if attest {
return Ok(Some(EncryptionConfig::Attest));
}

let Some(key) = resolve_key(key, key_file)? else {
return Ok(None);
};

validate_base64_key(&key)?;

Ok(Some(EncryptionConfig::Key {
alg,
key: key.into(),
}))
}
3 changes: 3 additions & 0 deletions cli/src/tui/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4607,6 +4607,7 @@ impl App {
until: None,
format: RecordFormat::default(),
output: RecordsOut::Stdout,
encryption: Default::default(),
};

match ops::read(&s2, &args).await {
Expand Down Expand Up @@ -4679,6 +4680,7 @@ impl App {
until: None,
format: RecordFormat::default(),
output: RecordsOut::Stdout,
encryption: Default::default(),
};

match ops::read(&s2, &args).await {
Expand Down Expand Up @@ -4842,6 +4844,7 @@ impl App {
until,
format: record_format,
output: output.clone(),
encryption: Default::default(),
};

// Open file writer if output file is specified
Expand Down
27 changes: 27 additions & 0 deletions cli/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,33 @@ impl StreamConfig {
}
}

#[derive(ValueEnum, Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum EncryptionAlgorithm {
#[value(name = "aegis-256")]
Aegis256,
#[value(name = "aes-256-gcm")]
Aes256Gcm,
}

impl From<EncryptionAlgorithm> for sdk::types::EncryptionAlgorithm {
fn from(alg: EncryptionAlgorithm) -> Self {
match alg {
EncryptionAlgorithm::Aegis256 => Self::Aegis256,
EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
}
}
}

impl From<sdk::types::EncryptionAlgorithm> for EncryptionAlgorithm {
fn from(alg: sdk::types::EncryptionAlgorithm) -> Self {
match alg {
sdk::types::EncryptionAlgorithm::Aegis256 => Self::Aegis256,
sdk::types::EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
}
}
}

#[derive(ValueEnum, Debug, Clone, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum StorageClass {
Expand Down
5 changes: 5 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@ rkyv = ["dep:rkyv", "compact_str/rkyv"]
utoipa = ["dep:utoipa"]

[dependencies]
aegis = { workspace = true }
aes-gcm = { workspace = true }
axum = { workspace = true, optional = true }
base64ct = { workspace = true, features = ["alloc"] }
blake3 = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, optional = true, features = ["derive"] }
compact_str = { workspace = true, features = ["serde"] }
enum-ordinalize = { workspace = true }
enumset = { workspace = true }
http = { workspace = true }
rand = { workspace = true }
rkyv = { workspace = true, optional = true }
secrecy = { workspace = true }
serde = { workspace = true, features = ["derive"] }
strum = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
Expand Down
Loading
Loading