Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
CREATE TABLE kv_storage (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value BYTEA NOT NULL,
PRIMARY KEY (namespace, key)
);

CREATE TABLE set_storage (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value BYTEA NOT NULL,
PRIMARY KEY (namespace, key, value)
);

CREATE TABLE sorted_set_storage (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value_hash BYTEA NOT NULL,
value BYTEA NOT NULL,
score DOUBLE PRECISION NOT NULL,
PRIMARY KEY (namespace, key, value_hash)
);

CREATE INDEX idx_sorted_set_storage_namespace_key_score
ON sorted_set_storage (namespace, key, score);
25 changes: 25 additions & 0 deletions golem-worker-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ use crate::storage::indexed::sqlite::SqliteIndexedStorage;
use crate::storage::indexed::IndexedStorage;
use crate::storage::keyvalue::memory::InMemoryKeyValueStorage;
use crate::storage::keyvalue::multi_sqlite::MultiSqliteKeyValueStorage;
use crate::storage::keyvalue::postgres::PostgresKeyValueStorage;
use crate::storage::keyvalue::redis::RedisKeyValueStorage;
use crate::storage::keyvalue::routed::RoutedKeyValueStorage;
use crate::storage::keyvalue::KeyValueStorage;
use crate::workerctx::WorkerCtx;
use anyhow::anyhow;
Expand Down Expand Up @@ -357,6 +359,29 @@ pub async fn create_worker_executor_impl<Ctx: WorkerCtx, A: Bootstrap<Ctx> + ?Si
Arc::new(RedisKeyValueStorage::new(pool.clone()));
(Some(pool), None, key_value_storage)
}
KeyValueStorageConfig::Postgres(postgres) => {
let key_value_storage: Arc<dyn KeyValueStorage + Send + Sync> = Arc::new(
PostgresKeyValueStorage::configured(postgres)
.await
.map_err(|err| anyhow!(err))?,
);
(None, None, key_value_storage)
}
KeyValueStorageConfig::Routed(routed) => {
let redis_pool = RedisPool::configured(&routed.redis)
.await
.map_err(|err| anyhow!(err))?;
let redis_storage: Arc<dyn KeyValueStorage + Send + Sync> =
Arc::new(RedisKeyValueStorage::new(redis_pool.clone()));
let postgres_storage: Arc<dyn KeyValueStorage + Send + Sync> = Arc::new(
PostgresKeyValueStorage::configured(&routed.postgres)
.await
.map_err(|err| anyhow!(err))?,
);
let key_value_storage: Arc<dyn KeyValueStorage + Send + Sync> =
Arc::new(RoutedKeyValueStorage::new(redis_storage, postgres_storage));
(Some(redis_pool), None, key_value_storage)
}
KeyValueStorageConfig::InMemory(_) => {
(None, None, Arc::new(InMemoryKeyValueStorage::new()))
}
Expand Down
39 changes: 39 additions & 0 deletions golem-worker-executor/src/services/golem_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ impl SafeDisplay for OplogConfig {
#[serde(tag = "type", content = "config")]
pub enum KeyValueStorageConfig {
Redis(RedisConfig),
Postgres(KeyValueStoragePostgresConfig),
Routed(KeyValueStorageRoutedConfig),
Sqlite(DbSqliteConfig),
MultiSqlite(KeyValueStorageMultiSqliteConfig),
InMemory(KeyValueStorageInMemoryConfig),
Expand All @@ -568,6 +570,14 @@ impl SafeDisplay for KeyValueStorageConfig {
let _ = writeln!(&mut result, "redis:");
let _ = writeln!(&mut result, "{}", inner.to_safe_string_indented());
}
KeyValueStorageConfig::Postgres(inner) => {
let _ = writeln!(&mut result, "postgres:");
let _ = writeln!(&mut result, "{}", inner.to_safe_string_indented());
}
KeyValueStorageConfig::Routed(inner) => {
let _ = writeln!(&mut result, "routed:");
let _ = writeln!(&mut result, "{}", inner.to_safe_string_indented());
}
KeyValueStorageConfig::Sqlite(inner) => {
let _ = writeln!(&mut result, "sqlite:");
let _ = writeln!(&mut result, "{}", inner.to_safe_string_indented());
Expand All @@ -585,6 +595,35 @@ impl SafeDisplay for KeyValueStorageConfig {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeyValueStoragePostgresConfig {
#[serde(flatten)]
pub postgres: DbPostgresConfig,
}

impl SafeDisplay for KeyValueStoragePostgresConfig {
fn to_safe_string(&self) -> String {
self.postgres.to_safe_string()
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeyValueStorageRoutedConfig {
pub redis: RedisConfig,
pub postgres: KeyValueStoragePostgresConfig,
}

impl SafeDisplay for KeyValueStorageRoutedConfig {
fn to_safe_string(&self) -> String {
let mut result = String::new();
let _ = writeln!(&mut result, "redis:");
let _ = writeln!(&mut result, "{}", self.redis.to_safe_string_indented());
let _ = writeln!(&mut result, "postgres:");
let _ = writeln!(&mut result, "{}", self.postgres.to_safe_string_indented());
result
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeyValueStorageMultiSqliteConfig {
pub root_dir: PathBuf,
Expand Down
79 changes: 44 additions & 35 deletions golem-worker-executor/src/storage/indexed/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use include_dir::include_dir;
use sqlx::{Postgres, QueryBuilder};
use std::time::Duration;

static DB_MIGRATIONS: include_dir::Dir = include_dir!("$CARGO_MANIFEST_DIR/db/migration");
static DB_MIGRATIONS: include_dir::Dir =
include_dir!("$CARGO_MANIFEST_DIR/db/migration/postgres/indexed");

#[derive(Debug, Clone)]
pub struct PostgresIndexedStorage {
Expand All @@ -33,6 +34,8 @@ pub struct PostgresIndexedStorage {
}

impl PostgresIndexedStorage {
const APPEND_MANY_CHUNK_SIZE: usize = 1024;

pub async fn configured(config: &IndexedStoragePostgresConfig) -> Result<Self, String> {
if config.drop_prefix_delete_batch_size == 0 {
return Err(
Expand Down Expand Up @@ -239,20 +242,23 @@ impl IndexedStorage for PostgresIndexedStorage {
self.pool
.with_tx(svc_name, api_name, |tx| {
async move {
let mut query_builder = QueryBuilder::<Postgres>::new(
"INSERT INTO index_storage (namespace, key, id, value) ",
);

query_builder.push_values(converted_pairs, |mut builder, (id, value)| {
builder
.push_bind(namespace.clone())
.push_bind(key.clone())
.push_bind(id)
.push_bind(value);
});

let query = query_builder.build();
tx.execute(query).await?;
for chunk in converted_pairs.chunks(Self::APPEND_MANY_CHUNK_SIZE) {
let mut query_builder = QueryBuilder::<Postgres>::new(
"INSERT INTO index_storage (namespace, key, id, value) ",
);

query_builder.push_values(chunk.iter(), |mut builder, (id, value)| {
builder
.push_bind(namespace.clone())
.push_bind(key.clone())
.push_bind(*id)
.push_bind(value);
});

let query = query_builder.build();
tx.execute(query).await?;
}

Ok(())
}
.boxed()
Expand Down Expand Up @@ -413,27 +419,30 @@ impl IndexedStorage for PostgresIndexedStorage {
self.drop_prefix_delete_batch_size,
"drop_prefix_delete_batch_size",
)?;
let mut deleted_rows = self.drop_prefix_delete_batch_size;

while deleted_rows >= self.drop_prefix_delete_batch_size {
let query = sqlx::query(
"WITH rows AS (SELECT ctid FROM index_storage WHERE namespace = $1 AND key = $2 AND id <= $3 ORDER BY id LIMIT $4) DELETE FROM index_storage t USING rows WHERE t.ctid = rows.ctid;",
)
.bind(&namespace)
.bind(&key)
.bind(last_dropped_id)
.bind(batch_size_i64);

deleted_rows = self
.pool
.with_rw(svc_name, api_name)
.execute(query)
.await
.map_err(|err| err.to_safe_string())?
.rows_affected();
}
let delete_batch_size = self.drop_prefix_delete_batch_size;
self.pool
.with_tx(svc_name, api_name, |tx| {
async move {
let mut deleted_rows = delete_batch_size;

while deleted_rows >= delete_batch_size {
let query = sqlx::query(
"WITH rows AS (SELECT ctid FROM index_storage WHERE namespace = $1 AND key = $2 AND id <= $3 ORDER BY id LIMIT $4) DELETE FROM index_storage t USING rows WHERE t.ctid = rows.ctid;",
)
.bind(&namespace)
.bind(&key)
.bind(last_dropped_id)
.bind(batch_size_i64);

Ok(())
deleted_rows = tx.execute(query).await?.rows_affected();
}

Ok(())
}
.boxed()
})
.await
.map_err(|err| err.to_safe_string())
}
}

Expand Down
2 changes: 2 additions & 0 deletions golem-worker-executor/src/storage/keyvalue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

pub mod memory;
pub mod multi_sqlite;
pub mod postgres;
pub mod redis;
pub mod routed;
pub mod sqlite;

use async_trait::async_trait;
Expand Down
Loading
Loading