Skip to content

Commit ed0fbf4

Browse files
committed
WIP migrations
1 parent 35b75a6 commit ed0fbf4

16 files changed

+442
-6
lines changed

common/src/database/redis.rs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ use crate::transformers::output_files_use_path::new;
5252

5353
use super::schema::{Migration, MigrationBase, Version};
5454

55+
const MIGRATION_TABLE_NAME: &str = "__schema_migrations";
56+
5557
#[async_trait]
5658
pub trait RedisMigration: Migration {
5759
/// Called when this migration is to be executed.
@@ -111,6 +113,18 @@ impl RedisDatabase {
111113

112114
Ok(db)
113115
}
116+
117+
/// Register a migration. If a migration with the same version is already registered, a warning
118+
/// is logged and the registration fails.
119+
pub fn register_migration(&mut self, migration: Arc<dyn RedisMigration + Send + Sync>) {
120+
let version = migration.version();
121+
if let Vacant(e) = self.migrations.entry(version) {
122+
e.insert(migration);
123+
} else {
124+
warn!("Migration with version {:?} is already registered", version);
125+
}
126+
}
127+
114128
async fn get_heartbeats_by_field(
115129
&self,
116130
fields: HashMap<RedisDomain, String>
@@ -457,28 +471,76 @@ impl Database for RedisDatabase {
457471
Ok(())
458472
}
459473

474+
/// Fails if `setup_schema` hasn't previously been called or if the query otherwise fails.
460475
async fn current_version(&self) -> Result<Option<Version>> {
461-
todo!()
476+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
477+
let key = MIGRATION_TABLE_NAME;
478+
let versions:Vec<String> = conn.zrange(key, -1, -1).await.context("There is no version info stored in DB.")?;
479+
let last_version = versions.last().and_then(|v| v.parse::<i64>().ok());
480+
Ok(last_version)
462481
}
463482

483+
/// Fails if `setup_schema` hasn't previously been called or if the query otherwise fails.
464484
async fn migrated_versions(&self) -> Result<BTreeSet<Version>> {
465-
todo!()
485+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
486+
let key = MIGRATION_TABLE_NAME;
487+
let versions:Vec<String> = conn.zrange(key, 0, -1).await.context("There is no version info stored in DB.")?;
488+
let result : BTreeSet<i64> = versions.into_iter().map(|v| v.parse::<i64>().context(format!("Failed to parse version: {}", v))).collect::<Result<_>>()?;
489+
Ok(result)
466490
}
467491

492+
/// Fails if `setup_schema` hasn't previously been called or if the migration otherwise fails.
468493
async fn apply_migration(&self, version: Version) -> Result<()> {
469-
todo!()
494+
let migration = self
495+
.migrations
496+
.get(&version)
497+
.ok_or_else(|| anyhow!("Could not retrieve migration with version {}", version))?
498+
.clone();
499+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
500+
migration.up(&mut conn).await?;
501+
let key = MIGRATION_TABLE_NAME;
502+
let version = migration.version();
503+
let added_count: i64 = conn.zadd(key, version, version).await.context(format!("Unable to add version: {}", version))?;
504+
if added_count > 0 {
505+
println!("Successfully added version {} to sorted set", version);
506+
} else {
507+
println!("Version {} was not added (it may already exist)", version);
508+
}
509+
Ok(())
470510
}
471511

512+
/// Fails if `setup_schema` hasn't previously been called or if the migration otherwise fails.
472513
async fn revert_migration(&self, version: Version) -> Result<()> {
473-
todo!()
514+
let migration = self
515+
.migrations
516+
.get(&version)
517+
.ok_or_else(|| anyhow!("Could not retrieve migration with version {}", version))?
518+
.clone();
519+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
520+
migration.down(&mut conn).await?;
521+
let key = MIGRATION_TABLE_NAME;
522+
let version = migration.version();
523+
let removed_count: i64 = conn.zrem(key, version).await.context("Failed to remove version")?;
524+
if removed_count > 0 {
525+
println!("Successfully removed version: {}", version);
526+
} else {
527+
println!("Version {} not found in the sorted set.", version);
528+
}
529+
Ok(())
474530
}
475531

532+
/// Create the tables required to keep track of schema state. If the tables already
533+
/// exist, this function has no operation.
476534
async fn setup_schema(&self) -> Result<()> {
477-
todo!()
535+
Ok(())
478536
}
479537

480538
async fn migrations(&self) -> BTreeMap<Version, Arc<dyn Migration + Send + Sync>> {
481-
todo!()
539+
let mut base_migrations = BTreeMap::new();
540+
for (version, migration) in self.migrations.iter() {
541+
base_migrations.insert(*version, migration.to_base());
542+
}
543+
base_migrations
482544
}
483545

484546
async fn get_stats(

common/src/database/schema/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use super::Database;
3434

3535
pub mod postgres;
3636
pub mod sqlite;
37+
pub mod redis;
3738

3839
/// The version type alias used to uniquely reference migrations.
3940
pub type Version = i64;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use anyhow::{Context, Result};
2+
use async_trait::async_trait;
3+
use redis::AsyncCommands;
4+
use crate::database::redis::RedisMigration;
5+
use crate::database::redisdomain::RedisDomain;
6+
use crate::migration;
7+
use deadpool_redis::*;
8+
9+
pub(super) struct CreateSubscriptionsTable;
10+
migration!(CreateSubscriptionsTable, 1, "create subscriptions table");
11+
12+
#[async_trait]
13+
impl RedisMigration for CreateSubscriptionsTable {
14+
async fn up(&self, _conn: &mut Connection) -> Result<()> {
15+
Ok(())
16+
}
17+
18+
async fn down(&self, conn: &mut Connection) -> Result<()> {
19+
let key = format!("{}:{}:{}", RedisDomain::Subscription, RedisDomain::Any, RedisDomain::Any);
20+
let subs : Vec<String> = conn.keys(key).await.context("Unable to list keys")?;
21+
if !subs.is_empty() {
22+
let _: () = conn.del(subs).await.context("Failed to delete subscription data")?;
23+
}
24+
Ok(())
25+
}
26+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use anyhow::{Context, Result};
2+
use async_trait::async_trait;
3+
use crate::database::redis::RedisMigration;
4+
use crate::database::redisdomain::RedisDomain;
5+
use crate::migration;
6+
use deadpool_redis::*;
7+
use redis::AsyncCommands;
8+
9+
pub(super) struct CreateBookmarksTable;
10+
migration!(CreateBookmarksTable, 2, "create bookmarks table");
11+
12+
#[async_trait]
13+
impl RedisMigration for CreateBookmarksTable {
14+
async fn up(&self, _conn: &mut Connection) -> Result<()> {
15+
Ok(())
16+
}
17+
18+
async fn down(&self, conn: &mut Connection) -> Result<()> {
19+
let key = format!("{}:{}:{}", RedisDomain::BookMark, RedisDomain::Any, RedisDomain::Any);
20+
let bms : Vec<String> = conn.keys(key).await.context("Unable to list keys")?;
21+
if !bms.is_empty() {
22+
let _: () = conn.del(bms).await.context("Failed to delete bookmark data")?;
23+
}
24+
Ok(())
25+
}
26+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use anyhow::{Context, Result};
2+
use async_trait::async_trait;
3+
use crate::database::redis::RedisMigration;
4+
use crate::database::redisdomain::RedisDomain;
5+
use crate::migration;
6+
use deadpool_redis::*;
7+
use redis::AsyncCommands;
8+
9+
pub(super) struct CreateHeartbeatsTable;
10+
migration!(CreateHeartbeatsTable, 3, "create heartbeats table");
11+
12+
#[async_trait]
13+
impl RedisMigration for CreateHeartbeatsTable {
14+
async fn up(&self, _conn: &mut Connection) -> Result<()> {
15+
Ok(())
16+
}
17+
18+
async fn down(&self, conn: &mut Connection) -> Result<()> {
19+
let key = format!("{}:{}:{}", RedisDomain::Heartbeat, RedisDomain::Any, RedisDomain::Any);
20+
let hbs : Vec<String> = conn.keys(key).await.context("Unable to list keys")?;
21+
if !hbs.is_empty() {
22+
let _: () = conn.del(hbs).await.context("Failed to delete hearthbeat data")?;
23+
}
24+
Ok(())
25+
}
26+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::Result;
2+
use async_trait::async_trait;
3+
use crate::database::redis::RedisMigration;
4+
use crate::migration;
5+
use deadpool_redis::*;
6+
7+
pub(super) struct AddLastEventSeenFieldInHeartbeatsTable;
8+
migration!(
9+
AddLastEventSeenFieldInHeartbeatsTable,
10+
4,
11+
"add last_event_seen field in heartbeats table"
12+
);
13+
14+
#[async_trait]
15+
impl RedisMigration for AddLastEventSeenFieldInHeartbeatsTable {
16+
async fn up(&self, _conn: &mut Connection) -> Result<()> {
17+
Ok(())
18+
}
19+
20+
async fn down(&self, _conn: &mut Connection) -> Result<()> {
21+
Ok(())
22+
}
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::Result;
2+
use async_trait::async_trait;
3+
use crate::database::redis::RedisMigration;
4+
use crate::migration;
5+
use deadpool_redis::*;
6+
7+
pub(super) struct AddUriFieldInSubscriptionsTable;
8+
migration!(
9+
AddUriFieldInSubscriptionsTable,
10+
5,
11+
"add uri field in subscriptions table"
12+
);
13+
14+
#[async_trait]
15+
impl RedisMigration for AddUriFieldInSubscriptionsTable {
16+
async fn up(&self, _conn: &mut Connection) -> Result<()> {
17+
Ok(())
18+
}
19+
20+
async fn down(&self, _conn: &mut Connection) -> Result<()> {
21+
Ok(())
22+
}
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::Result;
2+
use async_trait::async_trait;
3+
use crate::database::redis::RedisMigration;
4+
use crate::migration;
5+
use deadpool_redis::*;
6+
7+
pub(super) struct AddContentFormatFieldInSubscriptionsTable;
8+
migration!(
9+
AddContentFormatFieldInSubscriptionsTable,
10+
6,
11+
"add content_format field in subscriptions table"
12+
);
13+
14+
#[async_trait]
15+
impl RedisMigration for AddContentFormatFieldInSubscriptionsTable {
16+
async fn up(&self, _conn: &mut Connection) -> Result<()> {
17+
Ok(())
18+
}
19+
20+
async fn down(&self, _conn: &mut Connection) -> Result<()> {
21+
Ok(())
22+
}
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::Result;
2+
use async_trait::async_trait;
3+
use crate::database::redis::RedisMigration;
4+
use crate::migration;
5+
use deadpool_redis::*;
6+
7+
pub(super) struct AddIgnoreChannelErrorFieldInSubscriptionsTable;
8+
migration!(
9+
AddIgnoreChannelErrorFieldInSubscriptionsTable,
10+
7,
11+
"add ignore_channel_error field in subscriptions table"
12+
);
13+
14+
#[async_trait]
15+
impl RedisMigration for AddIgnoreChannelErrorFieldInSubscriptionsTable {
16+
async fn up(&self, _conn: &mut Connection) -> Result<()> {
17+
Ok(())
18+
}
19+
20+
async fn down(&self, _conn: &mut Connection) -> Result<()> {
21+
Ok(())
22+
}
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::Result;
2+
use async_trait::async_trait;
3+
use crate::database::redis::RedisMigration;
4+
use crate::migration;
5+
use deadpool_redis::*;
6+
7+
pub(super) struct AddPrincsFilterFieldsInSubscriptionsTable;
8+
migration!(
9+
AddPrincsFilterFieldsInSubscriptionsTable,
10+
8,
11+
"add princs_filter fields in subscriptions table"
12+
);
13+
14+
#[async_trait]
15+
impl RedisMigration for AddPrincsFilterFieldsInSubscriptionsTable {
16+
async fn up(&self, _conn: &mut Connection) -> Result<()> {
17+
Ok(())
18+
}
19+
20+
async fn down(&self, _conn: &mut Connection) -> Result<()> {
21+
Ok(())
22+
}
23+
}

0 commit comments

Comments
 (0)