Skip to content

Commit 815c7c1

Browse files
IsaacHorvathweiznich
authored andcommitted
add mysql cancel token
1 parent 37e5e9e commit 815c7c1

File tree

4 files changed

+95
-0
lines changed

4 files changed

+95
-0
lines changed

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ mod transaction_manager;
128128
#[cfg(feature = "mysql")]
129129
#[doc(inline)]
130130
pub use self::mysql::AsyncMysqlConnection;
131+
#[cfg(feature = "mysql")]
132+
#[doc(inline)]
133+
pub use self::mysql::CancelToken;
131134
#[cfg(feature = "postgres")]
132135
#[doc(inline)]
133136
pub use self::pg::AsyncPgConnection;

src/mysql/cancel_token.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use mysql_async::prelude::Query;
2+
use mysql_async::{Opts, OptsBuilder};
3+
4+
use crate::mysql::error_helper::ErrorHelper;
5+
6+
/// The capability to request cancellation of in-progress queries on a
7+
/// connection.
8+
#[derive(Clone)]
9+
pub struct CancelToken {
10+
pub(crate) opts: Opts,
11+
pub(crate) kill_id: u32,
12+
}
13+
14+
impl CancelToken {
15+
/// Attempts to cancel the in-progress query on the connection associated
16+
/// with this `CancelToken`.
17+
///
18+
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
19+
/// only be returned if the client was unable to connect to the database.
20+
///
21+
/// Cancellation is inherently racy. There is no guarantee that the
22+
/// cancellation request will reach the server before the query terminates
23+
/// normally, or that the connection associated with this token is still
24+
/// active.
25+
pub async fn cancel_query(&self) -> Result<(), diesel::result::ConnectionError> {
26+
let builder = OptsBuilder::from_opts(self.opts.clone());
27+
28+
let conn = mysql_async::Conn::new(builder).await.map_err(ErrorHelper)?;
29+
30+
format!("KILL QUERY {};", self.kill_id)
31+
.ignore(conn)
32+
.await
33+
.map_err(ErrorHelper)?;
34+
35+
Ok(())
36+
}
37+
}

src/mysql/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ use mysql_async::prelude::Queryable;
1919
use mysql_async::{Opts, OptsBuilder, Statement};
2020
use std::future::Future;
2121

22+
mod cancel_token;
2223
mod error_helper;
2324
mod row;
2425
mod serialize;
2526

27+
pub use self::cancel_token::CancelToken;
2628
use self::error_helper::ErrorHelper;
2729
use self::row::MysqlRow;
2830
use self::serialize::ToSqlHelper;
@@ -254,6 +256,14 @@ impl AsyncMysqlConnection {
254256
Ok(conn)
255257
}
256258

259+
/// Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client.
260+
pub fn cancel_token(&self) -> CancelToken {
261+
let kill_id = self.conn.id();
262+
let opts = self.conn.opts().clone();
263+
264+
CancelToken { kill_id, opts }
265+
}
266+
257267
fn with_prepared_statement<'conn, T, F, R>(
258268
&'conn mut self,
259269
query: T,

tests/lib.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,51 @@ async fn postgres_cancel_token() {
167167
}
168168
}
169169

170+
#[cfg(feature = "mysql")]
171+
#[tokio::test]
172+
async fn mysql_cancel_token() {
173+
use std::time::Duration;
174+
175+
use diesel::result::{DatabaseErrorKind, Error};
176+
177+
let (sender, receiver) = tokio::sync::oneshot::channel();
178+
179+
// execute a long-running query on a separate thread
180+
let task = tokio::spawn(async move {
181+
let conn = &mut connection().await;
182+
let token = conn.cancel_token();
183+
184+
// send the token back to the main thread via a oneshot channel
185+
sender
186+
.send(token)
187+
.unwrap_or_else(|_| panic!("couldn't send token"));
188+
189+
diesel::dsl::sql::<diesel::sql_types::Integer>("SELECT SLEEP(5)")
190+
.load::<i32>(conn)
191+
.await
192+
});
193+
194+
// wait for the cancellation token to be sent
195+
if let Ok(token) = receiver.await {
196+
// give the query time to start before invoking the token
197+
tokio::time::sleep(Duration::from_millis(500)).await;
198+
token.cancel_query().await.unwrap();
199+
}
200+
201+
// make sure the query task resulted in a cancellation error or a return value of 1:
202+
match task.await.unwrap() {
203+
Err(e) => match e {
204+
Error::DatabaseError(DatabaseErrorKind::Unknown, v)
205+
if v.message() == "Query execution was interrupted" => {}
206+
_ => panic!("unexpected error: {:?}", e),
207+
},
208+
Ok(r) => match r[0] {
209+
1 => {}
210+
_ => panic!(""),
211+
},
212+
}
213+
}
214+
170215
#[cfg(feature = "postgres")]
171216
async fn setup(connection: &mut TestConnection) {
172217
diesel::sql_query(

0 commit comments

Comments
 (0)