Skip to content

Commit b86ec58

Browse files
authored
Merge pull request #229 from bgrainger/cancel-reader
Implement reader cancellation. Fixes #3
2 parents 456b253 + 2d81cf4 commit b86ec58

File tree

10 files changed

+763
-71
lines changed

10 files changed

+763
-71
lines changed

src/MySqlConnector/MySqlClient/CommandExecutors/TextCommandExecutor.cs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,25 +50,34 @@ public virtual async Task<object> ExecuteScalarAsync(string commandText, MySqlPa
5050
public virtual async Task<DbDataReader> ExecuteReaderAsync(string commandText, MySqlParameterCollection parameterCollection,
5151
CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
5252
{
53-
m_command.LastInsertedId = -1;
54-
var statementPreparerOptions = StatementPreparerOptions.None;
55-
if (m_command.Connection.AllowUserVariables || m_command.CommandType == CommandType.StoredProcedure)
56-
statementPreparerOptions |= StatementPreparerOptions.AllowUserVariables;
57-
if (m_command.Connection.OldGuids)
58-
statementPreparerOptions |= StatementPreparerOptions.OldGuids;
59-
var preparer = new MySqlStatementPreparer(commandText, parameterCollection, statementPreparerOptions);
60-
var payload = new PayloadData(preparer.ParseAndBindParameters());
61-
try
53+
cancellationToken.ThrowIfCancellationRequested();
54+
using (cancellationToken.Register(m_command.Cancel))
6255
{
63-
await m_command.Connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
64-
return await MySqlDataReader.CreateAsync(m_command, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
65-
}
66-
catch (Exception ex) when (payload.ArraySegment.Count > 4_194_304 && (ex is SocketException || ex is IOException || ex is MySqlProtocolException))
67-
{
68-
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
69-
// use "decimal megabytes" (to round up) when creating the exception message
70-
int megabytes = payload.ArraySegment.Count / 1_000_000;
71-
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
56+
m_command.Connection.Session.StartQuerying(m_command);
57+
m_command.LastInsertedId = -1;
58+
var statementPreparerOptions = StatementPreparerOptions.None;
59+
if (m_command.Connection.AllowUserVariables || m_command.CommandType == CommandType.StoredProcedure)
60+
statementPreparerOptions |= StatementPreparerOptions.AllowUserVariables;
61+
if (m_command.Connection.OldGuids)
62+
statementPreparerOptions |= StatementPreparerOptions.OldGuids;
63+
var preparer = new MySqlStatementPreparer(commandText, parameterCollection, statementPreparerOptions);
64+
var payload = new PayloadData(preparer.ParseAndBindParameters());
65+
try
66+
{
67+
await m_command.Connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
68+
return await MySqlDataReader.CreateAsync(m_command, behavior, ioBehavior).ConfigureAwait(false);
69+
}
70+
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
71+
{
72+
throw new OperationCanceledException(cancellationToken);
73+
}
74+
catch (Exception ex) when (payload.ArraySegment.Count > 4_194_304 && (ex is SocketException || ex is IOException || ex is MySqlProtocolException))
75+
{
76+
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
77+
// use "decimal megabytes" (to round up) when creating the exception message
78+
int megabytes = payload.ArraySegment.Count / 1_000_000;
79+
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
80+
}
7281
}
7382
}
7483

src/MySqlConnector/MySqlClient/MySqlCommand.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,7 @@ public MySqlCommand(string commandText, MySqlConnection connection, MySqlTransac
4848
}
4949
}
5050

51-
public override void Cancel()
52-
{
53-
// documentation says this shouldn't throw (but just fail silently), but for now make it explicit that this doesn't work
54-
throw new NotSupportedException("Use the Async overloads with a CancellationToken.");
55-
}
51+
public override void Cancel() => Connection.Cancel(this);
5652

5753
public override int ExecuteNonQuery() =>
5854
ExecuteNonQueryAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
@@ -177,8 +173,6 @@ private void VerifyValid()
177173
throw new InvalidOperationException("The transaction associated with this command is not the connection's active transaction.");
178174
if (string.IsNullOrWhiteSpace(CommandText))
179175
throw new InvalidOperationException("CommandText must be specified");
180-
if (Connection.ActiveReader != null)
181-
throw new MySqlException("There is already an open DataReader associated with this Connection which must be closed first.");
182176
}
183177

184178
internal void ReaderClosed() => (m_commandExecutor as StoredProcedureCommandExecutor)?.SetParams();

src/MySqlConnector/MySqlClient/MySqlConnection.cs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,37 @@ internal MySqlSession Session
209209
}
210210
}
211211

212+
internal void Cancel(MySqlCommand command)
213+
{
214+
var session = Session;
215+
if (!session.TryStartCancel(command))
216+
return;
217+
218+
try
219+
{
220+
// open a dedicated connection to the server to kill the active query
221+
var csb = new MySqlConnectionStringBuilder(m_connectionStringBuilder.GetConnectionString(includePassword: true));
222+
csb.Pooling = false;
223+
if (m_session.IPAddress != null)
224+
csb.Server = m_session.IPAddress.ToString();
225+
csb.ConnectionTimeout = 3u;
226+
227+
using (var connection = new MySqlConnection(csb.ConnectionString))
228+
{
229+
connection.Open();
230+
using (var killCommand = new MySqlCommand("KILL QUERY {0}".FormatInvariant(command.Connection.ServerThread), connection))
231+
{
232+
session.DoCancel(command, killCommand);
233+
}
234+
}
235+
}
236+
catch (MySqlException)
237+
{
238+
// cancelling the query failed; setting the state back to 'Querying' will allow another call to 'Cancel' to try again
239+
session.AbortCancel(command);
240+
}
241+
}
242+
212243
internal async Task<CachedProcedure> GetCachedProcedure(IOBehavior ioBehavior, string name, CancellationToken cancellationToken)
213244
{
214245
if (State != ConnectionState.Open)
@@ -230,7 +261,7 @@ internal async Task<CachedProcedure> GetCachedProcedure(IOBehavior ioBehavior, s
230261
}
231262

232263
internal MySqlTransaction CurrentTransaction { get; set; }
233-
internal MySqlDataReader ActiveReader { get; set; }
264+
internal MySqlDataReader ActiveReader => m_session.ActiveReader;
234265
internal bool AllowUserVariables => m_connectionSettings.AllowUserVariables;
235266
internal bool BufferResultSets => m_connectionSettings.BufferResultSets;
236267
internal bool ConvertZeroDateTime => m_connectionSettings.ConvertZeroDateTime;
@@ -309,11 +340,8 @@ private void DoClose()
309340
private void CloseDatabase()
310341
{
311342
m_cachedProcedures = null;
312-
if (ActiveReader != null)
313-
{
314-
ActiveReader.Dispose();
315-
ActiveReader = null;
316-
}
343+
if (Session.ActiveReader != null)
344+
Session.ActiveReader.Dispose();
317345
if (CurrentTransaction != null && m_session.IsConnected)
318346
{
319347
CurrentTransaction.Dispose();

src/MySqlConnector/MySqlClient/MySqlDataReader.cs

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,20 @@ private ValueTask<ResultSet> ScanResultSetAsync(IOBehavior ioBehavior, ResultSet
108108

109109
private async Task<ResultSet> ScanResultSetAsyncAwaited(IOBehavior ioBehavior, ResultSet resultSet, CancellationToken cancellationToken)
110110
{
111-
m_resultSetBuffered = await resultSet.ReadResultSetHeaderAsync(ioBehavior, cancellationToken);
112-
return m_resultSetBuffered;
111+
using (cancellationToken.Register(Command.Cancel))
112+
{
113+
try
114+
{
115+
m_resultSetBuffered = await resultSet.ReadResultSetHeaderAsync(ioBehavior).ConfigureAwait(false);
116+
return m_resultSetBuffered;
117+
}
118+
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted)
119+
{
120+
m_resultSetBuffered = null;
121+
cancellationToken.ThrowIfCancellationRequested();
122+
throw;
123+
}
124+
}
113125
}
114126

115127
public override string GetName(int ordinal) => GetResultSet().GetName(ordinal);
@@ -212,22 +224,37 @@ protected override void Dispose(bool disposing)
212224
internal MySqlConnection Connection => Command?.Connection;
213225
internal MySqlSession Session => Command?.Connection.Session;
214226

215-
internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
227+
internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior)
216228
{
217229
var dataReader = new MySqlDataReader(command, behavior);
218-
await dataReader.ReadFirstResultSetAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
219-
command.Connection.ActiveReader = dataReader;
220-
if (command.Connection.BufferResultSets)
230+
command.Connection.Session.SetActiveReader(dataReader);
231+
232+
try
233+
{
234+
await dataReader.ReadFirstResultSetAsync(ioBehavior).ConfigureAwait(false);
235+
if (command.Connection.BufferResultSets)
236+
{
237+
while (await dataReader.BufferNextResultAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false) != null)
238+
{
239+
}
240+
}
241+
return dataReader;
242+
}
243+
catch (Exception)
244+
{
245+
dataReader.Dispose();
246+
throw;
247+
}
248+
finally
221249
{
222-
while (await dataReader.BufferNextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false) != null);
223-
command.Connection.ActiveReader = null;
250+
if (command.Connection.BufferResultSets)
251+
command.Connection.Session.FinishQuerying();
224252
}
225-
return dataReader;
226253
}
227254

228-
internal async Task ReadFirstResultSetAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
255+
internal async Task ReadFirstResultSetAsync(IOBehavior ioBehavior)
229256
{
230-
m_resultSet = await new ResultSet(this).ReadResultSetHeaderAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
257+
m_resultSet = await new ResultSet(this).ReadResultSetHeaderAsync(ioBehavior).ConfigureAwait(false);
231258
ActivateResultSet(m_resultSet);
232259
m_resultSetBuffered = m_resultSet;
233260
}
@@ -242,16 +269,24 @@ private void DoClose()
242269
{
243270
if (Command != null)
244271
{
245-
while (NextResult())
272+
try
273+
{
274+
while (NextResult())
275+
{
276+
}
277+
}
278+
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted)
246279
{
280+
// ignore "Query execution was interrupted" exceptions when closing a data reader
247281
}
282+
248283
m_resultSet = null;
249284
m_resultSetBuffered = null;
250285
m_nextResultSetBuffer.Clear();
251286

252287
var connection = Command.Connection;
253-
if (connection.ActiveReader == this)
254-
connection.ActiveReader = null;
288+
if (!connection.BufferResultSets)
289+
connection.Session.FinishQuerying();
255290
Command.ReaderClosed();
256291
if ((m_behavior & CommandBehavior.CloseConnection) != 0)
257292
{
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
namespace MySql.Data.MySqlClient
2+
{
3+
/// <summary>
4+
/// MySQL Server error codes. Taken from <a href="https://dev.mysql.com/doc/refman/5.7/en/error-messages-server.html">Server Error Codes and Messages</a>.
5+
/// </summary>
6+
public enum MySqlErrorCode
7+
{
8+
/// <summary>
9+
/// You have an error in your SQL syntax (ER_PARSE_ERROR).
10+
/// </summary>
11+
ParseError = 1064,
12+
13+
/// <summary>
14+
/// Query execution was interrupted (ER_QUERY_INTERRUPTED).
15+
/// </summary>
16+
QueryInterrupted = 1317,
17+
}
18+
}

src/MySqlConnector/MySqlClient/Results/ResultSet.cs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public ResultSet(MySqlDataReader dataReader)
1616
DataReader = dataReader;
1717
}
1818

19-
public async Task<ResultSet> ReadResultSetHeaderAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
19+
public async Task<ResultSet> ReadResultSetHeaderAsync(IOBehavior ioBehavior)
2020
{
2121
// ResultSet can be re-used, so initialize everything
2222
BufferState = ResultSetState.None;
@@ -34,7 +34,7 @@ public async Task<ResultSet> ReadResultSetHeaderAsync(IOBehavior ioBehavior, Can
3434
{
3535
while (true)
3636
{
37-
var payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
37+
var payload = await Session.ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
3838

3939
var firstByte = payload.HeaderByte;
4040
if (firstByte == OkPayload.Signature)
@@ -63,7 +63,7 @@ public async Task<ResultSet> ReadResultSetHeaderAsync(IOBehavior ioBehavior, Can
6363
while ((byteCount = await stream.ReadAsync(readBuffer, 0, readBuffer.Length).ConfigureAwait(false)) > 0)
6464
{
6565
payload = new PayloadData(new ArraySegment<byte>(readBuffer, 0, byteCount));
66-
await Session.SendReplyAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
66+
await Session.SendReplyAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
6767
}
6868
}
6969
}
@@ -73,7 +73,7 @@ public async Task<ResultSet> ReadResultSetHeaderAsync(IOBehavior ioBehavior, Can
7373
ReadResultSetHeaderException = new MySqlException("Error during LOAD DATA LOCAL INFILE", ex);
7474
}
7575

76-
await Session.SendReplyAsync(EmptyPayload.Create(), ioBehavior, cancellationToken).ConfigureAwait(false);
76+
await Session.SendReplyAsync(EmptyPayload.Create(), ioBehavior, CancellationToken.None).ConfigureAwait(false);
7777
}
7878
else
7979
{
@@ -85,11 +85,11 @@ public async Task<ResultSet> ReadResultSetHeaderAsync(IOBehavior ioBehavior, Can
8585

8686
for (var column = 0; column < ColumnDefinitions.Length; column++)
8787
{
88-
payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
88+
payload = await Session.ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
8989
ColumnDefinitions[column] = ColumnDefinitionPayload.Create(payload);
9090
}
9191

92-
payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
92+
payload = await Session.ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
9393
EofPayload.Create(payload);
9494

9595
LastInsertId = -1;
@@ -162,12 +162,27 @@ private ValueTask<Row> ScanRowAsync(IOBehavior ioBehavior, Row row, Cancellation
162162
if (BufferState == ResultSetState.HasMoreData || BufferState == ResultSetState.NoMoreData || BufferState == ResultSetState.None)
163163
return new ValueTask<Row>((Row)null);
164164

165-
var payloadValueTask = Session.ReceiveReplyAsync(ioBehavior, cancellationToken);
166-
return payloadValueTask.IsCompletedSuccessfully
167-
? new ValueTask<Row>(ScanRowAsyncRemainder(payloadValueTask.Result))
168-
: new ValueTask<Row>(ScanRowAsyncAwaited(payloadValueTask.AsTask()));
165+
using (cancellationToken.Register(Command.Cancel))
166+
{
167+
var payloadValueTask = Session.ReceiveReplyAsync(ioBehavior, CancellationToken.None);
168+
return payloadValueTask.IsCompletedSuccessfully
169+
? new ValueTask<Row>(ScanRowAsyncRemainder(payloadValueTask.Result))
170+
: new ValueTask<Row>(ScanRowAsyncAwaited(payloadValueTask.AsTask(), cancellationToken));
171+
}
169172

170-
async Task<Row> ScanRowAsyncAwaited(Task<PayloadData> payloadTask) => ScanRowAsyncRemainder(await payloadTask.ConfigureAwait(false));
173+
async Task<Row> ScanRowAsyncAwaited(Task<PayloadData> payloadTask, CancellationToken token)
174+
{
175+
try
176+
{
177+
return ScanRowAsyncRemainder(await payloadTask.ConfigureAwait(false));
178+
}
179+
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted)
180+
{
181+
BufferState = State = ResultSetState.NoMoreData;
182+
token.ThrowIfCancellationRequested();
183+
throw;
184+
}
185+
}
171186

172187
Row ScanRowAsyncRemainder(PayloadData payload)
173188
{

0 commit comments

Comments
 (0)