Skip to content

Commit 0ea81c7

Browse files
authored
fix(api): Implement storable trait to make it more reliable to store configs in the db (supabase#283)
1 parent c883c56 commit 0ea81c7

File tree

9 files changed

+32
-13
lines changed

9 files changed

+32
-13
lines changed

etl-api/src/configs/destination.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
use crate::configs::encryption::{
2-
Decrypt, DecryptionError, Encrypt, EncryptedValue, EncryptionError, EncryptionKey,
3-
decrypt_text, encrypt_text,
4-
};
51
use etl_config::SerializableSecretString;
62
use etl_config::shared::DestinationConfig;
73
use secrecy::ExposeSecret;
84
use serde::{Deserialize, Serialize};
95
use utoipa::ToSchema;
106

7+
use crate::configs::encryption::{
8+
Decrypt, DecryptionError, Encrypt, EncryptedValue, EncryptionError, EncryptionKey,
9+
decrypt_text, encrypt_text,
10+
};
11+
use crate::configs::store::Store;
12+
1113
const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 8;
1214

1315
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
@@ -149,6 +151,8 @@ pub enum EncryptedStoredDestinationConfig {
149151
},
150152
}
151153

154+
impl Store for EncryptedStoredDestinationConfig {}
155+
152156
impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
153157
fn decrypt(
154158
self,

etl-api/src/configs/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ pub mod encryption;
33
pub mod pipeline;
44
pub mod serde;
55
pub mod source;
6+
pub mod store;

etl-api/src/configs/pipeline.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::configs::store::Store;
12
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig};
23
use serde::{Deserialize, Serialize};
34
use utoipa::ToSchema;
@@ -83,6 +84,8 @@ impl StoredPipelineConfig {
8384
}
8485
}
8586

87+
impl Store for StoredPipelineConfig {}
88+
8689
impl From<FullApiPipelineConfig> for StoredPipelineConfig {
8790
fn from(value: FullApiPipelineConfig) -> Self {
8891
Self {

etl-api/src/configs/serde.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use serde::Serialize;
2-
use serde::de::DeserializeOwned;
32
use thiserror::Error;
43

54
use crate::configs::encryption::{
65
Decrypt, DecryptionError, Encrypt, EncryptionError, EncryptionKey,
76
};
7+
use crate::configs::store::Store;
88

99
/// Errors that can occur during serialization or encryption for database storage.
1010
#[derive(Debug, Error)]
@@ -35,7 +35,7 @@ pub enum DbDeserializationError {
3535
/// Returns an error if serialization fails.
3636
pub fn serialize<S>(value: S) -> Result<serde_json::Value, DbSerializationError>
3737
where
38-
S: Serialize,
38+
S: Store,
3939
{
4040
let serialized_value = serde_json::to_value(value)?;
4141

@@ -65,7 +65,7 @@ where
6565
/// Returns an error if deserialization fails.
6666
pub fn deserialize_from_value<S>(value: serde_json::Value) -> Result<S, DbDeserializationError>
6767
where
68-
S: DeserializeOwned,
68+
S: Store,
6969
{
7070
let deserialized_value = serde_json::from_value(value)?;
7171

@@ -84,7 +84,7 @@ pub fn decrypt_and_deserialize_from_value<T, S>(
8484
) -> Result<S, DbDeserializationError>
8585
where
8686
T: Decrypt<S>,
87-
T: DeserializeOwned,
87+
T: Store,
8888
{
8989
let deserialized_value: T = serde_json::from_value(value)?;
9090
let value = deserialized_value.decrypt(encryption_key)?;

etl-api/src/configs/source.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::configs::encryption::{
88
Decrypt, DecryptionError, Encrypt, EncryptedValue, EncryptionError, EncryptionKey,
99
decrypt_text, encrypt_text,
1010
};
11+
use crate::configs::store::Store;
1112

1213
const DEFAULT_TLS_TRUSTED_ROOT_CERTS: &str = "";
1314
const DEFAULT_TLS_ENABLED: bool = false;
@@ -144,6 +145,8 @@ pub struct EncryptedStoredSourceConfig {
144145
password: Option<EncryptedValue>,
145146
}
146147

148+
impl Store for EncryptedStoredSourceConfig {}
149+
147150
impl Decrypt<StoredSourceConfig> for EncryptedStoredSourceConfig {
148151
fn decrypt(
149152
self,

etl-api/src/configs/store.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use serde::Serialize;
2+
use serde::de::DeserializeOwned;
3+
4+
/// Market trait that has to be implemented by configs that can be stored in the database.
5+
///
6+
/// With this trait we can enforce at compile time which structs can actually be stored and avoid
7+
/// storing the wrong struct.
8+
pub trait Store: Serialize + DeserializeOwned {}

etl-api/src/db/destinations_pipelines.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ pub async fn update_destination_and_pipeline(
101101
pipeline_id,
102102
source_id,
103103
destination_id,
104-
&pipeline_config,
104+
pipeline_config,
105105
)
106106
.await?;
107107

etl-api/src/db/pipelines.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub async fn create_pipeline(
5555
image_id: i64,
5656
config: FullApiPipelineConfig,
5757
) -> Result<i64, PipelinesDbError> {
58-
let config = serialize(&config)?;
58+
let config = serialize(StoredPipelineConfig::from(config))?;
5959

6060
let replicator_id = create_replicator(txn.deref_mut(), tenant_id, image_id).await?;
6161
let record = sqlx::query!(
@@ -134,12 +134,12 @@ pub async fn update_pipeline<'c, E>(
134134
pipeline_id: i64,
135135
source_id: i64,
136136
destination_id: i64,
137-
config: &FullApiPipelineConfig,
137+
config: FullApiPipelineConfig,
138138
) -> Result<Option<i64>, PipelinesDbError>
139139
where
140140
E: PgExecutor<'c>,
141141
{
142-
let pipeline_config = serialize(config)?;
142+
let pipeline_config = serialize(StoredPipelineConfig::from(config))?;
143143

144144
let record = sqlx::query!(
145145
r#"

etl-api/src/routes/pipelines.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ pub async fn update_pipeline(
500500
pipeline_id,
501501
pipeline.source_id,
502502
pipeline.destination_id,
503-
&pipeline.config,
503+
pipeline.config,
504504
)
505505
.await?
506506
.ok_or(PipelineError::PipelineNotFound(pipeline_id))?;

0 commit comments

Comments
 (0)