From 0e08fafee22e03afd57eae637d60fe24d80d8859 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Fri, 1 Nov 2024 09:02:27 +0100 Subject: [PATCH 01/25] first (hacky) cut --- src/snowflake/client.rs | 32 +++++++-- src/snowflake/mod.rs | 144 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 164 insertions(+), 12 deletions(-) diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index 7c470d6..79b91de 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -75,9 +75,15 @@ pub(crate) struct SnowflakeQueryData { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub(crate) struct SnowflakeStageCreds { - pub aws_key_id: String, - pub aws_secret_key: String, - pub aws_token: String, + // TODO: make this an enum + + // AWS + pub aws_key_id: Option, + pub aws_secret_key: Option, + pub aws_token: Option, + + // Azure + pub azure_sas_token: Option, } #[derive(Debug, Serialize, Deserialize)] @@ -118,6 +124,7 @@ pub(crate) enum NormalizedStageInfo { storage_account: String, container: String, prefix: String, + azure_sas_token: String, #[serde(skip_serializing_if = "Option::is_none")] end_point: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -136,14 +143,25 @@ impl TryFrom<&SnowflakeStageInfo> for NormalizedStageInfo { bucket: bucket.to_string(), prefix: prefix.to_string(), region: value.region.clone(), - aws_key_id: value.creds.aws_key_id.clone(), - aws_secret_key: value.creds.aws_secret_key.clone(), - aws_token: value.creds.aws_token.clone(), + aws_key_id: value.creds.aws_key_id.clone().expect("AWS key ID is missing"), + aws_secret_key: value.creds.aws_secret_key.clone().expect("AWS secret key is missing"), + aws_token: value.creds.aws_token.clone().expect("AWS token is missing"), + end_point: value.end_point.clone(), + test_endpoint: value.test_endpoint.clone() + }) + } else if value.location_type == "AZURE" { + let (storage_account, container) = value.location.split_once('/') + .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the storage account name"))?; + return Ok(NormalizedStageInfo::BlobStorage { + storage_account: storage_account.to_string(), + container: container.to_string(), + prefix: value.path.clone(), + azure_sas_token: value.creds.azure_sas_token.clone().expect("Azure SAS token is missing"), end_point: value.end_point.clone(), test_endpoint: value.test_endpoint.clone() }) } else { - return Err(Error::not_implemented("Azure BlobStorage is not implemented")); + return Err(Error::not_implemented("Unknown location type: {value.location_type}")); } } } diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index 66e6c8c..7912353 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -66,22 +66,26 @@ impl object_store::CredentialProvider for S3StageCredentialProvider { let mut locked = self.cached.lock().await; + let info_key_id = info.stage_info.creds.aws_key_id.clone().expect("aws_key_id is missing from stage info"); + let info_token = info.stage_info.creds.aws_token.clone().expect("aws_token is missing from stage info"); + let info_secret_key = info.stage_info.creds.aws_secret_key.clone().expect("aws_secret_key is missing from stage info"); + match locked.as_ref() { - Some(creds) => if creds.key_id == info.stage_info.creds.aws_key_id { + Some(creds) => if creds.key_id == info_key_id { return Ok(Arc::clone(creds)); } _ => {} } // The session token is empty when testing against minio - let token = match info.stage_info.creds.aws_token.trim() { + let token = match info_token.as_str().trim() { "" => None, token => Some(token.to_string()) }; let creds = Arc::new(object_store::aws::AwsCredential { - key_id: info.stage_info.creds.aws_key_id.clone(), - secret_key: info.stage_info.creds.aws_secret_key.clone(), + key_id: info_key_id, + secret_key: info_secret_key, token }); @@ -92,6 +96,88 @@ impl object_store::CredentialProvider for S3StageCredentialProvider { } +#[derive(Debug)] +pub(crate) struct SnowflakeAzureBlobExtension { + stage: String, + client: Arc, +} + +#[async_trait::async_trait] +impl Extension for SnowflakeAzureBlobExtension { + fn as_any(&self) -> &dyn std::any::Any { + self + } + async fn current_stage_info(&self) -> crate::Result { + let stage_info = &self + .client + .current_upload_info(&self.stage) + .await? + .stage_info; + let stage_info: NormalizedStageInfo = stage_info.try_into()?; + let string = serde_json::to_string(&stage_info) + .context("failed to encode stage_info as json").to_err()?; + Ok(string) + } +} + +#[derive(Debug)] +pub(crate) struct AzureStageCredentialProvider { + stage: String, + client: Arc, + cached: Mutex>> +} + +impl AzureStageCredentialProvider { + pub(crate) fn new(stage: impl AsRef, client: Arc) -> AzureStageCredentialProvider { + AzureStageCredentialProvider { stage: stage.as_ref().to_string(), client, cached: Mutex::new(None) } + } +} + +#[async_trait::async_trait] +impl object_store::CredentialProvider for AzureStageCredentialProvider { + type Credential = object_store::azure::AzureCredential; + async fn get_credential(&self) -> object_store::Result> { + let info = self.client.current_upload_info(&self.stage).await + .map_err(|e| { + object_store::Error::Generic { + store: "Azure", + source: e.into() + } + })?; + + let mut locked = self.cached.lock().await; + + // TODO: figure out what the caching is about + // match locked.as_ref() { + // Some(creds) => if creds.account_name == info.stage_info.creds.azure_account_name { + // return Ok(Arc::clone(creds)); + // } + // _ => {} + // } + + // TODO: minio handling + + let sas_token = info.stage_info.creds.azure_sas_token.as_ref().ok_or_else(|| { + object_store::Error::Generic { + store: "Azure", + source: "Azure SAS token is missing from stage info".into(), + } + })?; + + let pairs = url::form_urlencoded::parse(sas_token.trim_start_matches('?').as_bytes()) + .into_owned() + .collect(); + + let creds = Arc::new(object_store::azure::AzureCredential::SASToken( + pairs, + )); + + *locked = Some(Arc::clone(&creds)); + + Ok(creds) + } +} + #[repr(C)] pub struct StageInfoResponse { result: CResult, @@ -303,8 +389,56 @@ pub(crate) async fn build_store_for_snowflake_stage( Ok((Arc::new(store), crypto_material_provider, stage_prefix.to_string(), extension)) } + "AZURE" => { + let (container, stage_prefix) = info.stage_info.location.split_once('/') + .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the container name"))?; + + + let provider = AzureStageCredentialProvider::new(&config.stage, client.clone()); + + // TODO: clean this up + let url = &format!( + "https://{}.{}/{}/{}", + info.stage_info.storage_account.as_ref().unwrap(), + info.stage_info.end_point.as_ref().unwrap(), + container, + stage_prefix, + ); + + // TODO: support test endpoint + + let mut builder = object_store::azure::MicrosoftAzureBuilder::default() + .with_url(url) + .with_credentials(Arc::new(provider)) + .with_retry(retry_config); + + for (key, value) in config_map { + builder = builder.with_config(key.parse()?, value); + } + + let store = builder.build()?; + + if config.kms_config.is_some() && !info.stage_info.is_client_side_encrypted { + return Err(ErrorKind::StorageNotEncrypted(config.stage.clone()).into()); + } + + let crypto_material_provider = if info.stage_info.is_client_side_encrypted { + let kms_config = config.kms_config.unwrap_or_default(); + let stage_kms = SnowflakeStageKms::new(client.clone(), &config.stage, stage_prefix, kms_config); + Some::>(Arc::new(stage_kms)) + } else { + None + }; + + let extension = Arc::new(SnowflakeAzureBlobExtension { + stage: config.stage.clone(), + client + }); + + Ok((Arc::new(store), crypto_material_provider, stage_prefix.to_string(), extension)) + } _ => { - unimplemented!("unknown stage location type"); + unimplemented!("unknown stage location type: {}", info.stage_info.location_type); } } } From 6cd0aeb43e0ac42d499fe7499bbc996c31f39b30 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Fri, 1 Nov 2024 09:47:16 +0100 Subject: [PATCH 02/25] clean up a bit based on Andre's variant --- src/snowflake/client.rs | 61 +++++++++++++++------ src/snowflake/mod.rs | 115 ++++++++++++++++++++++++---------------- 2 files changed, 114 insertions(+), 62 deletions(-) diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index 79b91de..420fec5 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -74,16 +74,40 @@ pub(crate) struct SnowflakeQueryData { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] -pub(crate) struct SnowflakeStageCreds { - // TODO: make this an enum +pub(crate) struct SnowflakeStageAwsCreds { + pub aws_key_id: String, + pub aws_secret_key: String, + pub aws_token: String, +} - // AWS - pub aws_key_id: Option, - pub aws_secret_key: Option, - pub aws_token: Option, +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub(crate) struct SnowflakeStageAzureCreds { + pub azure_sas_token: String, +} - // Azure - pub azure_sas_token: Option, +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub(crate) enum SnowflakeStageCreds { + Aws(SnowflakeStageAwsCreds), + Azure(SnowflakeStageAzureCreds), +} + +impl SnowflakeStageCreds { + pub(crate) fn as_aws(&self) -> crate::Result<&SnowflakeStageAwsCreds> { + match self { + SnowflakeStageCreds::Aws(creds) => Ok(creds), + SnowflakeStageCreds::Azure(_) => Err(Error::invalid_response("Expected AWS credentials, but got Azure ones")), + } + } + + pub(crate) fn as_azure(&self) -> crate::Result<&SnowflakeStageAzureCreds> { + match self { + SnowflakeStageCreds::Azure(creds) => Ok(creds), + SnowflakeStageCreds::Aws(_) => Err(Error::invalid_response("Expected Azure credentials, but got AWS ones")), + } + } } #[derive(Debug, Serialize, Deserialize)] @@ -139,29 +163,34 @@ impl TryFrom<&SnowflakeStageInfo> for NormalizedStageInfo { if value.location_type == "S3" { let (bucket, prefix) = value.location.split_once('/') .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the bucket name"))?; + let creds = value.creds.as_aws()?; return Ok(NormalizedStageInfo::S3 { bucket: bucket.to_string(), prefix: prefix.to_string(), region: value.region.clone(), - aws_key_id: value.creds.aws_key_id.clone().expect("AWS key ID is missing"), - aws_secret_key: value.creds.aws_secret_key.clone().expect("AWS secret key is missing"), - aws_token: value.creds.aws_token.clone().expect("AWS token is missing"), + aws_key_id: creds.aws_key_id.clone(), + aws_secret_key: creds.aws_secret_key.clone(), + aws_token: creds.aws_token.clone(), end_point: value.end_point.clone(), test_endpoint: value.test_endpoint.clone() }) } else if value.location_type == "AZURE" { - let (storage_account, container) = value.location.split_once('/') + let (container, prefix) = value.location.split_once('/') + .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the container name"))?; + let creds = value.creds.as_azure()?; + let storage_account = value.storage_account + .clone() .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the storage account name"))?; return Ok(NormalizedStageInfo::BlobStorage { - storage_account: storage_account.to_string(), + storage_account: storage_account, container: container.to_string(), - prefix: value.path.clone(), - azure_sas_token: value.creds.azure_sas_token.clone().expect("Azure SAS token is missing"), + prefix: prefix.to_string(), + azure_sas_token: creds.azure_sas_token.clone(), end_point: value.end_point.clone(), test_endpoint: value.test_endpoint.clone() }) } else { - return Err(Error::not_implemented("Unknown location type: {value.location_type}")); + return Err(Error::not_implemented(format!("Location type {} is not implemented", value.location_type))); } } } diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index 7912353..c11e86d 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -8,7 +8,7 @@ use client::{NormalizedStageInfo, SnowflakeClient, SnowflakeClientConfig}; pub(crate) mod kms; use kms::{SnowflakeStageKms, SnowflakeStageKmsConfig}; -use object_store::{RetryConfig, ObjectStore}; +use object_store::{azure::AzureCredential, RetryConfig, ObjectStore}; use tokio::sync::Mutex; use std::sync::Arc; @@ -64,28 +64,37 @@ impl object_store::CredentialProvider for S3StageCredentialProvider { } })?; - let mut locked = self.cached.lock().await; + if info.stage_info.location_type != "S3" { + return Err(object_store::Error::Generic { + store: "S3", + source: Error::invalid_response("Location type must be S3 for this provider").into() + }) + } - let info_key_id = info.stage_info.creds.aws_key_id.clone().expect("aws_key_id is missing from stage info"); - let info_token = info.stage_info.creds.aws_token.clone().expect("aws_token is missing from stage info"); - let info_secret_key = info.stage_info.creds.aws_secret_key.clone().expect("aws_secret_key is missing from stage info"); + let new_creds = info.stage_info.creds.as_aws() + .map_err(|e| object_store::Error::Generic { + store: "S3", + source: e.into() + })?; + + let mut locked = self.cached.lock().await; match locked.as_ref() { - Some(creds) => if creds.key_id == info_key_id { + Some(creds) => if creds.key_id == new_creds.aws_key_id { return Ok(Arc::clone(creds)); } _ => {} } // The session token is empty when testing against minio - let token = match info_token.as_str().trim() { + let token = match new_creds.aws_token.trim() { "" => None, token => Some(token.to_string()) }; let creds = Arc::new(object_store::aws::AwsCredential { - key_id: info_key_id, - secret_key: info_secret_key, + key_id: new_creds.aws_key_id.clone(), + secret_key: new_creds.aws_secret_key.clone(), token }); @@ -97,13 +106,13 @@ impl object_store::CredentialProvider for S3StageCredentialProvider { #[derive(Debug)] -pub(crate) struct SnowflakeAzureBlobExtension { +pub(crate) struct SnowflakeAzureExtension { stage: String, client: Arc, } #[async_trait::async_trait] -impl Extension for SnowflakeAzureBlobExtension { +impl Extension for SnowflakeAzureExtension { fn as_any(&self) -> &dyn std::any::Any { self } @@ -140,37 +149,44 @@ impl object_store::CredentialProvider for AzureStageCredentialProvider { let info = self.client.current_upload_info(&self.stage).await .map_err(|e| { object_store::Error::Generic { - store: "Azure", + store: "MicrosoftAzure", source: e.into() } })?; - - let mut locked = self.cached.lock().await; - - // TODO: figure out what the caching is about - // match locked.as_ref() { - // Some(creds) => if creds.account_name == info.stage_info.creds.azure_account_name { - // return Ok(Arc::clone(creds)); - // } - // _ => {} - // } - // TODO: minio handling + if info.stage_info.location_type != "AZURE" { + return Err(object_store::Error::Generic { + store: "MicrosoftAzure", + source: Error::invalid_response("Location type must be AZURE for this provider").into() + }) + } - let sas_token = info.stage_info.creds.azure_sas_token.as_ref().ok_or_else(|| { - object_store::Error::Generic { - store: "Azure", - source: "Azure SAS token is missing from stage info".into(), - } - })?; + let new_creds = info.stage_info.creds.as_azure() + .map_err(|e| object_store::Error::Generic { + store: "MicrosoftAzure", + source: e.into() + })?; + - let pairs = url::form_urlencoded::parse(sas_token.trim_start_matches('?').as_bytes()) + // TODO: understand differences to Andre's method + let new_pairs = url::form_urlencoded::parse(new_creds.azure_sas_token.trim_start_matches('?').as_bytes()) .into_owned() .collect(); - let creds = Arc::new(object_store::azure::AzureCredential::SASToken( - pairs, - )); + + let mut locked = self.cached.lock().await; + + // TODO: understand what the caching here does + match locked.as_ref() { + Some(creds) => { + if matches!(creds.as_ref(), AzureCredential::SASToken(pairs) if *pairs == new_pairs) { + return Ok(Arc::clone(creds)); + } + } + _ => {} + } + + let creds = Arc::new(AzureCredential::SASToken(new_pairs)); *locked = Some(Arc::clone(&creds)); @@ -392,23 +408,23 @@ pub(crate) async fn build_store_for_snowflake_stage( "AZURE" => { let (container, stage_prefix) = info.stage_info.location.split_once('/') .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the container name"))?; + let storage_account = info.stage_info.storage_account + .clone() + .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the storage account name"))?; let provider = AzureStageCredentialProvider::new(&config.stage, client.clone()); - // TODO: clean this up - let url = &format!( - "https://{}.{}/{}/{}", - info.stage_info.storage_account.as_ref().unwrap(), - info.stage_info.end_point.as_ref().unwrap(), - container, - stage_prefix, - ); - - // TODO: support test endpoint + match info.stage_info.test_endpoint.as_deref() { + Some(_) => { + unimplemented!("test endpoint for azure blob storage is not supported"); + } + None => {} + } let mut builder = object_store::azure::MicrosoftAzureBuilder::default() - .with_url(url) + .with_account(storage_account) + .with_container_name(container) .with_credentials(Arc::new(provider)) .with_retry(retry_config); @@ -424,17 +440,24 @@ pub(crate) async fn build_store_for_snowflake_stage( let crypto_material_provider = if info.stage_info.is_client_side_encrypted { let kms_config = config.kms_config.unwrap_or_default(); - let stage_kms = SnowflakeStageKms::new(client.clone(), &config.stage, stage_prefix, kms_config); + let stage_kms = SnowflakeStageKms::new(client.clone(), &config.stage, stage_prefix, kms_config); // TODO: Azure KMS Some::>(Arc::new(stage_kms)) } else { None }; - let extension = Arc::new(SnowflakeAzureBlobExtension { + let extension = Arc::new(SnowflakeAzureExtension { stage: config.stage.clone(), client }); + // Andre's version has the following. TODO: understand why. Cleaner to return an option rather than an empty stage prefix? + // let stage_prefix = if stage_prefix.is_empty() { + // None + // } else { + // Some(stage_prefix.to_string()) + // }; + Ok((Arc::new(store), crypto_material_provider, stage_prefix.to_string(), extension)) } _ => { From 3aaa71b9233131755e7cd2d20214fa8d20e4b0b7 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Fri, 1 Nov 2024 15:24:26 +0100 Subject: [PATCH 03/25] first cut at encryption --- src/snowflake/kms.rs | 210 +++++++++++++++++++++++++++++++++++++++++-- src/snowflake/mod.rs | 6 +- 2 files changed, 206 insertions(+), 10 deletions(-) diff --git a/src/snowflake/kms.rs b/src/snowflake/kms.rs index 789791b..e857a53 100644 --- a/src/snowflake/kms.rs +++ b/src/snowflake/kms.rs @@ -37,7 +37,7 @@ impl Default for SnowflakeStageKmsConfig { } #[derive(Clone)] -pub(crate) struct SnowflakeStageKms { +pub(crate) struct SnowflakeStageS3Kms { client: Arc, stage: String, prefix: String, @@ -45,9 +45,9 @@ pub(crate) struct SnowflakeStageKms { keyring: Cache } -impl std::fmt::Debug for SnowflakeStageKms { +impl std::fmt::Debug for SnowflakeStageS3Kms { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SnowflakeStageKms") + f.debug_struct("SnowflakeStageS3Kms") .field("client", &self.client) .field("stage", &self.stage) .field("config", &self.config) @@ -56,14 +56,14 @@ impl std::fmt::Debug for SnowflakeStageKms { } } -impl SnowflakeStageKms { +impl SnowflakeStageS3Kms { pub(crate) fn new( client: Arc, stage: impl Into, prefix: impl Into, config: SnowflakeStageKmsConfig - ) -> SnowflakeStageKms { - SnowflakeStageKms { + ) -> SnowflakeStageS3Kms { + SnowflakeStageS3Kms { client, stage: stage.into(), prefix: prefix.into(), @@ -77,7 +77,7 @@ impl SnowflakeStageKms { } #[async_trait::async_trait] -impl CryptoMaterialProvider for SnowflakeStageKms { +impl CryptoMaterialProvider for SnowflakeStageS3Kms { async fn material_for_write(&self, _path: &str, data_len: Option) -> crate::Result<(ContentCryptoMaterial, Attributes)> { let _guard = duration_on_drop!(metrics::material_for_write_duration); let info = self.client.current_upload_info(&self.stage).await?; @@ -193,3 +193,199 @@ impl CryptoMaterialProvider for SnowflakeStageKms { Ok(content_material) } } + +#[derive(Clone)] +pub(crate) struct SnowflakeStageAzureKms { + client: Arc, + stage: String, + prefix: String, + config: SnowflakeStageKmsConfig, + keyring: Cache, +} + +impl std::fmt::Debug for SnowflakeStageAzureKms { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SnowflakeStageAzureKms") + .field("client", &self.client) + .field("stage", &self.stage) + .field("config", &self.config) + .field("keyring", &"redacted") + .finish() + } +} + +impl SnowflakeStageAzureKms { + pub(crate) fn new( + client: Arc, + stage: impl Into, + prefix: impl Into, + config: SnowflakeStageKmsConfig + ) -> SnowflakeStageAzureKms { + SnowflakeStageAzureKms { + client, + stage: stage.into(), + prefix: prefix.into(), + keyring: Cache::builder() + .max_capacity(config.keyring_capacity as u64) + .time_to_live(config.keyring_ttl) + .build(), + config + } + } +} + +const AZURE_MATDESC_KEY: &str = "matdesc"; +const AZURE_ENCDATA_KEY: &str = "encryptiondata"; + +#[async_trait::async_trait] +impl CryptoMaterialProvider for SnowflakeStageAzureKms { + async fn material_for_write(&self, _path: &str, data_len: Option) -> crate::Result<(ContentCryptoMaterial, Attributes)> { + let _guard = duration_on_drop!(metrics::material_for_write_duration); + let info = self.client.current_upload_info(&self.stage).await?; + + let encryption_material = info.encryption_material.as_ref() + .ok_or_else(|| ErrorKind::StorageNotEncrypted(self.stage.clone()))?; + + let description = MaterialDescription { + smk_id: encryption_material.smk_id.to_string(), + query_id: encryption_material.query_id.clone(), + key_size: "128".to_string() + }; + let master_key = Key::from_base64(&encryption_material.query_stage_master_key) + .map_err(ErrorKind::MaterialDecode)?; + + let scheme = self.config.crypto_scheme; + let material = ContentCryptoMaterial::generate(scheme); + let encrypted_cek = material.cek.clone().encrypt_aes_128_ecb(&master_key) + .map_err(ErrorKind::MaterialCrypt)?; + // TODO: should this be AES_256 or 128 for Azure? I am confused because the metadata + // says 256, but the master key that I am seeing has 128. + + let mut attributes = Attributes::new(); + + // TODO: do we need to add aad? + + // We hardcode most of these values as the Go Snowflake client does (see + // https://github.com/snowflakedb/gosnowflake/blob/099708d318689634a558f705ccc19b3b7b278972/azure_storage_client.go#L152) + let encryption_data = EncryptionData { + encryption_mode: "FullBlob".to_string(), + wrapped_content_key: WrappedContentKey { + key_id: "symmKey1".to_string(), + encrypted_key: encrypted_cek.as_base64(), + algorithm: "AES_CBC_256".to_string(), + }, + encryption_agent: EncryptionAgent { + protocol: "1.0".to_string(), + encryption_algorithm: "AES_CBC_128".to_string(), + }, + content_encryption_i_v: material.iv.as_base64(), + key_wrapping_metadata: KeyWrappingMetadata { + encryption_library: "Java 5.3.0".to_string(), + }, + }; + + attributes.insert( + Attribute::Metadata(AZURE_ENCDATA_KEY.into()), + AttributeValue::from(serde_json::to_string(&encryption_data).context("failed to encode encryption data").to_err()?) + ); + + attributes.insert( + Attribute::Metadata(AZURE_MATDESC_KEY.into()), + AttributeValue::from(serde_json::to_string(&description).context("failed to encode matdesc").to_err()?) + ); + + // TODO: try to attach the (ununcrypted) content length to the file somehow + // TODO: try to attach a hash of the file + + Ok((material, attributes)) + } + + async fn material_from_metadata(&self, path: &str, attr: &Attributes) -> crate::Result { + // TODO: factor out code that is shared with S3 variant? + + let _guard = duration_on_drop!(metrics::material_from_metadata_duration); + let path = path.strip_prefix(&self.prefix).unwrap_or(path); + let required_attribute = |key: &'static str| { + let v: &str = attr.get(&Attribute::Metadata(key.into())) + .ok_or_else(|| Error::required_config(format!("missing required attribute `{}`", key)))? + .as_ref(); + Ok::<_, Error>(v) + }; + + let material_description: MaterialDescription = deserialize_str(required_attribute(AZURE_MATDESC_KEY)?) + .map_err(Error::deserialize_response_err("failed to deserialize matdesc"))?; + + let master_key = self.keyring.try_get_with(material_description.query_id, async { + let info = self.client.fetch_path_info(&self.stage, path).await?; + let position = info.src_locations.iter().position(|l| l == path) + .ok_or_else(|| Error::invalid_response("path not found"))?; + let encryption_material = info.encryption_material.get(position) + .cloned() + .ok_or_else(|| Error::invalid_response("src locations and encryption material length mismatch"))? + .ok_or_else(|| Error::invalid_response("path not encrypted"))?; + + let master_key = Key::from_base64(&encryption_material.query_stage_master_key) + .map_err(ErrorKind::MaterialDecode)?; + counter!(metrics::total_keyring_miss).increment(1); + Ok::<_, Error>(master_key) + }).await?; + counter!(metrics::total_keyring_get).increment(1); + + let encryption_data: EncryptionData = deserialize_str(required_attribute(AZURE_ENCDATA_KEY)?) + .map_err(Error::deserialize_response_err("failed to deserialize encryption data"))?; + + let cek = EncryptedKey::from_base64(&encryption_data.wrapped_content_key.encrypted_key) + .map_err(ErrorKind::MaterialDecode)?; + let cek = cek.decrypt_aes_128_ecb(&master_key) + .map_err(ErrorKind::MaterialCrypt)?; + let iv = Iv::from_base64(&encryption_data.content_encryption_i_v) + .map_err(ErrorKind::MaterialDecode)?; + + let scheme = match encryption_data.encryption_agent.encryption_algorithm.as_str() { + "AES_GCM_256" => CryptoScheme::Aes256Gcm, + "AES_CBC_128" => CryptoScheme::Aes128Cbc, + v => unimplemented!("encryption algorithm `{}` not implemented", v) + }; + + let content_material = ContentCryptoMaterial { + scheme, + cek, + iv, + aad: None + }; + + Ok(content_material) + } +} + + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct EncryptionData { + encryption_mode: String, + wrapped_content_key: WrappedContentKey, + content_encryption_i_v: String, + encryption_agent: EncryptionAgent, + key_wrapping_metadata: KeyWrappingMetadata, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct WrappedContentKey { + key_id: String, + encrypted_key: String, + algorithm: String, // alg for encrypting the key +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct EncryptionAgent { + protocol: String, + encryption_algorithm: String, // alg for encryption the content +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct KeyWrappingMetadata { + encryption_library: String, +} \ No newline at end of file diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index c11e86d..a0546fb 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -6,7 +6,7 @@ use anyhow::Context as AnyhowContext; use client::{NormalizedStageInfo, SnowflakeClient, SnowflakeClientConfig}; pub(crate) mod kms; -use kms::{SnowflakeStageKms, SnowflakeStageKmsConfig}; +use kms::{SnowflakeStageS3Kms, SnowflakeStageAzureKms, SnowflakeStageKmsConfig}; use object_store::{azure::AzureCredential, RetryConfig, ObjectStore}; use tokio::sync::Mutex; @@ -392,7 +392,7 @@ pub(crate) async fn build_store_for_snowflake_stage( let crypto_material_provider = if info.stage_info.is_client_side_encrypted { let kms_config = config.kms_config.unwrap_or_default(); - let stage_kms = SnowflakeStageKms::new(client.clone(), &config.stage, stage_prefix, kms_config); + let stage_kms = SnowflakeStageS3Kms::new(client.clone(), &config.stage, stage_prefix, kms_config); Some::>(Arc::new(stage_kms)) } else { None @@ -440,7 +440,7 @@ pub(crate) async fn build_store_for_snowflake_stage( let crypto_material_provider = if info.stage_info.is_client_side_encrypted { let kms_config = config.kms_config.unwrap_or_default(); - let stage_kms = SnowflakeStageKms::new(client.clone(), &config.stage, stage_prefix, kms_config); // TODO: Azure KMS + let stage_kms = SnowflakeStageAzureKms::new(client.clone(), &config.stage, stage_prefix, kms_config); Some::>(Arc::new(stage_kms)) } else { None From a9c307d23df8580e22f07affa92bb7be2343685d Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 5 Nov 2024 08:55:30 +0100 Subject: [PATCH 04/25] some cleanup --- src/snowflake/mod.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index a0546fb..d9d341c 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -168,15 +168,13 @@ impl object_store::CredentialProvider for AzureStageCredentialProvider { })?; - // TODO: understand differences to Andre's method - let new_pairs = url::form_urlencoded::parse(new_creds.azure_sas_token.trim_start_matches('?').as_bytes()) + let token_bytes = new_creds.azure_sas_token.trim_start_matches('?').as_bytes(); + let new_pairs = url::form_urlencoded::parse(token_bytes) .into_owned() .collect(); - let mut locked = self.cached.lock().await; - // TODO: understand what the caching here does match locked.as_ref() { Some(creds) => { if matches!(creds.as_ref(), AzureCredential::SASToken(pairs) if *pairs == new_pairs) { @@ -337,7 +335,7 @@ pub(crate) async fn build_store_for_snowflake_stage( ) -> crate::Result<( Arc, Option>, - String, + Option, ClientExtension )> { let config = validate_config_for_snowflake(&mut config_map, retry_config.clone())?; @@ -403,7 +401,7 @@ pub(crate) async fn build_store_for_snowflake_stage( client }); - Ok((Arc::new(store), crypto_material_provider, stage_prefix.to_string(), extension)) + Ok((Arc::new(store), crypto_material_provider, Some(stage_prefix.to_string()), extension)) } "AZURE" => { let (container, stage_prefix) = info.stage_info.location.split_once('/') @@ -440,7 +438,12 @@ pub(crate) async fn build_store_for_snowflake_stage( let crypto_material_provider = if info.stage_info.is_client_side_encrypted { let kms_config = config.kms_config.unwrap_or_default(); - let stage_kms = SnowflakeStageAzureKms::new(client.clone(), &config.stage, stage_prefix, kms_config); + let stage_kms = SnowflakeStageAzureKms::new( + client.clone(), + &config.stage, + stage_prefix, + kms_config, + ); Some::>(Arc::new(stage_kms)) } else { None @@ -451,14 +454,13 @@ pub(crate) async fn build_store_for_snowflake_stage( client }); - // Andre's version has the following. TODO: understand why. Cleaner to return an option rather than an empty stage prefix? - // let stage_prefix = if stage_prefix.is_empty() { - // None - // } else { - // Some(stage_prefix.to_string()) - // }; + let stage_prefix = if stage_prefix.is_empty() { + None + } else { + Some(stage_prefix.to_string()) + }; - Ok((Arc::new(store), crypto_material_provider, stage_prefix.to_string(), extension)) + Ok((Arc::new(store), crypto_material_provider, stage_prefix, extension)) } _ => { unimplemented!("unknown stage location type: {}", info.stage_info.location_type); From d5cce06c72492c8df368ba5fa608f20cda180c02 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 5 Nov 2024 09:43:55 +0100 Subject: [PATCH 05/25] more cleanup --- src/lib.rs | 6 +-- src/snowflake/client.rs | 27 +++++++++++- src/snowflake/kms.rs | 97 +++++++++++++++-------------------------- src/util.rs | 8 ++++ 4 files changed, 72 insertions(+), 66 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5f465c9..fe87f7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -433,9 +433,9 @@ impl Client { let (store, crypto_material_provider, stage_prefix, extension) = build_store_for_snowflake_stage(map, config.retry_config.clone()).await?; let prefix = match (stage_prefix, config.prefix) { - (s, Some(u)) if s.ends_with("/") => Some(format!("{s}{u}")), - (s, Some(u)) => Some(format!("{s}/{u}")), - (s, None) => Some(s) + (Some(s), Some(u)) if s.ends_with("/") => Some(format!("{s}{u}")), + (Some(s), Some(u)) => Some(format!("{s}/{u}")), + (s, u) => s.or(u) }; config.prefix = prefix; diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index 420fec5..9a948e8 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -5,8 +5,9 @@ use std::{collections::HashMap, sync::Arc, time::{Duration, Instant, SystemTime, use tokio::sync::Mutex; use zeroize::Zeroize; use moka::future::Cache; -use crate::{duration_on_drop, error::{Error, RetryState}, metrics}; +use crate::{duration_on_drop, error::{Error, RetryState, Kind as ErrorKind}, metrics}; use crate::util::{deserialize_str, deserialize_slice}; +use crate::encryption::Key; // use anyhow::anyhow; @@ -661,6 +662,30 @@ impl SnowflakeClient { }).await?; Ok(stage_info) } + pub(crate) async fn get_master_key( + &self, + query_id: String, + path: &str, + stage: &str, + keyring: &Cache, + ) -> crate::Result { + let master_key = keyring.try_get_with(query_id, async { + let info = self.fetch_path_info(stage, path).await?; + let position = info.src_locations.iter().position(|l| l == path) + .ok_or_else(|| Error::invalid_response("path not found"))?; + let encryption_material = info.encryption_material.get(position) + .cloned() + .ok_or_else(|| Error::invalid_response("src locations and encryption material length mismatch"))? + .ok_or_else(|| Error::invalid_response("path not encrypted"))?; + + let master_key = Key::from_base64(&encryption_material.query_stage_master_key) + .map_err(ErrorKind::MaterialDecode)?; + counter!(metrics::total_keyring_miss).increment(1); + Ok::<_, Error>(master_key) + }).await?; + counter!(metrics::total_keyring_get).increment(1); + Ok(master_key) + } } #[cfg(test)] diff --git a/src/snowflake/kms.rs b/src/snowflake/kms.rs index e857a53..7e2b239 100644 --- a/src/snowflake/kms.rs +++ b/src/snowflake/kms.rs @@ -1,7 +1,6 @@ -use crate::{duration_on_drop, encryption::{ContentCryptoMaterial, CryptoMaterialProvider, CryptoScheme, EncryptedKey, Iv, Key}, error::{Error, ErrorExt}, metrics, snowflake::SnowflakeClient, util::deserialize_str}; +use crate::{duration_on_drop, encryption::{ContentCryptoMaterial, CryptoMaterialProvider, CryptoScheme, EncryptedKey, Iv, Key}, error::{Error, ErrorExt}, metrics, snowflake::SnowflakeClient, util::{deserialize_str, required_attribute}}; use crate::error::Kind as ErrorKind; -use ::metrics::counter; use serde::{Serialize, Deserialize}; use object_store::{Attributes, Attribute, AttributeValue}; use anyhow::Context; @@ -137,40 +136,25 @@ impl CryptoMaterialProvider for SnowflakeStageS3Kms { async fn material_from_metadata(&self, path: &str, attr: &Attributes) -> crate::Result { let _guard = duration_on_drop!(metrics::material_from_metadata_duration); let path = path.strip_prefix(&self.prefix).unwrap_or(path); - let required_attribute = |key: &'static str| { - let v: &str = attr.get(&Attribute::Metadata(key.into())) - .ok_or_else(|| Error::required_config(format!("missing required attribute `{}`", key)))? - .as_ref(); - Ok::<_, Error>(v) - }; - - let material_description: MaterialDescription = deserialize_str(required_attribute("x-amz-matdesc")?) + let material_description: MaterialDescription = + deserialize_str(required_attribute("x-amz-matdesc", &attr)?) .map_err(Error::deserialize_response_err("failed to deserialize matdesc"))?; - let master_key = self.keyring.try_get_with(material_description.query_id, async { - let info = self.client.fetch_path_info(&self.stage, path).await?; - let position = info.src_locations.iter().position(|l| l == path) - .ok_or_else(|| Error::invalid_response("path not found"))?; - let encryption_material = info.encryption_material.get(position) - .cloned() - .ok_or_else(|| Error::invalid_response("src locations and encryption material length mismatch"))? - .ok_or_else(|| Error::invalid_response("path not encrypted"))?; - - let master_key = Key::from_base64(&encryption_material.query_stage_master_key) - .map_err(ErrorKind::MaterialDecode)?; - counter!(metrics::total_keyring_miss).increment(1); - Ok::<_, Error>(master_key) - }).await?; - counter!(metrics::total_keyring_get).increment(1); - - let cek = EncryptedKey::from_base64(required_attribute("x-amz-key")?) + let master_key = &self.client.get_master_key( + material_description.query_id.clone(), + path, + &self.stage, + &self.keyring, + ).await?; + + let cek = EncryptedKey::from_base64(required_attribute("x-amz-key", &attr)?) .map_err(ErrorKind::MaterialDecode)?; let cek = cek.decrypt_aes_128_ecb(&master_key) .map_err(ErrorKind::MaterialCrypt)?; - let iv = Iv::from_base64(required_attribute("x-amz-iv")?) + let iv = Iv::from_base64(required_attribute("x-amz-iv", &attr)?) .map_err(ErrorKind::MaterialDecode)?; - let alg = required_attribute("x-amz-cek-alg"); + let alg = required_attribute("x-amz-cek-alg", &attr); let scheme = match alg { Ok("AES/GCM/NoPadding") => CryptoScheme::Aes256Gcm, @@ -258,13 +242,9 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { let material = ContentCryptoMaterial::generate(scheme); let encrypted_cek = material.cek.clone().encrypt_aes_128_ecb(&master_key) .map_err(ErrorKind::MaterialCrypt)?; - // TODO: should this be AES_256 or 128 for Azure? I am confused because the metadata - // says 256, but the master key that I am seeing has 128. let mut attributes = Attributes::new(); - // TODO: do we need to add aad? - // We hardcode most of these values as the Go Snowflake client does (see // https://github.com/snowflakedb/gosnowflake/blob/099708d318689634a558f705ccc19b3b7b278972/azure_storage_client.go#L152) let encryption_data = EncryptionData { @@ -286,52 +266,45 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { attributes.insert( Attribute::Metadata(AZURE_ENCDATA_KEY.into()), - AttributeValue::from(serde_json::to_string(&encryption_data).context("failed to encode encryption data").to_err()?) + AttributeValue::from( + serde_json::to_string(&encryption_data) + .context("failed to encode encryption data") + .to_err()? + ) ); attributes.insert( Attribute::Metadata(AZURE_MATDESC_KEY.into()), - AttributeValue::from(serde_json::to_string(&description).context("failed to encode matdesc").to_err()?) + AttributeValue::from( + serde_json::to_string(&description) + .context("failed to encode matdesc") + .to_err()? + ) ); - // TODO: try to attach the (ununcrypted) content length to the file somehow // TODO: try to attach a hash of the file + // TODO: do we need to add aad? Ok((material, attributes)) } async fn material_from_metadata(&self, path: &str, attr: &Attributes) -> crate::Result { - // TODO: factor out code that is shared with S3 variant? - let _guard = duration_on_drop!(metrics::material_from_metadata_duration); let path = path.strip_prefix(&self.prefix).unwrap_or(path); - let required_attribute = |key: &'static str| { - let v: &str = attr.get(&Attribute::Metadata(key.into())) - .ok_or_else(|| Error::required_config(format!("missing required attribute `{}`", key)))? - .as_ref(); - Ok::<_, Error>(v) - }; - let material_description: MaterialDescription = deserialize_str(required_attribute(AZURE_MATDESC_KEY)?) + let material_description: MaterialDescription = + deserialize_str(required_attribute(AZURE_MATDESC_KEY, &attr)?) .map_err(Error::deserialize_response_err("failed to deserialize matdesc"))?; - let master_key = self.keyring.try_get_with(material_description.query_id, async { - let info = self.client.fetch_path_info(&self.stage, path).await?; - let position = info.src_locations.iter().position(|l| l == path) - .ok_or_else(|| Error::invalid_response("path not found"))?; - let encryption_material = info.encryption_material.get(position) - .cloned() - .ok_or_else(|| Error::invalid_response("src locations and encryption material length mismatch"))? - .ok_or_else(|| Error::invalid_response("path not encrypted"))?; - - let master_key = Key::from_base64(&encryption_material.query_stage_master_key) - .map_err(ErrorKind::MaterialDecode)?; - counter!(metrics::total_keyring_miss).increment(1); - Ok::<_, Error>(master_key) - }).await?; - counter!(metrics::total_keyring_get).increment(1); - - let encryption_data: EncryptionData = deserialize_str(required_attribute(AZURE_ENCDATA_KEY)?) + let master_key = &self.client.get_master_key( + material_description.query_id.clone(), + path, + &self.stage, + &self.keyring, + ).await?; + + let encryption_data: EncryptionData = + deserialize_str(required_attribute(AZURE_ENCDATA_KEY, &attr)?) .map_err(Error::deserialize_response_err("failed to deserialize encryption data"))?; let cek = EncryptedKey::from_base64(&encryption_data.wrapped_content_key.encrypted_key) diff --git a/src/util.rs b/src/util.rs index c688243..1b6961a 100644 --- a/src/util.rs +++ b/src/util.rs @@ -8,6 +8,7 @@ use object_store::path::Path; use object_store::{Attribute, AttributeValue, Attributes, GetOptions, ObjectStore, TagSet}; use pin_project::pin_project; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt}; +use crate::error::Error; use crate::error::Kind as ErrorKind; use std::error::Error as StdError; @@ -485,3 +486,10 @@ where let de = &mut serde_json::Deserializer::from_str(v); serde_path_to_error::deserialize(de) } + +pub(crate) fn required_attribute<'a>(key: &'static str, attr: &'a Attributes) -> Result<&'a str, Error> { + let v: &str = attr.get(&Attribute::Metadata(key.into())) + .ok_or_else(|| Error::required_config(format!("missing required attribute `{}`", key)))? + .as_ref(); + Ok::<_, Error>(v) +} \ No newline at end of file From cd2cf6be2e461bfabe1fdca0e7168e9e1ce93f19 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 5 Nov 2024 09:56:51 +0100 Subject: [PATCH 06/25] minor --- src/snowflake/client.rs | 1 - src/util.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index 9a948e8..f7b93c4 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -8,7 +8,6 @@ use moka::future::Cache; use crate::{duration_on_drop, error::{Error, RetryState, Kind as ErrorKind}, metrics}; use crate::util::{deserialize_str, deserialize_slice}; use crate::encryption::Key; -// use anyhow::anyhow; #[derive(Debug, Serialize, Deserialize)] diff --git a/src/util.rs b/src/util.rs index 1b6961a..520f0f5 100644 --- a/src/util.rs +++ b/src/util.rs @@ -492,4 +492,4 @@ pub(crate) fn required_attribute<'a>(key: &'static str, attr: &'a Attributes) -> .ok_or_else(|| Error::required_config(format!("missing required attribute `{}`", key)))? .as_ref(); Ok::<_, Error>(v) -} \ No newline at end of file +} From 531fdc9fbea2a428c63f1d31d33f9b7cfd845e9a Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 5 Nov 2024 10:06:58 +0100 Subject: [PATCH 07/25] retried; hashes now show up in SF --- src/snowflake/kms.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/snowflake/kms.rs b/src/snowflake/kms.rs index 7e2b239..10bdd48 100644 --- a/src/snowflake/kms.rs +++ b/src/snowflake/kms.rs @@ -282,7 +282,6 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { ) ); - // TODO: try to attach a hash of the file // TODO: do we need to add aad? Ok((material, attributes)) From e5c150c3c6aa05f35fb1edadff0c1e739cd41aeb Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 5 Nov 2024 10:45:36 +0100 Subject: [PATCH 08/25] remove 256 GCM option for now --- src/snowflake/kms.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/snowflake/kms.rs b/src/snowflake/kms.rs index 10bdd48..e5e7086 100644 --- a/src/snowflake/kms.rs +++ b/src/snowflake/kms.rs @@ -282,8 +282,6 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { ) ); - // TODO: do we need to add aad? - Ok((material, attributes)) } @@ -314,7 +312,6 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { .map_err(ErrorKind::MaterialDecode)?; let scheme = match encryption_data.encryption_agent.encryption_algorithm.as_str() { - "AES_GCM_256" => CryptoScheme::Aes256Gcm, "AES_CBC_128" => CryptoScheme::Aes128Cbc, v => unimplemented!("encryption algorithm `{}` not implemented", v) }; @@ -323,7 +320,7 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { scheme, cek, iv, - aad: None + aad: None, }; Ok(content_material) From 9fabc2bec4320e5ae9c850bb7a689ea258516e0c Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Thu, 7 Nov 2024 09:34:07 +0100 Subject: [PATCH 09/25] TEMP logs about url config --- src/error.rs | 2 +- src/lib.rs | 2 ++ src/snowflake/client.rs | 4 ++++ src/snowflake/mod.rs | 3 +++ 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index e77f2db..9db4a7e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -384,7 +384,7 @@ impl RetryState { } pub(crate) fn should_retry_logic(&self) -> bool { - let max_retries = self.retry_config.max_retries; + let max_retries = 1; // TEMP: lower for easier debugging let retry_timeout = self.retry_config.retry_timeout; let elapsed = self.start.elapsed(); let all_retries = self.retries(); diff --git a/src/lib.rs b/src/lib.rs index fe87f7b..8bf7037 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -376,6 +376,8 @@ impl Client { let url = map.remove("url") .ok_or(Error::invalid_config("config object must have a key named 'url'"))?; + tracing::info!("creating client for url: {}", url); + let url = url::Url::parse(&url) .map_err(Error::invalid_config_err("failed to parse `url`"))?; diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index f7b93c4..e60abc2 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -181,6 +181,10 @@ impl TryFrom<&SnowflakeStageInfo> for NormalizedStageInfo { let storage_account = value.storage_account .clone() .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the storage account name"))?; + tracing::info!( + "Azure. container: {}, prefix: {}, storage account: {}, end point: {:?}", + container, prefix, storage_account, value.end_point.clone(), + ); return Ok(NormalizedStageInfo::BlobStorage { storage_account: storage_account, container: container.to_string(), diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index d9d341c..7c86c45 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -342,6 +342,8 @@ pub(crate) async fn build_store_for_snowflake_stage( let client = SnowflakeClient::new(config.client_config); let info = client.current_upload_info(&config.stage).await?; + tracing::info!("stage info: {:?}", info); + match info.stage_info.location_type.as_ref() { "S3" => { let (bucket, stage_prefix) = info.stage_info.location.split_once('/') @@ -410,6 +412,7 @@ pub(crate) async fn build_store_for_snowflake_stage( .clone() .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the storage account name"))?; + tracing::info!("set up azure store for container: {}, storage account: {}", container, storage_account); let provider = AzureStageCredentialProvider::new(&config.stage, client.clone()); From 208558de146815a0a86a1e19ae1a50f77743042e Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Thu, 7 Nov 2024 12:32:11 +0100 Subject: [PATCH 10/25] TEMP Switch back to trust-dns --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index aa8e661..236b219 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ bytes = "1.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } futures-util = "0.3" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "hickory-dns"] } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "trust-dns"] } # object_store = { version = "0.10.1", features = ["azure", "aws"] } # Pinned to a specific commit while waiting for upstream object_store = { git = "https://github.com/andrebsguedes/arrow-rs.git", tag = "v0.10.2-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } From 79be98c9128bde438e0f042518cce00f740c12f6 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Thu, 7 Nov 2024 18:05:27 +0100 Subject: [PATCH 11/25] hijack hickory with a version that has a larger buffer --- Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 236b219..6473cb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,3 +81,6 @@ openssl = { version = "0.10.66", features = ["vendored"] } [dev-dependencies] criterion = { version = "0.4", default-features = false, features = ["cargo_bench_support", "html_reports"] } + +[patch.crates-io] +hickory-resolver = { git = "https://github.com/alexrenz/hickory-dns.git", branch = "larger-buffer-024" } From ee274ab488206933b4891666a0a6ec18b09e3989 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Thu, 7 Nov 2024 18:20:05 +0100 Subject: [PATCH 12/25] actually switch back to hickory --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6473cb2..a2b72e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ bytes = "1.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } futures-util = "0.3" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "trust-dns"] } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "hickory-dns"] } # object_store = { version = "0.10.1", features = ["azure", "aws"] } # Pinned to a specific commit while waiting for upstream object_store = { git = "https://github.com/andrebsguedes/arrow-rs.git", tag = "v0.10.2-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } From 5e03c8d23d4d40962707c51bb635e4b0f5f3a79a Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Fri, 8 Nov 2024 07:32:03 +0100 Subject: [PATCH 13/25] Re-activate retries --- src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 9db4a7e..e77f2db 100644 --- a/src/error.rs +++ b/src/error.rs @@ -384,7 +384,7 @@ impl RetryState { } pub(crate) fn should_retry_logic(&self) -> bool { - let max_retries = 1; // TEMP: lower for easier debugging + let max_retries = self.retry_config.max_retries; let retry_timeout = self.retry_config.retry_timeout; let elapsed = self.start.elapsed(); let all_retries = self.retries(); From 015a137420a888b1b3c3405ab93b385704675daf Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 12 Nov 2024 10:35:05 +0100 Subject: [PATCH 14/25] Use eDNS --- Cargo.lock | 1 + Cargo.toml | 4 +--- src/snowflake/client.rs | 2 ++ src/snowflake/mod.rs | 2 ++ src/snowflake/resolver.rs | 47 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 src/snowflake/resolver.rs diff --git a/Cargo.lock b/Cargo.lock index 6485faf..bacfebd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1422,6 +1422,7 @@ dependencies = [ "flate2", "flume", "futures-util", + "hickory-resolver", "hyper", "metrics", "metrics-util", diff --git a/Cargo.toml b/Cargo.toml index a2b72e8..f747f8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ reqwest = { version = "0.12", default-features = false, features = ["rustls-tls" # object_store = { version = "0.10.1", features = ["azure", "aws"] } # Pinned to a specific commit while waiting for upstream object_store = { git = "https://github.com/andrebsguedes/arrow-rs.git", tag = "v0.10.2-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } +hickory-resolver = "0.24" thiserror = "1" anyhow = { version = "1", features = ["backtrace"] } once_cell = "1.18" @@ -81,6 +82,3 @@ openssl = { version = "0.10.66", features = ["vendored"] } [dev-dependencies] criterion = { version = "0.4", default-features = false, features = ["cargo_bench_support", "html_reports"] } - -[patch.crates-io] -hickory-resolver = { git = "https://github.com/alexrenz/hickory-dns.git", branch = "larger-buffer-024" } diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index e60abc2..2b3b190 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -8,6 +8,7 @@ use moka::future::Cache; use crate::{duration_on_drop, error::{Error, RetryState, Kind as ErrorKind}, metrics}; use crate::util::{deserialize_str, deserialize_slice}; use crate::encryption::Key; +use super::resolver::HickoryResolverWithEdns; #[derive(Debug, Serialize, Deserialize)] @@ -348,6 +349,7 @@ impl SnowflakeClient { let client = SnowflakeClient { config, client: reqwest::Client::builder() + .dns_resolver(Arc::new(HickoryResolverWithEdns::default())) .timeout(Duration::from_secs(180)) .build().unwrap(), token: Arc::new(Mutex::new(None)), diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index 7c86c45..3d749d9 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -8,6 +8,8 @@ use client::{NormalizedStageInfo, SnowflakeClient, SnowflakeClientConfig}; pub(crate) mod kms; use kms::{SnowflakeStageS3Kms, SnowflakeStageAzureKms, SnowflakeStageKmsConfig}; +mod resolver; + use object_store::{azure::AzureCredential, RetryConfig, ObjectStore}; use tokio::sync::Mutex; use std::sync::Arc; diff --git a/src/snowflake/resolver.rs b/src/snowflake/resolver.rs new file mode 100644 index 0000000..fccc360 --- /dev/null +++ b/src/snowflake/resolver.rs @@ -0,0 +1,47 @@ +use hickory_resolver::{TokioAsyncResolver, system_conf}; +use reqwest::dns::{Addrs, Name, Resolving, Resolve}; +use std::sync::Arc; +use std::net::SocketAddr; +use once_cell::sync::OnceCell; + +/// A hickory resolver that uses extended DNS (eDNS) to resolve domain names. We use this to +/// circumvent a bug in the hickory resolver: hickory allocates a buffer of 512 bytes for +/// name server replies, but we observed >512 bytes replies in Azure. Enabling eDNS +/// circumvents this problem because hickory determines the receive buffer size differently +/// with eDNS. Unfortunately, we have not figured out an easier way to enable eDNS than +/// implementing a custom resolver. Our implementation is based on reqwest's hickory +/// wrapper, see https://github.com/Xuanwo/reqwest-hickory-resolver/blob/main/src/lib.rs. +#[derive(Debug, Default, Clone)] +pub(super) struct HickoryResolverWithEdns { + // Delay construction as initialization might be outside the Tokio runtime context. + state: Arc>, +} + +impl Resolve for HickoryResolverWithEdns { + fn resolve(&self, name: Name) -> Resolving { + let hickory_resolver = self.clone(); + Box::pin(async move { + let resolver = hickory_resolver.state.get_or_try_init(new_resolver)?; + + let lookup = resolver.lookup_ip(name.as_str()).await?; + + let addrs: Addrs = Box::new(lookup.into_iter().map(|addr| SocketAddr::new(addr, 0))); + Ok(addrs) + }) + } +} + +/// Create a new resolver with the default configuration, +/// which reads from `/etc/resolve.conf`. +fn new_resolver() -> std::io::Result { + let (config, mut opts) = system_conf::read_system_conf().map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("error reading DNS system conf: {}", e), + ) + })?; + + opts.edns0 = true; + + Ok(TokioAsyncResolver::tokio(config, opts)) +} \ No newline at end of file From 1e251011dfeec469dd3dddde70d1d30724e83df4 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 12 Nov 2024 10:37:29 +0100 Subject: [PATCH 15/25] clean up log msgs --- src/lib.rs | 2 -- src/snowflake/client.rs | 4 ---- src/snowflake/mod.rs | 4 ---- 3 files changed, 10 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8bf7037..fe87f7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -376,8 +376,6 @@ impl Client { let url = map.remove("url") .ok_or(Error::invalid_config("config object must have a key named 'url'"))?; - tracing::info!("creating client for url: {}", url); - let url = url::Url::parse(&url) .map_err(Error::invalid_config_err("failed to parse `url`"))?; diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index 2b3b190..88b2d95 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -182,10 +182,6 @@ impl TryFrom<&SnowflakeStageInfo> for NormalizedStageInfo { let storage_account = value.storage_account .clone() .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the storage account name"))?; - tracing::info!( - "Azure. container: {}, prefix: {}, storage account: {}, end point: {:?}", - container, prefix, storage_account, value.end_point.clone(), - ); return Ok(NormalizedStageInfo::BlobStorage { storage_account: storage_account, container: container.to_string(), diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index 3d749d9..ac406ec 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -344,8 +344,6 @@ pub(crate) async fn build_store_for_snowflake_stage( let client = SnowflakeClient::new(config.client_config); let info = client.current_upload_info(&config.stage).await?; - tracing::info!("stage info: {:?}", info); - match info.stage_info.location_type.as_ref() { "S3" => { let (bucket, stage_prefix) = info.stage_info.location.split_once('/') @@ -414,8 +412,6 @@ pub(crate) async fn build_store_for_snowflake_stage( .clone() .ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the storage account name"))?; - tracing::info!("set up azure store for container: {}, storage account: {}", container, storage_account); - let provider = AzureStageCredentialProvider::new(&config.stage, client.clone()); match info.stage_info.test_endpoint.as_deref() { From 4c3b1a660fda70b0c4914336144e00543b4cad50 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 12 Nov 2024 10:45:11 +0100 Subject: [PATCH 16/25] make get_master_key a free function --- src/snowflake/kms.rs | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/snowflake/kms.rs b/src/snowflake/kms.rs index e5e7086..8f851b8 100644 --- a/src/snowflake/kms.rs +++ b/src/snowflake/kms.rs @@ -1,4 +1,5 @@ use crate::{duration_on_drop, encryption::{ContentCryptoMaterial, CryptoMaterialProvider, CryptoScheme, EncryptedKey, Iv, Key}, error::{Error, ErrorExt}, metrics, snowflake::SnowflakeClient, util::{deserialize_str, required_attribute}}; +use ::metrics::counter; use crate::error::Kind as ErrorKind; use serde::{Serialize, Deserialize}; @@ -141,7 +142,8 @@ impl CryptoMaterialProvider for SnowflakeStageS3Kms { deserialize_str(required_attribute("x-amz-matdesc", &attr)?) .map_err(Error::deserialize_response_err("failed to deserialize matdesc"))?; - let master_key = &self.client.get_master_key( + let master_key = get_master_key( + &self.client, material_description.query_id.clone(), path, &self.stage, @@ -293,7 +295,8 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { deserialize_str(required_attribute(AZURE_MATDESC_KEY, &attr)?) .map_err(Error::deserialize_response_err("failed to deserialize matdesc"))?; - let master_key = &self.client.get_master_key( + let master_key = get_master_key( + &self.client, material_description.query_id.clone(), path, &self.stage, @@ -357,4 +360,29 @@ struct EncryptionAgent { #[serde(rename_all = "PascalCase")] struct KeyWrappingMetadata { encryption_library: String, -} \ No newline at end of file +} + +async fn get_master_key( + client: &SnowflakeClient, + query_id: String, + path: &str, + stage: &str, + keyring: &Cache, +) -> crate::Result { + let master_key = keyring.try_get_with(query_id, async { + let info = client.fetch_path_info(stage, path).await?; + let position = info.src_locations.iter().position(|l| l == path) + .ok_or_else(|| Error::invalid_response("path not found"))?; + let encryption_material = info.encryption_material.get(position) + .cloned() + .ok_or_else(|| Error::invalid_response("src locations and encryption material length mismatch"))? + .ok_or_else(|| Error::invalid_response("path not encrypted"))?; + + let master_key = Key::from_base64(&encryption_material.query_stage_master_key) + .map_err(ErrorKind::MaterialDecode)?; + counter!(metrics::total_keyring_miss).increment(1); + Ok::<_, Error>(master_key) + }).await?; + counter!(metrics::total_keyring_get).increment(1); + Ok(master_key) +} From cb3022342e0309a504ca507fe0b03bb895acc461 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 12 Nov 2024 10:46:24 +0100 Subject: [PATCH 17/25] simplify --- src/snowflake/mod.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index ac406ec..8091b23 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -414,11 +414,8 @@ pub(crate) async fn build_store_for_snowflake_stage( let provider = AzureStageCredentialProvider::new(&config.stage, client.clone()); - match info.stage_info.test_endpoint.as_deref() { - Some(_) => { - unimplemented!("test endpoint for azure blob storage is not supported"); - } - None => {} + if info.stage_info.test_endpoint.is_some() { + unimplemented!("test endpoint for azure blob storage is not supported"); } let mut builder = object_store::azure::MicrosoftAzureBuilder::default() From f3509a4efa5d32f1072b93fe4d1985ab888a1ffe Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 12 Nov 2024 10:49:54 +0100 Subject: [PATCH 18/25] actually delete old get_master_key --- src/snowflake/client.rs | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index 88b2d95..d5078bf 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -663,30 +663,6 @@ impl SnowflakeClient { }).await?; Ok(stage_info) } - pub(crate) async fn get_master_key( - &self, - query_id: String, - path: &str, - stage: &str, - keyring: &Cache, - ) -> crate::Result { - let master_key = keyring.try_get_with(query_id, async { - let info = self.fetch_path_info(stage, path).await?; - let position = info.src_locations.iter().position(|l| l == path) - .ok_or_else(|| Error::invalid_response("path not found"))?; - let encryption_material = info.encryption_material.get(position) - .cloned() - .ok_or_else(|| Error::invalid_response("src locations and encryption material length mismatch"))? - .ok_or_else(|| Error::invalid_response("path not encrypted"))?; - - let master_key = Key::from_base64(&encryption_material.query_stage_master_key) - .map_err(ErrorKind::MaterialDecode)?; - counter!(metrics::total_keyring_miss).increment(1); - Ok::<_, Error>(master_key) - }).await?; - counter!(metrics::total_keyring_get).increment(1); - Ok(master_key) - } } #[cfg(test)] From a7ec1a72b2471ecd5ebbcfa702732445b3c53b97 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 12 Nov 2024 10:52:09 +0100 Subject: [PATCH 19/25] more cleanup --- src/snowflake/client.rs | 3 +-- src/snowflake/kms.rs | 14 +++++++------- src/util.rs | 2 +- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/snowflake/client.rs b/src/snowflake/client.rs index d5078bf..38d4db6 100644 --- a/src/snowflake/client.rs +++ b/src/snowflake/client.rs @@ -5,9 +5,8 @@ use std::{collections::HashMap, sync::Arc, time::{Duration, Instant, SystemTime, use tokio::sync::Mutex; use zeroize::Zeroize; use moka::future::Cache; -use crate::{duration_on_drop, error::{Error, RetryState, Kind as ErrorKind}, metrics}; +use crate::{duration_on_drop, error::{Error, RetryState}, metrics}; use crate::util::{deserialize_str, deserialize_slice}; -use crate::encryption::Key; use super::resolver::HickoryResolverWithEdns; diff --git a/src/snowflake/kms.rs b/src/snowflake/kms.rs index 8f851b8..60a3c28 100644 --- a/src/snowflake/kms.rs +++ b/src/snowflake/kms.rs @@ -139,7 +139,7 @@ impl CryptoMaterialProvider for SnowflakeStageS3Kms { let path = path.strip_prefix(&self.prefix).unwrap_or(path); let material_description: MaterialDescription = - deserialize_str(required_attribute("x-amz-matdesc", &attr)?) + deserialize_str(required_attribute(&attr, "x-amz-matdesc")?) .map_err(Error::deserialize_response_err("failed to deserialize matdesc"))?; let master_key = get_master_key( @@ -150,13 +150,13 @@ impl CryptoMaterialProvider for SnowflakeStageS3Kms { &self.keyring, ).await?; - let cek = EncryptedKey::from_base64(required_attribute("x-amz-key", &attr)?) + let cek = EncryptedKey::from_base64(required_attribute(&attr, "x-amz-key")?) .map_err(ErrorKind::MaterialDecode)?; let cek = cek.decrypt_aes_128_ecb(&master_key) .map_err(ErrorKind::MaterialCrypt)?; - let iv = Iv::from_base64(required_attribute("x-amz-iv", &attr)?) + let iv = Iv::from_base64(required_attribute(&attr, "x-amz-iv")?) .map_err(ErrorKind::MaterialDecode)?; - let alg = required_attribute("x-amz-cek-alg", &attr); + let alg = required_attribute(&attr, "x-amz-cek-alg"); let scheme = match alg { Ok("AES/GCM/NoPadding") => CryptoScheme::Aes256Gcm, @@ -225,7 +225,7 @@ const AZURE_ENCDATA_KEY: &str = "encryptiondata"; #[async_trait::async_trait] impl CryptoMaterialProvider for SnowflakeStageAzureKms { - async fn material_for_write(&self, _path: &str, data_len: Option) -> crate::Result<(ContentCryptoMaterial, Attributes)> { + async fn material_for_write(&self, _path: &str, _data_len: Option) -> crate::Result<(ContentCryptoMaterial, Attributes)> { let _guard = duration_on_drop!(metrics::material_for_write_duration); let info = self.client.current_upload_info(&self.stage).await?; @@ -292,7 +292,7 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { let path = path.strip_prefix(&self.prefix).unwrap_or(path); let material_description: MaterialDescription = - deserialize_str(required_attribute(AZURE_MATDESC_KEY, &attr)?) + deserialize_str(required_attribute(&attr, AZURE_MATDESC_KEY)?) .map_err(Error::deserialize_response_err("failed to deserialize matdesc"))?; let master_key = get_master_key( @@ -304,7 +304,7 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { ).await?; let encryption_data: EncryptionData = - deserialize_str(required_attribute(AZURE_ENCDATA_KEY, &attr)?) + deserialize_str(required_attribute(&attr, AZURE_ENCDATA_KEY)?) .map_err(Error::deserialize_response_err("failed to deserialize encryption data"))?; let cek = EncryptedKey::from_base64(&encryption_data.wrapped_content_key.encrypted_key) diff --git a/src/util.rs b/src/util.rs index 520f0f5..ef4fee2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -487,7 +487,7 @@ where serde_path_to_error::deserialize(de) } -pub(crate) fn required_attribute<'a>(key: &'static str, attr: &'a Attributes) -> Result<&'a str, Error> { +pub(crate) fn required_attribute<'a>(attr: &'a Attributes, key: &'static str) -> Result<&'a str, Error> { let v: &str = attr.get(&Attribute::Metadata(key.into())) .ok_or_else(|| Error::required_config(format!("missing required attribute `{}`", key)))? .as_ref(); From ea65e80e08fa0850723890e58bf19a6bd92a64bc Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Fri, 15 Nov 2024 09:34:01 +0100 Subject: [PATCH 20/25] Decrypt `AES_CBC_256` files with AES 128 --- src/snowflake/kms.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/snowflake/kms.rs b/src/snowflake/kms.rs index 60a3c28..dd73d5b 100644 --- a/src/snowflake/kms.rs +++ b/src/snowflake/kms.rs @@ -316,6 +316,7 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { let scheme = match encryption_data.encryption_agent.encryption_algorithm.as_str() { "AES_CBC_128" => CryptoScheme::Aes128Cbc, + "AES_CBC_256" => CryptoScheme::Aes128Cbc, // TODO: discuss v => unimplemented!("encryption algorithm `{}` not implemented", v) }; From b16efc2c83bee23553cb8f1d8cb4f1f3372b572b Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Mon, 18 Nov 2024 09:17:16 +0100 Subject: [PATCH 21/25] cleanup --- src/snowflake/kms.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/kms.rs b/src/snowflake/kms.rs index dd73d5b..adfaa51 100644 --- a/src/snowflake/kms.rs +++ b/src/snowflake/kms.rs @@ -316,7 +316,7 @@ impl CryptoMaterialProvider for SnowflakeStageAzureKms { let scheme = match encryption_data.encryption_agent.encryption_algorithm.as_str() { "AES_CBC_128" => CryptoScheme::Aes128Cbc, - "AES_CBC_256" => CryptoScheme::Aes128Cbc, // TODO: discuss + "AES_CBC_256" => CryptoScheme::Aes128Cbc, v => unimplemented!("encryption algorithm `{}` not implemented", v) }; From 8e8d425e50a9f6068fd60c38f2d02f3e7fc73713 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 19 Nov 2024 12:33:46 +0100 Subject: [PATCH 22/25] Support test endpoint in Azure --- src/snowflake/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index 8091b23..8ff634a 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -414,16 +414,17 @@ pub(crate) async fn build_store_for_snowflake_stage( let provider = AzureStageCredentialProvider::new(&config.stage, client.clone()); - if info.stage_info.test_endpoint.is_some() { - unimplemented!("test endpoint for azure blob storage is not supported"); - } - let mut builder = object_store::azure::MicrosoftAzureBuilder::default() .with_account(storage_account) .with_container_name(container) .with_credentials(Arc::new(provider)) .with_retry(retry_config); + if let Some(test_endpoint) = &info.stage_info.test_endpoint { + builder = builder.with_endpoint(test_endpoint.to_string()); + config_map.insert("allow_http".into(), "true".into()); + } + for (key, value) in config_map { builder = builder.with_config(key.parse()?, value); } From bec58f63c462122e7977f0390c62295e92b1ff9e Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 19 Nov 2024 14:03:04 +0100 Subject: [PATCH 23/25] proper azurite support --- src/snowflake/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/snowflake/mod.rs b/src/snowflake/mod.rs index 8ff634a..50e3d60 100644 --- a/src/snowflake/mod.rs +++ b/src/snowflake/mod.rs @@ -422,7 +422,12 @@ pub(crate) async fn build_store_for_snowflake_stage( if let Some(test_endpoint) = &info.stage_info.test_endpoint { builder = builder.with_endpoint(test_endpoint.to_string()); - config_map.insert("allow_http".into(), "true".into()); + let mut azurite_host = url::Url::parse(&test_endpoint) + .map_err(Error::invalid_config_err("failed to parse azurite_host"))?; + azurite_host.set_path(""); + unsafe { std::env::set_var("AZURITE_BLOB_STORAGE_URL", azurite_host.as_str()) }; + config_map.insert("allow_invalid_certificates".into(), "true".into()); + config_map.insert("azure_storage_use_emulator".into(), "true".into()); } for (key, value) in config_map { From 6e54faa215ae22b1ea8bfd46708a77738423b1b9 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 19 Nov 2024 19:45:05 +0100 Subject: [PATCH 24/25] Bump package version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1f3f5a7..03a8170 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "object_store_ffi" -version = "0.10.1" +version = "0.11" edition = "2021" [[bench]] From a15568e5fdbdcedd0d4a3220c6425bece874ab49 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Wed, 20 Nov 2024 08:52:35 +0100 Subject: [PATCH 25/25] fix version number --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 03a8170..4af6651 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "object_store_ffi" -version = "0.11" +version = "0.11.0" edition = "2021" [[bench]]