Skip to content

Commit 505f8c4

Browse files
authored
feat: Cache for Storage Secrets (lakekeeper#1485)
1 parent d5db102 commit 505f8c4

17 files changed

Lines changed: 346 additions & 107 deletions

File tree

crates/lakekeeper/src/api/management/v1/warehouse/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ pub trait Service<C: CatalogStore, A: Authorizer, S: SecretStore> {
378378
context
379379
.v1_state
380380
.secrets
381-
.create_secret(storage_credential)
381+
.create_storage_secret(storage_credential)
382382
.await?,
383383
)
384384
} else {
@@ -832,7 +832,7 @@ pub trait Service<C: CatalogStore, A: Authorizer, S: SecretStore> {
832832
context
833833
.v1_state
834834
.secrets
835-
.create_secret(storage_credential)
835+
.create_storage_secret(storage_credential)
836836
.await?,
837837
)
838838
} else {
@@ -920,7 +920,7 @@ pub trait Service<C: CatalogStore, A: Authorizer, S: SecretStore> {
920920
context
921921
.v1_state
922922
.secrets
923-
.create_secret(new_storage_credential)
923+
.create_storage_secret(new_storage_credential)
924924
.await?,
925925
)
926926
} else {

crates/lakekeeper/src/config.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,8 @@ pub(crate) struct Cache {
436436
pub(crate) warehouse: WarehouseCache,
437437
/// Namespace cache configuration.
438438
pub(crate) namespace: NamespaceCache,
439+
/// Secrets cache configuration.
440+
pub(crate) secrets: SecretsCache,
439441
}
440442

441443
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -492,6 +494,25 @@ impl std::default::Default for NamespaceCache {
492494
}
493495
}
494496

497+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
498+
#[serde(default)]
499+
pub(crate) struct SecretsCache {
500+
pub(crate) enabled: bool,
501+
pub(crate) capacity: u64,
502+
/// Time-to-live for cache entries in seconds. Defaults to 60 seconds.
503+
pub(crate) time_to_live_secs: u64,
504+
}
505+
506+
impl std::default::Default for SecretsCache {
507+
fn default() -> Self {
508+
Self {
509+
enabled: true,
510+
capacity: 500,
511+
time_to_live_secs: 600,
512+
}
513+
}
514+
}
515+
495516
impl Default for DynAppConfig {
496517
fn default() -> Self {
497518
Self {

crates/lakekeeper/src/implementations/kv2.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use iceberg_ext::catalog::rest::IcebergErrorResponse;
66
use serde::{de::DeserializeOwned, Serialize};
77
use tokio::{sync::RwLock, time::Sleep};
88
use uuid::Uuid;
9-
use vaultrs::client::{Client, VaultClient};
9+
use vaultrs::{
10+
client::{Client, VaultClient},
11+
error::ClientError,
12+
};
1013
use vaultrs_login::{engines::userpass::UserpassLogin, LoginMethod};
1114

1215
use crate::{
@@ -21,27 +24,35 @@ use crate::{
2124
#[async_trait::async_trait]
2225
impl SecretStore for SecretsState {
2326
/// Get the secret for a given warehouse.
24-
async fn get_secret_by_id<S: DeserializeOwned>(
27+
async fn get_secret_by_id_impl<S: DeserializeOwned>(
2528
&self,
2629
secret_id: SecretId,
27-
) -> Result<Secret<S>> {
30+
) -> Result<Option<Secret<S>>> {
2831
// it seems there is no atomic get for metadata and secret so we read_metadata, and then
2932
// read the secret with the current version defined in the previously read metadata
3033
let metadata = vaultrs::kv2::read_metadata(
3134
&*self.vault_client.read().await,
3235
self.secret_mount.as_str(),
3336
&secret_ident_to_key(secret_id),
3437
)
35-
.await
36-
.map_err(|err| {
37-
IcebergErrorResponse::from(ErrorModel::internal(
38-
"secret metadata read failure",
39-
"SecretReadFailed",
40-
Some(Box::new(err)),
41-
))
42-
})?;
38+
.await;
39+
40+
let metadata = match metadata {
41+
Ok(meta) => meta,
42+
Err(err) => {
43+
if matches!(&err, ClientError::APIError { code: 404, .. }) {
44+
return Ok(None);
45+
}
4346

44-
Ok(Secret {
47+
return Err(IcebergErrorResponse::from(ErrorModel::internal(
48+
"secret metadata read failure",
49+
"SecretReadFailed",
50+
Some(Box::new(err)),
51+
)));
52+
}
53+
};
54+
55+
Ok(Some(Secret {
4556
secret_id,
4657
secret: vaultrs::kv2::read_version::<S>(
4758
&*self.vault_client.read().await,
@@ -71,11 +82,11 @@ impl SecretStore for SecretsState {
7182
Some(Box::new(err)),
7283
))
7384
})?),
74-
})
85+
}))
7586
}
7687

7788
/// Create a new secret
78-
async fn create_secret<S: Send + Sync + Serialize + std::fmt::Debug>(
89+
async fn create_secret_impl<S: Send + Sync + Serialize + std::fmt::Debug>(
7990
&self,
8091
secret: S,
8192
) -> Result<SecretId> {
@@ -98,7 +109,7 @@ impl SecretStore for SecretsState {
98109
}
99110

100111
/// Delete a secret
101-
async fn delete_secret(&self, secret_id: &SecretId) -> Result<()> {
112+
async fn delete_secret_impl(&self, secret_id: &SecretId) -> Result<()> {
102113
Ok(vaultrs::kv2::delete_metadata(
103114
&*self.vault_client.read().await,
104115
self.secret_mount.as_str(),
@@ -274,14 +285,26 @@ mod tests {
274285
})
275286
.into();
276287

277-
let secret_id = state.create_secret(secret.clone()).await.unwrap();
288+
let secret_id = state.create_storage_secret(secret.clone()).await.unwrap();
278289

279-
let read_secret = state
280-
.get_secret_by_id::<StorageCredential>(secret_id)
290+
let read_secret = state.require_storage_secret_by_id(secret_id).await.unwrap();
291+
292+
assert_eq!(&*read_secret.secret, &secret);
293+
}
294+
295+
#[tokio::test]
296+
async fn test_read_missing_secret() {
297+
let state = SecretsState::from_config(CONFIG.kv2.as_ref().unwrap())
281298
.await
282299
.unwrap();
283300

284-
assert_eq!(read_secret.secret, secret);
301+
let secret_id = SecretId::from(Uuid::new_v4());
302+
303+
let read_secret = state
304+
.get_secret_by_id_impl::<StorageCredential>(secret_id)
305+
.await;
306+
307+
assert!(read_secret.unwrap().is_none());
285308
}
286309

287310
#[tokio::test]
@@ -298,13 +321,13 @@ mod tests {
298321
.into();
299322

300323
let secret_id = state
301-
.create_secret(secret.clone())
324+
.create_storage_secret(secret.clone())
302325
.await
303326
.expect("create secret failed");
304327

305328
state.delete_secret(&secret_id).await.unwrap();
306329

307-
let read_secret = state.get_secret_by_id::<StorageCredential>(secret_id).await;
330+
let read_secret = state.require_storage_secret_by_id(secret_id).await;
308331

309332
assert!(read_secret.is_err());
310333
}

crates/lakekeeper/src/implementations/mod.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,35 +74,35 @@ pub enum Secrets {
7474

7575
#[async_trait]
7676
impl SecretStore for Secrets {
77-
async fn get_secret_by_id<S: SecretInStorage + serde::de::DeserializeOwned>(
77+
async fn get_secret_by_id_impl<S: SecretInStorage + serde::de::DeserializeOwned>(
7878
&self,
7979
secret_id: SecretId,
80-
) -> crate::api::Result<Secret<S>> {
80+
) -> crate::api::Result<Option<Secret<S>>> {
8181
match self {
8282
#[cfg(feature = "sqlx-postgres")]
83-
Self::Postgres(state) => state.get_secret_by_id(secret_id).await,
84-
Self::KV2(state) => state.get_secret_by_id(secret_id).await,
83+
Self::Postgres(state) => state.get_secret_by_id_impl(secret_id).await,
84+
Self::KV2(state) => state.get_secret_by_id_impl(secret_id).await,
8585
}
8686
}
8787

88-
async fn create_secret<
88+
async fn create_secret_impl<
8989
S: SecretInStorage + Send + Sync + serde::Serialize + std::fmt::Debug,
9090
>(
9191
&self,
9292
secret: S,
9393
) -> crate::api::Result<SecretId> {
9494
match self {
9595
#[cfg(feature = "sqlx-postgres")]
96-
Self::Postgres(state) => state.create_secret(secret).await,
97-
Self::KV2(state) => state.create_secret(secret).await,
96+
Self::Postgres(state) => state.create_secret_impl(secret).await,
97+
Self::KV2(state) => state.create_secret_impl(secret).await,
9898
}
9999
}
100100

101-
async fn delete_secret(&self, secret_id: &SecretId) -> crate::api::Result<()> {
101+
async fn delete_secret_impl(&self, secret_id: &SecretId) -> crate::api::Result<()> {
102102
match self {
103103
#[cfg(feature = "sqlx-postgres")]
104-
Self::Postgres(state) => state.delete_secret(secret_id).await,
105-
Self::KV2(state) => state.delete_secret(secret_id).await,
104+
Self::Postgres(state) => state.delete_secret_impl(secret_id).await,
105+
Self::KV2(state) => state.delete_secret_impl(secret_id).await,
106106
}
107107
}
108108
}

crates/lakekeeper/src/implementations/postgres/secrets.rs

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,17 @@ impl SecretsState {
5151
#[async_trait::async_trait]
5252
impl SecretStore for SecretsState {
5353
/// Get the secret for a given warehouse.
54-
async fn get_secret_by_id<S: for<'de> Deserialize<'de>>(
54+
async fn get_secret_by_id_impl<S: for<'de> Deserialize<'de>>(
5555
&self,
5656
secret_id: SecretId,
57-
) -> Result<Secret<S>> {
57+
) -> Result<Option<Secret<S>>> {
5858
struct SecretRow {
5959
secret: Option<String>,
6060
created_at: chrono::DateTime<chrono::Utc>,
6161
updated_at: Option<chrono::DateTime<chrono::Utc>>,
6262
}
6363

64-
let secret: SecretRow = sqlx::query_as!(
64+
let secret = sqlx::query_as!(
6565
SecretRow,
6666
r#"
6767
SELECT
@@ -74,25 +74,24 @@ impl SecretStore for SecretsState {
7474
secret_id.as_uuid(),
7575
CONFIG.pg_encryption_key
7676
)
77-
.fetch_one(&self.read_write.read_pool)
77+
.fetch_optional(&self.read_write.read_pool)
7878
.await
79-
.map_err(|e| match e {
80-
sqlx::Error::RowNotFound => ErrorModel::builder()
81-
.code(StatusCode::NOT_FOUND.into())
82-
.message("Secret not found".to_string())
83-
.r#type("SecretNotFound".to_string())
84-
.stack(vec![format!("secret_id: {}", secret_id), e.to_string()])
85-
.build(),
86-
_ => ErrorModel::builder()
79+
.map_err(|e| {
80+
ErrorModel::builder()
8781
.code(StatusCode::INTERNAL_SERVER_ERROR.into())
8882
.message("Error fetching secret".to_string())
8983
.r#type("SecretFetchError".to_string())
9084
.stack(vec![format!("secret_id: {}", secret_id), e.to_string()])
91-
.build(),
85+
.build()
9286
})?;
9387

88+
let Some(secret) = secret else {
89+
return Ok(None);
90+
};
91+
9492
let inner =
95-
serde_json::from_str(&secret.secret.unwrap_or("{}".to_string())).map_err(|_e| {
93+
serde_json::from_str(&secret.secret.unwrap_or("{}".to_string())).map_err(|e| {
94+
tracing::error!("Error parsing secret id {}: {}", secret_id, e);
9695
ErrorModel::builder()
9796
.code(StatusCode::INTERNAL_SERVER_ERROR.into())
9897
.message("Error parsing secret".to_string())
@@ -102,16 +101,16 @@ impl SecretStore for SecretsState {
102101
.build()
103102
})?;
104103

105-
Ok(Secret {
104+
Ok(Some(Secret {
106105
secret_id,
107106
secret: inner,
108107
created_at: secret.created_at,
109108
updated_at: secret.updated_at,
110-
})
109+
}))
111110
}
112111

113112
/// Create a new secret
114-
async fn create_secret<S: Send + Sync + Serialize + std::fmt::Debug>(
113+
async fn create_secret_impl<S: Send + Sync + Serialize + std::fmt::Debug>(
115114
&self,
116115
secret: S,
117116
) -> Result<SecretId> {
@@ -149,7 +148,7 @@ impl SecretStore for SecretsState {
149148
}
150149

151150
/// Delete a secret
152-
async fn delete_secret(&self, secret_id: &SecretId) -> Result<()> {
151+
async fn delete_secret_impl(&self, secret_id: &SecretId) -> Result<()> {
153152
sqlx::query!(
154153
r#"
155154
DELETE FROM secret
@@ -189,14 +188,21 @@ mod tests {
189188
})
190189
.into();
191190

192-
let secret_id = state.create_secret(secret.clone()).await.unwrap();
191+
let secret_id = state.create_storage_secret(secret.clone()).await.unwrap();
193192

194-
let read_secret = state
195-
.get_secret_by_id::<StorageCredential>(secret_id)
196-
.await
197-
.unwrap();
193+
let read_secret = state.require_storage_secret_by_id(secret_id).await.unwrap();
194+
195+
assert_eq!(&*read_secret.secret, &secret);
196+
}
198197

199-
assert_eq!(read_secret.secret, secret);
198+
#[sqlx::test]
199+
async fn test_read_missing_secret(pool: sqlx::PgPool) {
200+
let state = SecretsState::from_pools(pool.clone(), pool);
201+
let missing_secret_id = SecretId::from(uuid::Uuid::new_v4());
202+
let read_secret = state
203+
.get_secret_by_id_impl::<StorageCredential>(missing_secret_id)
204+
.await;
205+
assert!(read_secret.unwrap().is_none());
200206
}
201207

202208
#[sqlx::test]
@@ -210,11 +216,11 @@ mod tests {
210216
})
211217
.into();
212218

213-
let secret_id = state.create_secret(secret.clone()).await.unwrap();
219+
let secret_id = state.create_storage_secret(secret.clone()).await.unwrap();
214220

215221
state.delete_secret(&secret_id).await.unwrap();
216222

217-
let read_secret = state.get_secret_by_id::<StorageCredential>(secret_id).await;
223+
let read_secret = state.require_storage_secret_by_id(secret_id).await;
218224

219225
assert!(read_secret.is_err());
220226
}

crates/lakekeeper/src/server/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,11 @@ fn require_warehouse_id(prefix: Option<&Prefix>) -> std::result::Result<Warehous
108108
pub(crate) async fn maybe_get_secret<S: SecretStore>(
109109
secret: Option<crate::SecretId>,
110110
state: &S,
111-
) -> Result<Option<StorageCredential>, IcebergErrorResponse> {
111+
) -> Result<Option<Arc<StorageCredential>>, IcebergErrorResponse> {
112112
if let Some(secret_id) = secret {
113-
Ok(Some(state.get_secret_by_id(secret_id).await?.secret))
113+
Ok(Some(
114+
state.require_storage_secret_by_id(secret_id).await?.secret,
115+
))
114116
} else {
115117
Ok(None)
116118
}

crates/lakekeeper/src/server/s3_signer/sign.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ use crate::{
2222
},
2323
secrets::SecretStore,
2424
storage::{
25-
s3::S3UrlStyleDetectionMode, S3Credential, S3Profile, StorageCredential,
26-
StorageProfile, ValidationError,
25+
s3::S3UrlStyleDetectionMode, S3Credential, S3Profile, StorageProfile, ValidationError,
2726
},
2827
AuthZTableInfo, CatalogNamespaceOps, CatalogStore, CatalogTabularOps, CatalogWarehouseOps,
2928
GetTabularInfoByLocationError, ResolvedWarehouse, State, TableId, TableInfo,
@@ -214,7 +213,7 @@ impl<C: CatalogStore, A: Authorizer + Clone, S: SecretStore>
214213
state
215214
.v1_state
216215
.secrets
217-
.get_secret_by_id::<StorageCredential>(storage_secret_id)
216+
.require_storage_secret_by_id(storage_secret_id)
218217
.await?
219218
.secret,
220219
)

0 commit comments

Comments
 (0)