Skip to content

Commit 068858e

Browse files
authored
update migrate Transaction and AsyncTransaction execute functions (#346)
This allows avoiding double iteration when calling migrate with the grouped option active
1 parent b09e3cf commit 068858e

File tree

10 files changed

+81
-39
lines changed

10 files changed

+81
-39
lines changed

refinery_core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ description = "This crate should not be used directly, it is internally related
66
license = "MIT OR Apache-2.0"
77
documentation = "https://docs.rs/refinery/"
88
repository = "https://github.com/rust-db/refinery"
9-
edition = "2018"
9+
edition = "2021"
1010

1111
[features]
1212
default = []

refinery_core/src/drivers/config.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ use std::convert::Infallible;
1818
impl Transaction for Config {
1919
type Error = Infallible;
2020

21-
fn execute(&mut self, _queries: &[&str]) -> Result<usize, Self::Error> {
21+
fn execute<'a, T: Iterator<Item = &'a str>>(
22+
&mut self,
23+
_queries: T,
24+
) -> Result<usize, Self::Error> {
2225
Ok(0)
2326
}
2427
}
@@ -33,7 +36,10 @@ impl Query<Vec<Migration>> for Config {
3336
impl AsyncTransaction for Config {
3437
type Error = Infallible;
3538

36-
async fn execute(&mut self, _queries: &[&str]) -> Result<usize, Self::Error> {
39+
async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
40+
&mut self,
41+
_queries: T,
42+
) -> Result<usize, Self::Error> {
3743
Ok(0)
3844
}
3945
}

refinery_core/src/drivers/mysql.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,13 @@ fn query_applied_migrations(
4343
impl Transaction for Conn {
4444
type Error = MError;
4545

46-
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
46+
fn execute<'a, T: Iterator<Item = &'a str>>(
47+
&mut self,
48+
queries: T,
49+
) -> Result<usize, Self::Error> {
4750
let mut transaction = self.start_transaction(get_tx_opts())?;
4851
let mut count = 0;
49-
for query in queries.iter() {
52+
for query in queries {
5053
transaction.query_iter(query)?;
5154
count += 1;
5255
}
@@ -58,11 +61,14 @@ impl Transaction for Conn {
5861
impl Transaction for PooledConn {
5962
type Error = MError;
6063

61-
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
64+
fn execute<'a, T: Iterator<Item = &'a str>>(
65+
&mut self,
66+
queries: T,
67+
) -> Result<usize, Self::Error> {
6268
let mut transaction = self.start_transaction(get_tx_opts())?;
6369
let mut count = 0;
6470

65-
for query in queries.iter() {
71+
for query in queries {
6672
transaction.query_iter(query)?;
6773
count += 1;
6874
}

refinery_core/src/drivers/mysql_async.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,18 @@ async fn query_applied_migrations<'a>(
3939
impl AsyncTransaction for Pool {
4040
type Error = MError;
4141

42-
async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
42+
async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
43+
&mut self,
44+
queries: T,
45+
) -> Result<usize, Self::Error> {
4346
let mut conn = self.get_conn().await?;
4447
let mut options = TxOpts::new();
4548
options.with_isolation_level(Some(IsolationLevel::ReadCommitted));
4649

4750
let mut transaction = conn.start_transaction(options).await?;
4851
let mut count = 0;
4952
for query in queries {
50-
transaction.query_drop(*query).await?;
53+
transaction.query_drop(query).await?;
5154
count += 1;
5255
}
5356
transaction.commit().await?;

refinery_core/src/drivers/postgres.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@ fn query_applied_migrations(
3333
impl Transaction for PgClient {
3434
type Error = PgError;
3535

36-
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
36+
fn execute<'a, T: Iterator<Item = &'a str>>(
37+
&mut self,
38+
queries: T,
39+
) -> Result<usize, Self::Error> {
3740
let mut transaction = PgClient::transaction(self)?;
3841
let mut count = 0;
39-
for query in queries.iter() {
42+
for query in queries {
4043
PgTransaction::batch_execute(&mut transaction, query)?;
4144
count += 1;
4245
}

refinery_core/src/drivers/rusqlite.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@ fn query_applied_migrations(
3232

3333
impl Transaction for RqlConnection {
3434
type Error = RqlError;
35-
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
35+
fn execute<'a, T: Iterator<Item = &'a str>>(
36+
&mut self,
37+
queries: T,
38+
) -> Result<usize, Self::Error> {
3639
let transaction = self.transaction()?;
3740
let mut count = 0;
38-
for query in queries.iter() {
41+
for query in queries {
3942
transaction.execute_batch(query)?;
4043
count += 1;
4144
}

refinery_core/src/drivers/tiberius.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,16 @@ where
4646
{
4747
type Error = Error;
4848

49-
async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
49+
async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
50+
&mut self,
51+
queries: T,
52+
) -> Result<usize, Self::Error> {
5053
// Tiberius doesn't support transactions, see https://github.com/prisma/tiberius/issues/28
5154
self.simple_query("BEGIN TRAN T1;").await?;
5255
let mut count = 0;
5356
for query in queries {
5457
// Drop the returning `QueryStream<'a>` to avoid compiler complaning regarding lifetimes
55-
if let Err(err) = self.simple_query(*query).await.map(drop) {
58+
if let Err(err) = self.simple_query(query).await.map(drop) {
5659
if let Err(err) = self.simple_query("ROLLBACK TRAN T1").await {
5760
log::error!("could not ROLLBACK transaction, {}", err);
5861
}

refinery_core/src/drivers/tokio_postgres.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ async fn query_applied_migrations(
3535
impl AsyncTransaction for Client {
3636
type Error = PgError;
3737

38-
async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
38+
async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
39+
&mut self,
40+
queries: T,
41+
) -> Result<usize, Self::Error> {
3942
let transaction = self.transaction().await?;
4043
let mut count = 0;
4144
for query in queries {

refinery_core/src/traits/async.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@ use crate::traits::{
66
use crate::{Error, Migration, Report, Target};
77

88
use async_trait::async_trait;
9+
use std::ops::Deref;
910
use std::string::ToString;
1011

1112
#[async_trait]
1213
pub trait AsyncTransaction {
1314
type Error: std::error::Error + Send + Sync + 'static;
1415

15-
async fn execute(&mut self, query: &[&str]) -> Result<usize, Self::Error>;
16+
async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
17+
&mut self,
18+
queries: T,
19+
) -> Result<usize, Self::Error>;
1620
}
1721

1822
#[async_trait]
@@ -43,10 +47,13 @@ async fn migrate<T: AsyncTransaction>(
4347
migration.set_applied();
4448
let update_query = insert_migration_query(&migration, migration_table_name);
4549
transaction
46-
.execute(&[
47-
migration.sql().as_ref().expect("sql must be Some!"),
48-
&update_query,
49-
])
50+
.execute(
51+
[
52+
migration.sql().as_ref().expect("sql must be Some!"),
53+
update_query.as_str(),
54+
]
55+
.into_iter(),
56+
)
5057
.await
5158
.migration_err(
5259
&format!("error applying migration {}", migration),
@@ -105,10 +112,10 @@ async fn migrate_grouped<T: AsyncTransaction>(
105112
);
106113
}
107114

108-
let refs: Vec<&str> = grouped_migrations.iter().map(AsRef::as_ref).collect();
115+
let refs = grouped_migrations.iter().map(AsRef::as_ref);
109116

110117
transaction
111-
.execute(refs.as_ref())
118+
.execute(refs)
112119
.await
113120
.migration_err("error applying migrations", None)?;
114121

@@ -164,9 +171,11 @@ where
164171
target: Target,
165172
migration_table_name: &str,
166173
) -> Result<Report, Error> {
167-
self.execute(&[&Self::assert_migrations_table_query(migration_table_name)])
168-
.await
169-
.migration_err("error asserting migrations table", None)?;
174+
self.execute(
175+
[Self::assert_migrations_table_query(migration_table_name).as_str()].into_iter(),
176+
)
177+
.await
178+
.migration_err("error asserting migrations table", None)?;
170179

171180
let applied_migrations = self
172181
.query(

refinery_core/src/traits/sync.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::ops::Deref;
2+
13
use crate::error::WrapMigrationError;
24
use crate::traits::{
35
insert_migration_query, verify_migrations, ASSERT_MIGRATIONS_TABLE_QUERY,
@@ -8,7 +10,10 @@ use crate::{Error, Migration, Report, Target};
810
pub trait Transaction {
911
type Error: std::error::Error + Send + Sync + 'static;
1012

11-
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error>;
13+
fn execute<'a, T: Iterator<Item = &'a str>>(
14+
&mut self,
15+
queries: T,
16+
) -> Result<usize, Self::Error>;
1217
}
1318

1419
pub trait Query<T>: Transaction {
@@ -20,7 +25,7 @@ pub fn migrate<T: Transaction>(
2025
migrations: Vec<Migration>,
2126
target: Target,
2227
migration_table_name: &str,
23-
batched: bool,
28+
grouped: bool,
2429
) -> Result<Report, Error> {
2530
let mut migration_batch = Vec::new();
2631
let mut applied_migrations = Vec::new();
@@ -49,7 +54,7 @@ pub fn migrate<T: Transaction>(
4954
migration_batch.push(insert_migration);
5055
}
5156

52-
match (target, batched) {
57+
match (target, grouped) {
5358
(Target::Fake | Target::FakeVersion(_), _) => {
5459
log::info!("not going to apply any migration as fake flag is enabled");
5560
}
@@ -68,16 +73,14 @@ pub fn migrate<T: Transaction>(
6873
}
6974
};
7075

71-
let refs: Vec<&str> = migration_batch.iter().map(AsRef::as_ref).collect();
72-
73-
if batched {
76+
if grouped {
7477
transaction
75-
.execute(refs.as_ref())
78+
.execute(migration_batch.iter().map(Deref::deref))
7679
.migration_err("error applying migrations", None)?;
7780
} else {
78-
for (i, update) in refs.iter().enumerate() {
81+
for (i, update) in migration_batch.into_iter().enumerate() {
7982
transaction
80-
.execute(&[update])
83+
.execute([update.as_str()].into_iter())
8184
.migration_err("error applying update", Some(&applied_migrations[0..i / 2]))?;
8285
}
8386
}
@@ -92,10 +95,13 @@ where
9295
fn assert_migrations_table(&mut self, migration_table_name: &str) -> Result<usize, Error> {
9396
// Needed cause some database vendors like Mssql have a non sql standard way of checking the migrations table,
9497
// thou on this case it's just to be consistent with the async trait `AsyncMigrate`
95-
self.execute(&[ASSERT_MIGRATIONS_TABLE_QUERY
96-
.replace("%MIGRATION_TABLE_NAME%", migration_table_name)
97-
.as_str()])
98-
.migration_err("error asserting migrations table", None)
98+
self.execute(
99+
[ASSERT_MIGRATIONS_TABLE_QUERY
100+
.replace("%MIGRATION_TABLE_NAME%", migration_table_name)
101+
.as_str()]
102+
.into_iter(),
103+
)
104+
.migration_err("error asserting migrations table", None)
99105
}
100106

101107
fn get_last_applied_migration(

0 commit comments

Comments
 (0)