Skip to content

Commit d978ff1

Browse files
momobelweiznich
authored andcommitted
Add triggering code
1 parent b85a943 commit d978ff1

File tree

1 file changed

+61
-1
lines changed

1 file changed

+61
-1
lines changed

examples/sync-wrapper/src/main.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
44
use diesel_async::sync_connection_wrapper::SyncConnectionWrapper;
55
use diesel_async::{AsyncConnection, RunQueryDsl, SimpleAsyncConnection};
66
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
7+
use futures_util::FutureExt;
78

89
// ordinary diesel model setup
910

@@ -15,7 +16,7 @@ table! {
1516
}
1617

1718
#[allow(dead_code)]
18-
#[derive(Debug, Queryable, Selectable)]
19+
#[derive(Debug, Queryable, QueryableByName, Selectable)]
1920
#[diesel(table_name = users)]
2021
struct User {
2122
id: i32,
@@ -47,6 +48,38 @@ where
4748
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
4849
}
4950

51+
async fn transaction(
52+
async_conn: &mut SyncConnectionWrapper<InnerConnection>,
53+
old_name: &str,
54+
new_name: &str,
55+
) -> Result<Vec<User>, diesel::result::Error> {
56+
async_conn
57+
.transaction::<Vec<User>, diesel::result::Error, _>(|c| {
58+
Box::pin(async {
59+
if old_name.is_empty() {
60+
Ok(Vec::new())
61+
} else {
62+
diesel::sql_query(
63+
r#"
64+
update
65+
users
66+
set
67+
name = ?2
68+
where
69+
name == ?1
70+
returning *
71+
"#,
72+
)
73+
.bind::<diesel::sql_types::Text, _>(old_name)
74+
.bind::<diesel::sql_types::Text, _>(new_name)
75+
.load(c)
76+
.await
77+
}
78+
})
79+
})
80+
.await
81+
}
82+
5083
#[tokio::main]
5184
async fn main() -> Result<(), Box<dyn std::error::Error>> {
5285
let db_url = std::env::var("DATABASE_URL").expect("Env var `DATABASE_URL` not set");
@@ -86,5 +119,32 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
86119
.await?;
87120
println!("{data:?}");
88121

122+
// let changed = transaction(&mut sync_wrapper, "iLuke", "JustLuke").await?;
123+
// println!("Changed {changed:?}");
124+
125+
// create an async connection for the migrations
126+
let mut conn_a: SyncConnectionWrapper<InnerConnection> = establish(&db_url).await?;
127+
let mut conn_b: SyncConnectionWrapper<InnerConnection> = establish(&db_url).await?;
128+
129+
tokio::spawn(async move {
130+
loop {
131+
let changed = transaction(&mut conn_a, "iLuke", "JustLuke").await;
132+
println!("Changed {changed:?}");
133+
std::thread::sleep(std::time::Duration::from_secs(1));
134+
}
135+
});
136+
137+
tokio::spawn(async move {
138+
loop {
139+
let changed = transaction(&mut conn_b, "JustLuke", "iLuke").await;
140+
println!("Changed {changed:?}");
141+
std::thread::sleep(std::time::Duration::from_secs(1));
142+
}
143+
});
144+
145+
loop {
146+
std::thread::sleep(std::time::Duration::from_secs(1));
147+
}
148+
89149
Ok(())
90150
}

0 commit comments

Comments
 (0)