Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ pyvenv.cfg

# Log files
*.log

# Bin examples
!etl-examples/src/bin
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ etl = { git = "https://github.com/supabase/etl" }
etl-destinations = { git = "https://github.com/supabase/etl", features = ["bigquery"] }
```

- Redis

The Redis destination is available in the `etl-destinations` crate:

```toml
[dependencies]
etl = { git = "https://github.com/supabase/etl" }
etl-destinations = { git = "https://github.com/supabase/etl", features = ["redis"] }
```

> Current limitation: the Redis destination does not yet support Redis running in cluster mode.

## License

Apache‑2.0. See `LICENSE` for details.
Expand Down
20 changes: 14 additions & 6 deletions etl-destinations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ repository.workspace = true
homepage.workspace = true

[features]
redis = [
"dep:tracing",
"dep:tokio",
"dep:fred",
"dep:uuid",
"dep:serde",
"dep:serde_json",
]
bigquery = [
"dep:gcp-bigquery-client",
"dep:prost",
"dep:rustls",
"dep:tracing",
"dep:tokio",
"dep:base64",
Expand All @@ -20,7 +27,6 @@ iceberg = [
"dep:iceberg",
"dep:iceberg-catalog-rest",
"dep:arrow",
"dep:rustls",
"dep:tracing",
"dep:parquet",
"dep:uuid",
Expand All @@ -47,15 +53,17 @@ iceberg-catalog-rest = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, features = ["async", "arrow"] }
prost = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true, features = ["json"] }
rustls = { workspace = true, optional = true, features = [
rustls = { workspace = true, features = [
"aws-lc-rs",
"logging",
] }
serde = { workspace = true, optional = true, features = ["derive"] }
serde_json = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["sync"] }
serde_json = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["full", "sync"] } # NOTE(review): "full" already includes "sync" (and much more) — consider listing only the features this crate needs
tracing = { workspace = true, optional = true, default-features = true }
uuid = { workspace = true, optional = true, features = ["v4"] }
uuid = { workspace = true, optional = true, features = ["v4", "serde"] }
fred = { version = "10.1.0", optional = true, features = ["i-redis-json"] }
futures.workspace = true

[dev-dependencies]
etl = { workspace = true, features = ["test-utils"] }
Expand Down
2 changes: 2 additions & 0 deletions etl-destinations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ pub mod bigquery;
pub mod encryption;
#[cfg(feature = "iceberg")]
pub mod iceberg;
#[cfg(feature = "redis")]
pub mod redis;
199 changes: 199 additions & 0 deletions etl-destinations/src/redis/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use etl::types::{TableRow, TableSchema};
use fred::clients::Pipeline;
use fred::prelude::{
Client, ClientLike, EventInterface, FredResult, KeysInterface, Pool, ReconnectPolicy, Server,
ServerConfig, TcpConfig,
};
use fred::types::config::UnresponsiveConfig;
use fred::types::{Builder, Expiration, Key};
use futures::future::join_all;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error};

use crate::redis::RedisConfig;
use crate::redis::json_cell::JsonCell;

#[derive(Clone)]
pub(super) struct RedisClient {
client: Pool,
ttl: Option<Expiration>,
}

impl RedisClient {
pub(super) async fn new(config: RedisConfig) -> FredResult<Self> {
let pooled_client = Builder::default_centralized()
.with_config(|redis_config| {
redis_config.password = config.password;
redis_config.username = config.username;
redis_config.server = ServerConfig::Centralized {
server: Server::new(config.host, config.port),
};
})
.with_connection_config(|config| {
config.internal_command_timeout = Duration::from_secs(5);
config.reconnect_on_auth_error = true;
config.tcp = TcpConfig {
#[cfg(target_os = "linux")]
user_timeout: Some(Duration::from_secs(5)),
..Default::default()
};
config.unresponsive = UnresponsiveConfig {
max_timeout: Some(Duration::from_secs(10)),
interval: Duration::from_secs(3),
};
})
.with_performance_config(|config| {
config.default_command_timeout = Duration::from_secs(5);
})
.set_policy(ReconnectPolicy::new_exponential(0, 1, 2000, 5))
.build_pool(5)?;

for client in pooled_client.clients() {
// spawn tasks that listen for connection close or reconnect events
let mut error_rx = client.error_rx();
let mut reconnect_rx = client.reconnect_rx();
let mut unresponsive_rx = client.unresponsive_rx();

tokio::spawn(async move {
loop {
match error_rx.recv().await {
Ok((error, Some(server))) => {
error!("Redis client ({server:?}) error: {error:?}",);
}
Ok((error, None)) => {
error!("Redis client error: {error:?}",);
}
Err(RecvError::Lagged(_)) => continue,
Err(RecvError::Closed) => break,
}
}
});

tokio::spawn(async move {
loop {
match unresponsive_rx.recv().await {
Ok(server) => {
error!("Redis client ({server:?}) unresponsive");
}
Err(RecvError::Lagged(_)) => continue,
Err(RecvError::Closed) => break,
}
}
});

tokio::spawn(async move {
loop {
match reconnect_rx.recv().await {
Ok(server) => {
debug!("Redis client connected to {server:?}")
}
Err(RecvError::Lagged(_)) => continue,
Err(RecvError::Closed) => break,
}
}
});
}
let client_handles = pooled_client.connect_pool();

debug!("Wait for connect");
pooled_client.wait_for_connect().await?;
debug!("Connected");

tokio::spawn(async move {
let _results = join_all(client_handles).await;
});

Ok(Self {
client: pooled_client,
ttl: config.ttl.map(Expiration::EX),
})
}

// Doesn't work with redis cluster
pub(super) async fn delete_by_table_name(&self, table_name: &str) -> FredResult<u64> {
let pattern = format!("{}::::*", table_name);
let mut cursor = "0".to_string();
let mut total_deleted = 0u64;

loop {
let (next_cursor, keys): (String, Vec<Key>) = self
.client
.scan_page(cursor, pattern.clone(), Some(100), None)
.await?;

if !keys.is_empty() {
let deleted: i64 = self.client.unlink(keys).await?;
total_deleted += deleted as u64;
}

cursor = next_cursor;
if cursor == "0" {
break;
}
}

Ok(total_deleted)
}

pub(super) async fn delete(&self, key: RedisKey) -> FredResult<()> {
self.client.del::<(), _>(key).await
}

pub(super) fn pipeline(&self) -> Pipeline<Client> {
self.client.next_connected().pipeline()
}

pub(super) async fn set(
&self,
pipeline: &Pipeline<Client>,
key: RedisKey,
map: HashMap<String, JsonCell<'_>>,
) -> Result<(), Box<dyn std::error::Error>> {
// We don't use json to be compliant with older versions of redis
pipeline
.set::<(), _, _>(
key,
serde_json::to_string(&map)?,
self.ttl.clone(),
None,
false,
)
.await?;

Ok(())
}
}

pub(super) struct RedisKey(String);

impl RedisKey {
    /// Derives the Redis key for `table_row` under `table_schema`.
    ///
    /// The key is `<table>::::<col>:<json>` with one `<col>:<json>` segment
    /// per primary-key column (segments joined by `::::`), where `<json>` is
    /// the JSON-serialized cell value. Primary-key columns whose value is
    /// missing from the row or fails to serialize are silently skipped.
    pub(super) fn new(table_schema: &TableSchema, table_row: &TableRow) -> Self {
        let table_name = &table_schema.name.name;

        let mut segments: Vec<String> = Vec::new();
        for (idx, col_schema) in table_schema.column_schemas.iter().enumerate() {
            if !col_schema.primary {
                continue;
            }
            let Some(cell) = table_row.values.get(idx) else {
                continue;
            };
            // FIXME: should be a parameter to avoid serializing twice
            let Ok(value) = serde_json::to_string(&JsonCell::Ref(cell)) else {
                continue;
            };
            segments.push(format!("{}:{}", col_schema.name, value));
        }

        let primary_key = segments.join("::::");
        Self(format!("{table_name}::::{primary_key}"))
    }
}

impl From<RedisKey> for Key {
fn from(val: RedisKey) -> Self {
Key::from(val.0)
}
}
Loading