Skip to content

Commit c89ae1a

Browse files
committed
fix: use a dedicated DB pool only for out-of-band data migrations
For in-band migrations, the SeaORM migration manager executes all migrations inside a single transaction. As a consequence, concurrent operations are not possible. However, one possible strategy is to run the data migrations up front (out-of-band), using concurrency, and then re-run the migrations while skipping the data migrations.
1 parent d37d014 commit c89ae1a

File tree

7 files changed

+104
-43
lines changed

7 files changed

+104
-43
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ mime = "0.3.17"
9696
moka = "0.12.10"
9797
native-tls = "0.2"
9898
num-traits = "0.2"
99+
num_cpus = "1"
99100
oci-client = "0.16.0"
100101
openid = "0.22.0"
101102
openssl = "0.10"

migration/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ futures = { workspace = true }
2828
futures-util = { workspace = true }
2929
humantime = { workspace = true }
3030
indicatif = { workspace = true, features = ["tokio", "futures"] }
31+
num_cpus = { workspace = true }
3132
osv = { workspace = true, features = ["schema"] }
3233
sea-orm = { workspace = true }
3334
sea-orm-migration = { workspace = true, features = ["runtime-tokio-rustls", "sqlx-postgres", "with-uuid"] }

migration/src/data/migration.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
};
55
use clap::Parser;
66
use futures::executor::block_on;
7-
use sea_orm::DbErr;
7+
use sea_orm::{DatabaseConnection, DbErr};
88
use sea_orm_migration::{MigrationName, MigrationTrait, SchemaManager};
99
use std::{ffi::OsString, ops::Deref, sync::LazyLock};
1010
use tokio::task_local;
@@ -79,18 +79,10 @@ impl MigrationWithData {
7979
}
8080
}
8181

82-
impl<M> From<M> for MigrationWithData
83-
where
84-
M: MigrationTraitWithData + 'static,
85-
{
86-
fn from(value: M) -> Self {
87-
MigrationWithData::new(Box::new(value))
88-
}
89-
}
90-
9182
/// A [`SchemaManager`], extended with data migration features.
9283
pub struct SchemaDataManager<'c> {
9384
pub manager: &'c SchemaManager<'c>,
85+
pub db: Option<&'c DatabaseConnection>,
9486
storage: &'c DispatchBackend,
9587
options: &'c Options,
9688
}
@@ -106,11 +98,13 @@ impl<'a> Deref for SchemaDataManager<'a> {
10698
impl<'c> SchemaDataManager<'c> {
10799
pub fn new(
108100
manager: &'c SchemaManager<'c>,
101+
db: Option<&'c DatabaseConnection>,
109102
storage: &'c DispatchBackend,
110103
options: &'c Options,
111104
) -> Self {
112105
Self {
113106
manager,
107+
db,
114108
storage,
115109
options,
116110
}
@@ -126,7 +120,18 @@ impl<'c> SchemaDataManager<'c> {
126120
return Ok(());
127121
}
128122

129-
self.manager.process(self.storage, self.options, f).await
123+
match self.db {
124+
Some(db) => {
125+
self.manager
126+
.process(db, self.storage, self.options, f)
127+
.await
128+
}
129+
None => {
130+
self.manager
131+
.process(self.manager.get_connection(), self.storage, self.options, f)
132+
.await
133+
}
134+
}
130135
}
131136
}
132137

@@ -141,7 +146,7 @@ impl MigrationTrait for MigrationWithData {
141146
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
142147
MigrationTraitWithData::up(
143148
&*self.migration,
144-
&SchemaDataManager::new(manager, &self.storage, &self.options),
149+
&SchemaDataManager::new(manager, None, &self.storage, &self.options),
145150
)
146151
.await
147152
.inspect_err(|err| tracing::warn!("Migration failed: {err}"))
@@ -150,7 +155,7 @@ impl MigrationTrait for MigrationWithData {
150155
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
151156
MigrationTraitWithData::down(
152157
&*self.migration,
153-
&SchemaDataManager::new(manager, &self.storage, &self.options),
158+
&SchemaDataManager::new(manager, None, &self.storage, &self.options),
154159
)
155160
.await
156161
}

migration/src/data/mod.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,10 @@ use futures_util::{
1313
stream::{self, TryStreamExt},
1414
};
1515
use indicatif::{ProgressBar, ProgressStyle};
16-
use sea_orm::{DatabaseTransaction, DbErr, TransactionTrait};
16+
use sea_orm::{ConnectionTrait, DatabaseTransaction, DbErr, TransactionTrait};
1717
use sea_orm_migration::{MigrationTrait, SchemaManager};
18-
use std::{
19-
num::{NonZeroU64, NonZeroUsize},
20-
sync::Arc,
21-
};
18+
use std::{num::NonZeroU64, sync::Arc};
19+
use tracing::log;
2220
use trustify_module_storage::service::dispatch::DispatchBackend;
2321

2422
/// A handler for processing a [`Document`] data migration.
@@ -43,8 +41,10 @@ where
4341
#[derive(Clone, Debug, PartialEq, Eq, clap::Parser)]
4442
pub struct Options {
4543
/// Number of concurrent documents being processed
46-
#[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "5")]
47-
pub concurrent: NonZeroUsize,
44+
///
45+
/// If the value is zero, use the number of logical CPUs
46+
#[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "0")]
47+
pub concurrent: usize,
4848

4949
/// The instance number of the current runner (zero based)
5050
#[arg(long, env = "MIGRATION_DATA_CURRENT_RUNNER", default_value = "0")]
@@ -70,7 +70,7 @@ pub struct Options {
7070
impl Default for Options {
7171
fn default() -> Self {
7272
Self {
73-
concurrent: unsafe { NonZeroUsize::new_unchecked(5) },
73+
concurrent: 5,
7474
current: 0,
7575
total: unsafe { NonZeroU64::new_unchecked(1) },
7676
skip_all: false,
@@ -118,6 +118,7 @@ impl From<&Options> for Partition {
118118
pub trait DocumentProcessor {
119119
fn process<D>(
120120
&self,
121+
db: &(impl ConnectionTrait + TransactionTrait),
121122
storage: &DispatchBackend,
122123
options: &Options,
123124
f: impl Handler<D>,
@@ -165,6 +166,7 @@ impl<'c> DocumentProcessor for SchemaManager<'c> {
165166
/// actual system is still running from the read-only clone of the original data.
166167
async fn process<D>(
167168
&self,
169+
db: &(impl ConnectionTrait + TransactionTrait),
168170
storage: &DispatchBackend,
169171
options: &Options,
170172
f: impl Handler<D>,
@@ -173,7 +175,6 @@ impl<'c> DocumentProcessor for SchemaManager<'c> {
173175
D: Document,
174176
{
175177
let partition: Partition = options.into();
176-
let db = self.get_connection();
177178

178179
let tx = db.begin().await?;
179180
let all: Vec<_> = D::all(&tx)
@@ -189,12 +190,21 @@ impl<'c> DocumentProcessor for SchemaManager<'c> {
189190
ProgressStyle::with_template(
190191
"{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {human_pos}/{human_len} ({per_sec}) ({eta})",
191192
)
192-
.map_err(|err| DbErr::Migration(err.to_string()))?
193-
.progress_chars("##-"),
193+
.map_err(|err| DbErr::Migration(err.to_string()))?
194+
.progress_chars("#>-"),
194195
);
195196

196197
let pb = Some(pb);
197198

199+
// get concurrency value
200+
let mut concurrent = options.concurrent;
201+
if concurrent == 0 {
202+
// if zero, use number of logical CPUs
203+
concurrent = num_cpus::get();
204+
}
205+
206+
log::info!("Running {concurrent} parallel operations");
207+
198208
stream::iter(all)
199209
.map(async |model| {
200210
let tx = db.begin().await?;
@@ -220,7 +230,7 @@ impl<'c> DocumentProcessor for SchemaManager<'c> {
220230

221231
Ok::<_, DbErr>(())
222232
})
223-
.buffer_unordered(options.concurrent.into())
233+
.buffer_unordered(concurrent)
224234
.try_collect::<Vec<_>>()
225235
.await?;
226236

migration/src/data/run.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::data::{MigratorWithData, Options, SchemaDataManager};
22
use anyhow::bail;
3-
use sea_orm::ConnectOptions;
3+
use sea_orm::{ConnectOptions, DatabaseConnection, DbErr};
44
use sea_orm_migration::{IntoSchemaManagerConnection, SchemaManager};
55
use std::{collections::HashMap, time::SystemTime};
66
use trustify_module_storage::service::dispatch::DispatchBackend;
@@ -20,9 +20,39 @@ pub struct Runner {
2020
pub options: Options,
2121
}
2222

23+
#[derive(Clone)]
2324
pub enum Database {
2425
Config { url: String, schema: Option<String> },
25-
Provided(sea_orm::DatabaseConnection),
26+
Provided(DatabaseConnection),
27+
}
28+
29+
impl From<DatabaseConnection> for Database {
30+
fn from(value: DatabaseConnection) -> Self {
31+
Self::Provided(value)
32+
}
33+
}
34+
35+
impl From<trustify_common::db::Database> for Database {
36+
fn from(value: trustify_common::db::Database) -> Self {
37+
Self::Provided(value.into_connection())
38+
}
39+
}
40+
41+
impl Database {
42+
pub async fn try_into_connection(self) -> Result<DatabaseConnection, DbErr> {
43+
Ok(match self {
44+
Self::Config { url, schema } => {
45+
let schema = schema.clone().unwrap_or_else(|| "public".to_owned());
46+
47+
let connect_options = ConnectOptions::new(url)
48+
.set_schema_search_path(schema)
49+
.to_owned();
50+
51+
sea_orm::Database::connect(connect_options).await?
52+
}
53+
Self::Provided(database) => database.clone(),
54+
})
55+
}
2656
}
2757

2858
impl Runner {
@@ -41,21 +71,11 @@ impl Runner {
4171
running.push(migration);
4272
}
4373

44-
let database = match self.database {
45-
Database::Config { url, schema } => {
46-
let schema = schema.unwrap_or_else(|| "public".to_owned());
47-
48-
let connect_options = ConnectOptions::new(url)
49-
.set_schema_search_path(schema)
50-
.to_owned();
51-
52-
sea_orm::Database::connect(connect_options).await?
53-
}
54-
Database::Provided(database) => database,
55-
};
74+
let database = self.database.clone().try_into_connection().await?;
5675

5776
let manager = SchemaManager::new(database.into_schema_manager_connection());
58-
let manager = SchemaDataManager::new(&manager, &self.storage, &self.options);
77+
let manager =
78+
SchemaDataManager::new(&manager, Some(&database), &self.storage, &self.options);
5979

6080
for run in running {
6181
tracing::info!(name = run.name(), "Running data migration");

migration/tests/data/m0002010.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use migration::{
44
data::{Database, Direction, MigrationWithData, Options, Runner},
55
};
66
use sea_orm_migration::MigratorTrait;
7-
use std::num::NonZeroUsize;
87
use test_context::test_context;
98
use test_log::test;
9+
use tracing::log;
1010
use trustify_test_context::{TrustifyMigrationContext, commit, dump};
1111

1212
commit!(Commit("6d3ea814b4b44fe16ea8f21724dda5abb0fc7932"));
@@ -66,13 +66,36 @@ dump!(
6666
ignore = "enable with: cargo test --features long_running"
6767
)]
6868
async fn performance(ctx: &TrustifyMigrationContext<Ds4>) -> Result<(), anyhow::Error> {
69+
let migrations = vec![
70+
"m0002000_add_sbom_properties".into(),
71+
"m0002010_add_advisory_scores".into(),
72+
];
73+
74+
// we simulate running the migrations out-of-band
75+
76+
log::info!("Running data migrations out-of-band");
77+
78+
Runner {
79+
database: ctx.db.clone().into(),
80+
storage: ctx.storage.clone().into(),
81+
direction: Default::default(),
82+
migrations: migrations.clone(),
83+
options: Default::default(),
84+
}
85+
.run::<Migrator>()
86+
.await?;
87+
88+
// now run the standard migration, skipping the out-of-band ones
89+
90+
log::info!("Running migrations");
91+
6992
MigrationWithData::run_with_test(
7093
ctx.storage.clone(),
7194
Options {
72-
concurrent: NonZeroUsize::new(32).unwrap(),
95+
skip: migrations,
7396
..Options::default()
7497
},
75-
async { MigratorTest::up(&ctx.db, None).await },
98+
async { Migrator::up(&ctx.db, None).await },
7699
)
77100
.await?;
78101

0 commit comments

Comments
 (0)