Skip to content

Commit c339949

Browse files
authored
Merge pull request #258 from IsaacHorvath/mysql_cancel
Add a cancellation token to AsyncMysqlConnection
2 parents 37e5e9e + ce9b84a commit c339949

File tree

4 files changed

+96
-0
lines changed

4 files changed

+96
-0
lines changed

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ mod transaction_manager;
128128
#[cfg(feature = "mysql")]
129129
#[doc(inline)]
130130
pub use self::mysql::AsyncMysqlConnection;
131+
#[cfg(feature = "mysql")]
132+
#[doc(inline)]
133+
pub use self::mysql::MysqlCancelToken;
131134
#[cfg(feature = "postgres")]
132135
#[doc(inline)]
133136
pub use self::pg::AsyncPgConnection;

src/mysql/cancel_token.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use mysql_async::prelude::Query;
2+
use mysql_async::{Opts, OptsBuilder};
3+
4+
use crate::mysql::error_helper::ErrorHelper;
5+
6+
/// The capability to request cancellation of in-progress queries on a
7+
/// connection.
8+
#[derive(Clone)]
9+
pub struct MysqlCancelToken {
10+
pub(crate) opts: Opts,
11+
pub(crate) kill_id: u32,
12+
}
13+
14+
impl MysqlCancelToken {
15+
/// Attempts to cancel the in-progress query on the connection associated
16+
/// with this `CancelToken`.
17+
///
18+
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
19+
/// only be returned if the client was unable to connect to the database.
20+
///
21+
/// Cancellation is inherently racy. There is no guarantee that the
22+
/// cancellation request will reach the server before the query terminates
23+
/// normally, or that the connection associated with this token is still
24+
/// active.
25+
pub async fn cancel_query(&self) -> diesel::result::ConnectionResult<()> {
26+
let builder = OptsBuilder::from_opts(self.opts.clone());
27+
28+
let conn = mysql_async::Conn::new(builder).await.map_err(ErrorHelper)?;
29+
30+
format!("KILL QUERY {};", self.kill_id)
31+
.ignore(conn)
32+
.await
33+
.map_err(ErrorHelper)?;
34+
35+
Ok(())
36+
}
37+
}

src/mysql/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ use mysql_async::prelude::Queryable;
1919
use mysql_async::{Opts, OptsBuilder, Statement};
2020
use std::future::Future;
2121

22+
mod cancel_token;
2223
mod error_helper;
2324
mod row;
2425
mod serialize;
2526

27+
pub use self::cancel_token::MysqlCancelToken;
2628
use self::error_helper::ErrorHelper;
2729
use self::row::MysqlRow;
2830
use self::serialize::ToSqlHelper;
@@ -254,6 +256,14 @@ impl AsyncMysqlConnection {
254256
Ok(conn)
255257
}
256258

259+
/// Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client.
260+
pub fn cancel_token(&self) -> MysqlCancelToken {
261+
let kill_id = self.conn.id();
262+
let opts = self.conn.opts().clone();
263+
264+
MysqlCancelToken { kill_id, opts }
265+
}
266+
257267
fn with_prepared_statement<'conn, T, F, R>(
258268
&'conn mut self,
259269
query: T,

tests/lib.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,52 @@ async fn postgres_cancel_token() {
167167
}
168168
}
169169

170+
#[cfg(feature = "mysql")]
171+
#[tokio::test]
172+
async fn mysql_cancel_token() {
173+
use diesel::result::{DatabaseErrorKind, Error};
174+
use std::time::Duration;
175+
176+
let (sender, receiver) = tokio::sync::oneshot::channel();
177+
178+
// execute a long-running query in a separate future
179+
let query_future = async move {
180+
let conn = &mut connection().await;
181+
let token = conn.cancel_token();
182+
183+
// send the token back to the main thread via a oneshot channel
184+
sender
185+
.send(token)
186+
.unwrap_or_else(|_| panic!("couldn't send token"));
187+
188+
diesel::dsl::sql::<diesel::sql_types::Integer>("SELECT SLEEP(5)")
189+
.get_result::<i32>(conn)
190+
.await
191+
};
192+
let cancel_future = async move {
193+
// wait for the cancellation token to be sent
194+
if let Ok(token) = receiver.await {
195+
// give the query time to start before invoking the token
196+
tokio::time::sleep(Duration::from_millis(500)).await;
197+
token.cancel_query().await.unwrap();
198+
} else {
199+
panic!("Failed to receive cancel token");
200+
}
201+
};
202+
203+
let (task, _) = tokio::join!(query_future, cancel_future);
204+
205+
// make sure the query task resulted in a cancellation error or a return value of 1:
206+
match task {
207+
Err(Error::DatabaseError(DatabaseErrorKind::Unknown, v))
208+
if v.message() == "Query execution was interrupted" => {}
209+
Err(e) => panic!("unexpected error: {:?}", e),
210+
// mysql 8.4 returns 1 from a canceled sleep instead of an error
211+
Ok(1) => {}
212+
Ok(_) => panic!("query completed successfully without cancellation"),
213+
}
214+
}
215+
170216
#[cfg(feature = "postgres")]
171217
async fn setup(connection: &mut TestConnection) {
172218
diesel::sql_query(

0 commit comments

Comments
 (0)