Skip to content

Commit fde4f56

Browse files
committed
Implement CancellationToken support in Async methods.
Instead of cancelling the current socket operation as soon as possible (which might leave the connection in an unknown state), cancelling the CancellationToken triggers MySqlCommand.Cancel, which executes "KILL QUERY n" against the database, logically terminating the current statement. This should result in the eventual cancellation of the query and OperationCanceledException being raised on a subsequent call to ReadAsync (or similar). We assume that the same CancellationToken keeps being passed to all DB methods involved in reading a single result set, and that the slight delay in actual cancellation is not significant to the caller.
1 parent 3e4d8f5 commit fde4f56

File tree

5 files changed

+310
-64
lines changed

5 files changed

+310
-64
lines changed

src/MySqlConnector/MySqlClient/CommandExecutors/TextCommandExecutor.cs

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,26 +50,34 @@ public virtual async Task<object> ExecuteScalarAsync(string commandText, MySqlPa
5050
public virtual async Task<DbDataReader> ExecuteReaderAsync(string commandText, MySqlParameterCollection parameterCollection,
5151
CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
5252
{
53-
m_command.Connection.Session.StartQuerying(m_command);
54-
m_command.LastInsertedId = -1;
55-
var statementPreparerOptions = StatementPreparerOptions.None;
56-
if (m_command.Connection.AllowUserVariables || m_command.CommandType == CommandType.StoredProcedure)
57-
statementPreparerOptions |= StatementPreparerOptions.AllowUserVariables;
58-
if (m_command.Connection.OldGuids)
59-
statementPreparerOptions |= StatementPreparerOptions.OldGuids;
60-
var preparer = new MySqlStatementPreparer(commandText, parameterCollection, statementPreparerOptions);
61-
var payload = new PayloadData(preparer.ParseAndBindParameters());
62-
try
53+
cancellationToken.ThrowIfCancellationRequested();
54+
using (cancellationToken.Register(m_command.Cancel))
6355
{
64-
await m_command.Connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
65-
return await MySqlDataReader.CreateAsync(m_command, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
66-
}
67-
catch (Exception ex) when (payload.ArraySegment.Count > 4_194_304 && (ex is SocketException || ex is IOException || ex is MySqlProtocolException))
68-
{
69-
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
70-
// use "decimal megabytes" (to round up) when creating the exception message
71-
int megabytes = payload.ArraySegment.Count / 1_000_000;
72-
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
56+
m_command.Connection.Session.StartQuerying(m_command);
57+
m_command.LastInsertedId = -1;
58+
var statementPreparerOptions = StatementPreparerOptions.None;
59+
if (m_command.Connection.AllowUserVariables || m_command.CommandType == CommandType.StoredProcedure)
60+
statementPreparerOptions |= StatementPreparerOptions.AllowUserVariables;
61+
if (m_command.Connection.OldGuids)
62+
statementPreparerOptions |= StatementPreparerOptions.OldGuids;
63+
var preparer = new MySqlStatementPreparer(commandText, parameterCollection, statementPreparerOptions);
64+
var payload = new PayloadData(preparer.ParseAndBindParameters());
65+
try
66+
{
67+
await m_command.Connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
68+
return await MySqlDataReader.CreateAsync(m_command, behavior, ioBehavior, CancellationToken.None).ConfigureAwait(false);
69+
}
70+
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
71+
{
72+
throw new OperationCanceledException(cancellationToken);
73+
}
74+
catch (Exception ex) when (payload.ArraySegment.Count > 4_194_304 && (ex is SocketException || ex is IOException || ex is MySqlProtocolException))
75+
{
76+
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
77+
// use "decimal megabytes" (to round up) when creating the exception message
78+
int megabytes = payload.ArraySegment.Count / 1_000_000;
79+
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
80+
}
7381
}
7482
}
7583

src/MySqlConnector/MySqlClient/MySqlDataReader.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,20 @@ private ValueTask<ResultSet> ScanResultSetAsync(IOBehavior ioBehavior, ResultSet
108108

109109
private async Task<ResultSet> ScanResultSetAsyncAwaited(IOBehavior ioBehavior, ResultSet resultSet, CancellationToken cancellationToken)
110110
{
111-
m_resultSetBuffered = await resultSet.ReadResultSetHeaderAsync(ioBehavior, cancellationToken);
112-
return m_resultSetBuffered;
111+
using (cancellationToken.Register(Command.Cancel))
112+
{
113+
try
114+
{
115+
m_resultSetBuffered = await resultSet.ReadResultSetHeaderAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
116+
return m_resultSetBuffered;
117+
}
118+
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted)
119+
{
120+
m_resultSetBuffered = null;
121+
cancellationToken.ThrowIfCancellationRequested();
122+
throw;
123+
}
124+
}
113125
}
114126

115127
public override string GetName(int ordinal) => GetResultSet().GetName(ordinal);

src/MySqlConnector/MySqlClient/Results/ResultSet.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,15 @@ private ValueTask<Row> ScanRowAsync(IOBehavior ioBehavior, Row row, Cancellation
162162
if (BufferState == ResultSetState.HasMoreData || BufferState == ResultSetState.NoMoreData || BufferState == ResultSetState.None)
163163
return new ValueTask<Row>((Row)null);
164164

165-
var payloadValueTask = Session.ReceiveReplyAsync(ioBehavior, cancellationToken);
166-
return payloadValueTask.IsCompletedSuccessfully
167-
? new ValueTask<Row>(ScanRowAsyncRemainder(payloadValueTask.Result))
168-
: new ValueTask<Row>(ScanRowAsyncAwaited(payloadValueTask.AsTask()));
165+
using (cancellationToken.Register(Command.Cancel))
166+
{
167+
var payloadValueTask = Session.ReceiveReplyAsync(ioBehavior, CancellationToken.None);
168+
return payloadValueTask.IsCompletedSuccessfully
169+
? new ValueTask<Row>(ScanRowAsyncRemainder(payloadValueTask.Result))
170+
: new ValueTask<Row>(ScanRowAsyncAwaited(payloadValueTask.AsTask(), cancellationToken));
171+
}
169172

170-
async Task<Row> ScanRowAsyncAwaited(Task<PayloadData> payloadTask)
173+
async Task<Row> ScanRowAsyncAwaited(Task<PayloadData> payloadTask, CancellationToken token)
171174
{
172175
try
173176
{
@@ -176,6 +179,7 @@ async Task<Row> ScanRowAsyncAwaited(Task<PayloadData> payloadTask)
176179
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted)
177180
{
178181
BufferState = State = ResultSetState.NoMoreData;
182+
token.ThrowIfCancellationRequested();
179183
throw;
180184
}
181185
}

tests/SideBySide/CancelFixture.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Dapper;
2+
3+
namespace SideBySide
4+
{
5+
public class CancelFixture : DatabaseFixture
6+
{
7+
public CancelFixture()
8+
{
9+
Connection.Open();
10+
Connection.Execute(@"drop table if exists integers;
11+
create table integers(value int not null primary key);
12+
insert into integers(value) values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9),(10),(11),(12),(13),(14),(15),(16),(17),(18),(19),(20);");
13+
}
14+
}
15+
}

0 commit comments

Comments
 (0)