Skip to content

Commit f2b77bb

Browse files
committed
Add more fine-grained State to MySqlSession.
This will track that the connection to the MySQL Server is in a known state, and help implement cancellation.
1 parent 739101d commit f2b77bb

File tree

5 files changed

+141
-28
lines changed

5 files changed

+141
-28
lines changed

src/MySqlConnector/MySqlClient/CommandExecutors/TextCommandExecutor.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public virtual async Task<object> ExecuteScalarAsync(string commandText, MySqlPa
5050
public virtual async Task<DbDataReader> ExecuteReaderAsync(string commandText, MySqlParameterCollection parameterCollection,
5151
CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
5252
{
53+
m_command.Connection.Session.StartQuerying();
5354
m_command.LastInsertedId = -1;
5455
var statementPreparerOptions = StatementPreparerOptions.None;
5556
if (m_command.Connection.AllowUserVariables || m_command.CommandType == CommandType.StoredProcedure)

src/MySqlConnector/MySqlClient/MySqlCommand.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,6 @@ private void VerifyValid()
177177
throw new InvalidOperationException("The transaction associated with this command is not the connection's active transaction.");
178178
if (string.IsNullOrWhiteSpace(CommandText))
179179
throw new InvalidOperationException("CommandText must be specified");
180-
if (Connection.ActiveReader != null)
181-
throw new MySqlException("There is already an open DataReader associated with this Connection which must be closed first.");
182180
}
183181

184182
internal void ReaderClosed() => (m_commandExecutor as StoredProcedureCommandExecutor)?.SetParams();

src/MySqlConnector/MySqlClient/MySqlConnection.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ internal async Task<CachedProcedure> GetCachedProcedure(IOBehavior ioBehavior, s
230230
}
231231

232232
internal MySqlTransaction CurrentTransaction { get; set; }
233-
internal MySqlDataReader ActiveReader { get; set; }
233+
internal MySqlDataReader ActiveReader => m_session.ActiveReader;
234234
internal bool AllowUserVariables => m_connectionSettings.AllowUserVariables;
235235
internal bool BufferResultSets => m_connectionSettings.BufferResultSets;
236236
internal bool ConvertZeroDateTime => m_connectionSettings.ConvertZeroDateTime;
@@ -309,11 +309,8 @@ private void DoClose()
309309
private void CloseDatabase()
310310
{
311311
m_cachedProcedures = null;
312-
if (ActiveReader != null)
313-
{
314-
ActiveReader.Dispose();
315-
ActiveReader = null;
316-
}
312+
if (Session.ActiveReader != null)
313+
Session.ActiveReader.Dispose();
317314
if (CurrentTransaction != null && m_session.IsConnected)
318315
{
319316
CurrentTransaction.Dispose();

src/MySqlConnector/MySqlClient/MySqlDataReader.cs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -215,14 +215,29 @@ protected override void Dispose(bool disposing)
215215
internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
216216
{
217217
var dataReader = new MySqlDataReader(command, behavior);
218-
await dataReader.ReadFirstResultSetAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
219-
command.Connection.ActiveReader = dataReader;
220-
if (command.Connection.BufferResultSets)
218+
command.Connection.Session.SetActiveReader(dataReader);
219+
220+
try
221+
{
222+
await dataReader.ReadFirstResultSetAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
223+
if (command.Connection.BufferResultSets)
224+
{
225+
while (await dataReader.BufferNextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false) != null)
226+
{
227+
}
228+
}
229+
return dataReader;
230+
}
231+
catch (Exception)
232+
{
233+
dataReader.Dispose();
234+
throw;
235+
}
236+
finally
221237
{
222-
while (await dataReader.BufferNextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false) != null);
223-
command.Connection.ActiveReader = null;
238+
if (command.Connection.BufferResultSets)
239+
command.Connection.Session.FinishQuerying();
224240
}
225-
return dataReader;
226241
}
227242

228243
internal async Task ReadFirstResultSetAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
@@ -250,8 +265,8 @@ private void DoClose()
250265
m_nextResultSetBuffer.Clear();
251266

252267
var connection = Command.Connection;
253-
if (connection.ActiveReader == this)
254-
connection.ActiveReader = null;
268+
if (!connection.BufferResultSets)
269+
connection.Session.FinishQuerying();
255270
Command.ReaderClosed();
256271
if ((m_behavior & CommandBehavior.CloseConnection) != 0)
257272
{

src/MySqlConnector/Serialization/MySqlSession.cs

Lines changed: 114 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public MySqlSession()
2323

2424
public MySqlSession(ConnectionPool pool, int poolGeneration)
2525
{
26+
m_lock = new object();
2627
CreatedUtc = DateTime.UtcNow;
2728
Pool = pool;
2829
PoolGeneration = poolGeneration;
@@ -38,13 +39,59 @@ public MySqlSession(ConnectionPool pool, int poolGeneration)
3839

3940
public void ReturnToPool() => Pool?.Return(this);
4041

41-
public bool IsConnected => m_state == State.Connected;
42+
public bool IsConnected
43+
{
44+
get
45+
{
46+
lock (m_lock)
47+
return m_state == State.Connected;
48+
}
49+
}
50+
51+
public void StartQuerying()
52+
{
53+
lock (m_lock)
54+
{
55+
if (m_state == State.Querying)
56+
throw new MySqlException("There is already an open DataReader associated with this Connection which must be closed first.");
57+
58+
VerifyState(State.Connected);
59+
m_state = State.Querying;
60+
}
61+
}
62+
63+
public MySqlDataReader ActiveReader => m_activeReader;
64+
65+
public void SetActiveReader(MySqlDataReader dataReader)
66+
{
67+
VerifyState(State.Querying);
68+
if (dataReader == null)
69+
throw new ArgumentNullException(nameof(dataReader));
70+
if (m_activeReader != null)
71+
throw new InvalidOperationException("Can't replace active reader.");
72+
m_activeReader = dataReader;
73+
}
74+
75+
public void FinishQuerying()
76+
{
77+
lock (m_lock)
78+
{
79+
VerifyState(State.Querying);
80+
m_state = State.Connected;
81+
m_activeReader = null;
82+
}
83+
}
4284

4385
public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
4486
{
4587
if (m_payloadHandler != null)
4688
{
4789
// attempt to gracefully close the connection, ignoring any errors (it may have been closed already by the server, etc.)
90+
lock (m_lock)
91+
{
92+
VerifyState(State.Connected, State.Failed);
93+
m_state = State.Closing;
94+
}
4895
try
4996
{
5097
m_payloadHandler.StartNewConversation();
@@ -59,18 +106,28 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
59106
}
60107
}
61108
ShutdownSocket();
62-
m_state = State.Closed;
109+
lock (m_lock)
110+
m_state = State.Closed;
63111
}
64112

65113
public async Task ConnectAsync(ConnectionSettings cs, IOBehavior ioBehavior, CancellationToken cancellationToken)
66114
{
115+
lock (m_lock)
116+
{
117+
VerifyState(State.Created);
118+
m_state = State.Connecting;
119+
}
67120
var connected = false;
68121
if (cs.ConnectionType == ConnectionType.Tcp)
69122
connected = await OpenTcpSocketAsync(cs, ioBehavior, cancellationToken).ConfigureAwait(false);
70123
else if (cs.ConnectionType == ConnectionType.Unix)
71124
connected = await OpenUnixSocketAsync(cs, ioBehavior, cancellationToken).ConfigureAwait(false);
72125
if (!connected)
126+
{
127+
lock (m_lock)
128+
m_state = State.Failed;
73129
throw new MySqlException("Unable to connect to any of the specified MySQL hosts.");
130+
}
74131

75132
var socketByteHandler = new SocketByteHandler(m_socket);
76133
m_payloadHandler = new StandardPayloadHandler(socketByteHandler);
@@ -114,6 +171,7 @@ public async Task ConnectAsync(ConnectionSettings cs, IOBehavior ioBehavior, Can
114171

115172
public async Task ResetConnectionAsync(ConnectionSettings cs, IOBehavior ioBehavior, CancellationToken cancellationToken)
116173
{
174+
VerifyState(State.Connected);
117175
if (ServerVersion.Version.CompareTo(ServerVersions.SupportsResetConnection) >= 0)
118176
{
119177
await SendAsync(ResetConnectionPayload.Create(), ioBehavior, cancellationToken).ConfigureAwait(false);
@@ -150,6 +208,8 @@ public async Task ResetConnectionAsync(ConnectionSettings cs, IOBehavior ioBehav
150208

151209
public async Task<bool> TryPingAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
152210
{
211+
VerifyState(State.Connected);
212+
153213
// check if client socket is still connected
154214
// http://stackoverflow.com/questions/2661764/how-to-check-if-a-socket-is-connected-disconnected-in-c
155215
if (m_socket.Poll(1, SelectMode.SelectRead) && m_socket.Available == 0)
@@ -169,6 +229,7 @@ public async Task<bool> TryPingAsync(IOBehavior ioBehavior, CancellationToken ca
169229
{
170230
}
171231

232+
VerifyState(State.Failed);
172233
return false;
173234
}
174235

@@ -196,10 +257,13 @@ public ValueTask<int> SendReplyAsync(PayloadData payload, IOBehavior ioBehavior,
196257

197258
private void VerifyConnected()
198259
{
199-
if (m_state == State.Closed)
200-
throw new ObjectDisposedException(nameof(MySqlSession));
201-
if (m_state != State.Connected)
202-
throw new InvalidOperationException("MySqlSession is not connected.");
260+
lock (m_lock)
261+
{
262+
if (m_state == State.Closed)
263+
throw new ObjectDisposedException(nameof(MySqlSession));
264+
if (m_state != State.Connected && m_state != State.Querying && m_state != State.CancelingQuery && m_state != State.Closing)
265+
throw new InvalidOperationException("MySqlSession is not connected.");
266+
}
203267
}
204268

205269
private async Task<bool> OpenTcpSocketAsync(ConnectionSettings cs, IOBehavior ioBehavior, CancellationToken cancellationToken)
@@ -270,7 +334,8 @@ private async Task<bool> OpenTcpSocketAsync(ConnectionSettings cs, IOBehavior io
270334
m_socket = m_tcpClient.Client;
271335
m_networkStream = m_tcpClient.GetStream();
272336
SerializationUtility.SetKeepalive(m_socket, cs.Keepalive);
273-
m_state = State.Connected;
337+
lock (m_lock)
338+
m_state = State.Connected;
274339
return true;
275340
}
276341
}
@@ -316,7 +381,8 @@ private async Task<bool> OpenUnixSocketAsync(ConnectionSettings cs, IOBehavior i
316381
m_socket = socket;
317382
m_networkStream = new NetworkStream(socket);
318383

319-
m_state = State.Connected;
384+
lock (m_lock)
385+
m_state = State.Connected;
320386
return true;
321387
}
322388

@@ -398,7 +464,8 @@ private async Task InitSslAsync(ProtocolCapabilities serverCapabilities, Connect
398464
{
399465
ShutdownSocket();
400466
m_hostname = "";
401-
m_state = State.Failed;
467+
lock (m_lock)
468+
m_state = State.Failed;
402469
if (ex is AuthenticationException)
403470
throw new MySqlException("SSL Authentication Error", ex);
404471
if (ex is IOException && clientCertificates != null)
@@ -450,10 +517,10 @@ private void ShutdownSocket()
450517

451518
private ValueTask<int> TryAsync<TArg>(Func<TArg, IOBehavior, ValueTask<int>> func, TArg arg, IOBehavior ioBehavior, CancellationToken cancellationToken)
452519
{
453-
VerifyConnected();
454520
ValueTask<int> task;
455521
try
456522
{
523+
VerifyConnected();
457524
task = func(arg, ioBehavior);
458525
}
459526
catch (Exception ex)
@@ -480,10 +547,10 @@ private int TryAsyncContinuation(Task<int> task)
480547

481548
private ValueTask<PayloadData> TryAsync(Func<ProtocolErrorBehavior, IOBehavior, ValueTask<ArraySegment<byte>>> func, IOBehavior ioBehavior, CancellationToken cancellationToken)
482549
{
483-
VerifyConnected();
484550
ValueTask<ArraySegment<byte>> task;
485551
try
486552
{
553+
VerifyConnected();
487554
task = func(ProtocolErrorBehavior.Throw, ioBehavior);
488555
}
489556
catch (Exception ex)
@@ -516,22 +583,57 @@ private PayloadData TryAsyncContinuation(Task<ArraySegment<byte>> task)
516583

517584
private void SetFailed()
518585
{
519-
m_state = State.Failed;
586+
lock (m_lock)
587+
m_state = State.Failed;
588+
}
589+
590+
591+
private void VerifyState(State state)
592+
{
593+
if (m_state != state)
594+
throw new InvalidOperationException("Expected state to be {0} but was {1}.".FormatInvariant(state, m_state));
595+
}
596+
597+
private void VerifyState(State state1, State state2)
598+
{
599+
if (m_state != state1 && m_state != state2)
600+
throw new InvalidOperationException("Expected state to be ({0}|{1}) but was {2}.".FormatInvariant(state1, state2, m_state));
520601
}
521602

522603
private enum State
523604
{
605+
// The session has been created; no connection has been made.
524606
Created,
607+
608+
// The session is attempting to connect to a server.
609+
Connecting,
610+
611+
// The session is connected to a server; there is no active query.
525612
Connected,
613+
614+
// The session is connected to a server and a query is being made.
615+
Querying,
616+
617+
// The session is connected to a server and the active query is being cancelled.
618+
CancelingQuery,
619+
620+
// The session is closing.
621+
Closing,
622+
623+
// The session is closed.
526624
Closed,
625+
626+
// An unexpected error occurred; the session is in an unusable state.
527627
Failed,
528628
}
529629

630+
readonly object m_lock;
530631
State m_state;
531632
string m_hostname = "";
532633
TcpClient m_tcpClient;
533634
Socket m_socket;
534635
NetworkStream m_networkStream;
535636
IPayloadHandler m_payloadHandler;
637+
MySqlDataReader m_activeReader;
536638
}
537639
}

0 commit comments

Comments
 (0)