Skip to content

Commit 20e763c

Browse files
committed
feat: allow spreading load across runners
1 parent ad42a6c commit 20e763c

File tree

5 files changed

+145
-21
lines changed

5 files changed

+145
-21
lines changed

TODO.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# ToDo
2+
3+
* [ ] Allowing skipping data part of the migration
4+
* [x] Allow concurrent instances (x of y)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
= Data migration guide
2+
3+
In some cases, it is necessary to also migrate data during an upgrade. This may require the re-ingestion of all stored
4+
documents.
5+
6+
This is the case, for example, a new field is added, which requires the extraction of information from the original
7+
document.
8+
9+
== Strategy
10+
11+
The overall strategy for this is:
12+
13+
* Prevent access to the in-migration database
14+
* Create new database features (columns, …) in a compatible way (e.g. "allow nulls")
15+
* Process re-ingestion of documents
16+
* Modify database features to the target definition (e.g. "remove nullable")
17+
* Switch to new software version
18+
* Grant access to the new database structures
19+
20+
This can be achieved with a read-only replica:
21+
22+
* Prevent access to the in-migration database
23+
* Create a read-only replica of the database
24+
* Reconfigure trustify to use the read-only replica
25+
* All mutable operations will fail, reporting `503 Service Unavailable`
26+
* Create new database features
27+
* Process re-ingestion of documents
28+
* Modify database features to the target definition
29+
* Switch to new software version
30+
* Grant access to the new database structures
31+
* Switch to new database version
32+
* Drop old, read-only replica
33+
34+
== Running the re-ingestion
35+
36+
The re-ingestion can be run in two ways:
37+
38+
* During the SeoORM migration
39+
* Before the SeoORM migration
40+
41+
The main difference is that when running during the SeoORM migration, you have less control over the process. It will
42+
be driven by the SeoORM migration, and you have to wait until everything is finished.
43+
44+
Running before the SeoORM migration, you can, for example, run multiple instances of the re-ingestion. You can also run
45+
re-ingestion on a database copy, and then switch over to replace the database with the migrated version.
46+
47+
Afterwards, you can run SeoORM migrations, which will then skip those DB modifications (as they are already applied) and
48+
also skip the re-ingestion.
49+
50+
== The lazy way
51+
52+
The lazy way, which is the default, will simply perform those steps during the SeaORM migration. However, there are a
53+
bunch of downsides. That's why it is not recommended for production setups. However, it may just work fine for small
54+
test setup. Making the process a lot easier.

migration/src/data/migration.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
async_trait,
3-
data::{Document, DocumentProcessor, Handler, Options},
3+
data::{Document, DocumentProcessor, Handler, Options, Partition},
44
};
55
use clap::Parser;
66
use futures::executor::block_on;
@@ -55,6 +55,7 @@ pub struct SchemaDataManager<'c> {
5555
pub manager: &'c SchemaManager<'c>,
5656
storage: &'c DispatchBackend,
5757
options: &'c Options,
58+
partition: Partition,
5859
}
5960

6061
impl<'c> SchemaDataManager<'c> {
@@ -63,10 +64,16 @@ impl<'c> SchemaDataManager<'c> {
6364
storage: &'c DispatchBackend,
6465
options: &'c Options,
6566
) -> Self {
67+
let partition = Partition {
68+
current: options.current,
69+
total: options.total,
70+
};
71+
6672
Self {
6773
manager,
6874
storage,
6975
options,
76+
partition,
7077
}
7178
}
7279

migration/src/data/mod.rs

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
mod migration;
2+
mod partition;
23
mod run;
34

45
pub use migration::*;
6+
pub use partition::*;
57
pub use run::*;
68

79
use anyhow::{anyhow, bail};
@@ -14,7 +16,7 @@ use sea_orm::{
1416
ConnectionTrait, DatabaseTransaction, DbErr, EntityTrait, ModelTrait, TransactionTrait,
1517
};
1618
use sea_orm_migration::{MigrationTrait, SchemaManager};
17-
use std::num::NonZeroUsize;
19+
use std::num::{NonZeroU64, NonZeroUsize};
1820
use trustify_common::id::Id;
1921
use trustify_entity::{sbom, source_document};
2022
use trustify_module_storage::service::{StorageBackend, StorageKey, dispatch::DispatchBackend};
@@ -27,7 +29,7 @@ pub enum Sbom {
2729

2830
#[allow(async_fn_in_trait)]
2931
pub trait Document: Sized + Send + Sync {
30-
type Model: Send;
32+
type Model: Partitionable + Send;
3133

3234
async fn all<C>(tx: &C) -> Result<Vec<Self::Model>, DbErr>
3335
where
@@ -95,14 +97,22 @@ where
9597

9698
#[derive(Clone, Debug, PartialEq, Eq, clap::Parser)]
9799
pub struct Options {
100+
/// Number of concurrent documents being processes
98101
#[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "5")]
99102
pub concurrent: NonZeroUsize,
103+
104+
#[arg(long, env = "MIGRATION_DATA_CURRENT_RUNNER", default_value = "0")]
105+
pub current: u64,
106+
#[arg(long, env = "MIGRATION_DATA_TOTAL_RUNNER", default_value = "1")]
107+
pub total: NonZeroU64,
100108
}
101109

102110
impl Default for Options {
103111
fn default() -> Self {
104112
Self {
105113
concurrent: unsafe { NonZeroUsize::new_unchecked(5) },
114+
current: 0,
115+
total: unsafe { NonZeroU64::new_unchecked(1) },
106116
}
107117
}
108118
}
@@ -128,30 +138,34 @@ impl<'c> DocumentProcessor for SchemaManager<'c> {
128138
where
129139
D: Document,
130140
{
141+
let partition = Partition::default();
131142
let db = self.get_connection();
132143

133144
let tx = db.begin().await?;
134145
let all = D::all(&tx).await?;
135146
drop(tx);
136147

137-
stream::iter(all)
138-
.map(async |model| {
139-
let tx = db.begin().await?;
140-
141-
let doc = D::source(&model, storage, &tx).await.map_err(|err| {
142-
DbErr::Migration(format!("Failed to load source document: {err}"))
143-
})?;
144-
f.call(doc, model, &tx).await.map_err(|err| {
145-
DbErr::Migration(format!("Failed to process document: {err}"))
146-
})?;
147-
148-
tx.commit().await?;
149-
150-
Ok::<_, DbErr>(())
151-
})
152-
.buffer_unordered(options.concurrent.into())
153-
.try_collect::<Vec<_>>()
154-
.await?;
148+
stream::iter(
149+
all.into_iter()
150+
.filter(|model| partition.is_selected::<D>(&model)),
151+
)
152+
.map(async |model| {
153+
let tx = db.begin().await?;
154+
155+
let doc = D::source(&model, storage, &tx).await.map_err(|err| {
156+
DbErr::Migration(format!("Failed to load source document: {err}"))
157+
})?;
158+
f.call(doc, model, &tx)
159+
.await
160+
.map_err(|err| DbErr::Migration(format!("Failed to process document: {err}")))?;
161+
162+
tx.commit().await?;
163+
164+
Ok::<_, DbErr>(())
165+
})
166+
.buffer_unordered(options.concurrent.into())
167+
.try_collect::<Vec<_>>()
168+
.await?;
155169

156170
Ok(())
157171
}

migration/src/data/partition.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use crate::data::Document;
2+
use std::hash::{DefaultHasher, Hash, Hasher};
3+
use std::num::{NonZeroU64, NonZeroUsize};
4+
use trustify_entity::sbom;
5+
6+
#[derive(Debug, Copy, Clone)]
7+
pub struct Partition {
8+
pub current: u64,
9+
pub total: NonZeroU64,
10+
}
11+
12+
pub trait Partitionable {
13+
fn hashed_id(&self) -> u64;
14+
}
15+
16+
impl Partitionable for sbom::Model {
17+
fn hashed_id(&self) -> u64 {
18+
let mut hasher = DefaultHasher::new();
19+
self.sbom_id.hash(&mut hasher);
20+
hasher.finish()
21+
}
22+
}
23+
24+
impl Default for Partition {
25+
fn default() -> Self {
26+
Self::new_one()
27+
}
28+
}
29+
30+
impl Partition {
31+
pub const fn new_one() -> Self {
32+
Self {
33+
current: 0,
34+
total: unsafe { NonZeroU64::new_unchecked(1) },
35+
}
36+
}
37+
38+
pub fn is_selected<D>(&self, document: &D::Model) -> bool
39+
where
40+
D: Document,
41+
D::Model: Partitionable,
42+
{
43+
document.hashed_id() % self.total == self.current
44+
}
45+
}

0 commit comments

Comments
 (0)