Skip to content

Commit 82b6f13

Browse files
authored
Merge pull request #204 from caleblloyd/f_bufferResultSets
Implement BufferResultSets connection string option.
2 parents 0fbd032 + 289ef12 commit 82b6f13

File tree

13 files changed

+184
-61
lines changed

13 files changed

+184
-61
lines changed

.ci/config/config.buffer.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"Data": {
3+
"ConnectionString": "server=127.0.0.1;user id=mysqltest;password='test;key=\"val';port=3306;database=mysqltest;ssl mode=none;Use Affected Rows=true;BufferResultSets=true",
4+
"PasswordlessUser": "no_password",
5+
"SecondaryDatabase": "testdb2",
6+
"SupportsCachedProcedures": true,
7+
"SupportsJson": true,
8+
"MySqlBulkLoaderLocalCsvFile": "%TESTDATA%/LoadData_UTF8_BOM_Unix.CSV",
9+
"MySqlBulkLoaderLocalTsvFile": "%TESTDATA%/LoadData_UTF8_BOM_Unix.TSV"
10+
}
11+
}

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ script:
2222
- echo 'Executing tests with No Compression, SSL' && ./.ci/use-config.sh config.ssl.json 172.17.0.1 3307 && time dotnet test tests/SideBySide/SideBySide.csproj -c Release -f netcoreapp1.1.1
2323
- echo 'Executing tests with Compression, SSL' && ./.ci/use-config.sh config.compression+ssl.json 172.17.0.1 3307 && time dotnet test tests/SideBySide/SideBySide.csproj -c Release -f netcoreapp1.1.1
2424
- echo 'Executing tests with Unix Domain Socket, No Compression, No SSL' && ./.ci/use-config.sh config.uds.json && time dotnet test tests/SideBySide/SideBySide.csproj -c Release -f netcoreapp1.1.1
25+
- echo 'Executing tests with Buffering, No Compression, No SSL' && ./.ci/use-config.sh config.buffer.json 172.17.0.1 3307 && time dotnet test tests/SideBySide/SideBySide.csproj -c Release -f netcoreapp1.1.1
2526

2627
after_script:
2728
- chmod +x .ci/build-docs.sh && ./.ci/build-docs.sh

docs/content/connection-options.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,16 @@ These are the other options that MySqlConnector supports. They are set to sensi
127127
<th style="width: 70%">Descriotion</th>
128128
</thead>
129129
<tr>
130-
<td>AllowUserVariables</td>
131-
<td>true</td>
130+
<td>AllowUserVariables, Allow User Variables</td>
131+
<td>false</td>
132132
<td>Setting this to true indicates that the provider expects user variables in the SQL.</td>
133133
</tr>
134+
<tr>
135+
<td>BufferResultSets, Buffer Result Sets</td>
136+
<td>false</td>
137+
<td>Setting this to true immediately buffers all result sets to memory upon calling ExecuteReader/ExecuteReaderAsync. This will allow the connection
138+
to execute another statement while still holding the original postion of the reader. Do not use when result sets are bigger than available memory.</td>
139+
</tr>
134140
<tr>
135141
<td>Compress, Use Compression, UseCompression</td>
136142
<td>false</td>

src/MySqlConnector/MySqlClient/CommandExecutors/TextCommandExecutor.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ public virtual async Task<DbDataReader> ExecuteReaderAsync(string commandText, M
5656
var preparer = new MySqlStatementPreparer(commandText, parameterCollection, statementPreparerOptions);
5757
var payload = new PayloadData(preparer.ParseAndBindParameters());
5858
await m_command.Connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
59-
var reader = await MySqlDataReader.CreateAsync(m_command, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
60-
m_command.Connection.ActiveReader = reader;
61-
return reader;
59+
return await MySqlDataReader.CreateAsync(m_command, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
6260
}
6361

6462
readonly MySqlCommand m_command;

src/MySqlConnector/MySqlClient/MySqlConnection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ internal async Task<CachedProcedure> GetCachedProcedure(IOBehavior ioBehavior, s
226226
internal MySqlTransaction CurrentTransaction { get; set; }
227227
internal MySqlDataReader ActiveReader { get; set; }
228228
internal bool AllowUserVariables => m_connectionSettings.AllowUserVariables;
229+
internal bool BufferResultSets => m_connectionSettings.BufferResultSets;
229230
internal bool ConvertZeroDateTime => m_connectionSettings.ConvertZeroDateTime;
230231
internal bool OldGuids => m_connectionSettings.OldGuids;
231232
internal bool TreatTinyAsBoolean => m_connectionSettings.TreatTinyAsBoolean;

src/MySqlConnector/MySqlClient/MySqlConnectionStringBuilder.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ public bool AllowUserVariables
9898
set { MySqlConnectionStringOption.AllowUserVariables.SetValue(this, value); }
9999
}
100100

101+
public bool BufferResultSets
102+
{
103+
get { return MySqlConnectionStringOption.BufferResultSets.GetValue(this); }
104+
set { MySqlConnectionStringOption.BufferResultSets.SetValue(this, value); }
105+
}
106+
101107
public string CharacterSet
102108
{
103109
get { return MySqlConnectionStringOption.CharacterSet.GetValue(this); }
@@ -223,6 +229,7 @@ internal abstract class MySqlConnectionStringOption
223229

224230
// Other Options
225231
public static readonly MySqlConnectionStringOption<bool> AllowUserVariables;
232+
public static readonly MySqlConnectionStringOption<bool> BufferResultSets;
226233
public static readonly MySqlConnectionStringOption<string> CharacterSet;
227234
public static readonly MySqlConnectionStringOption<uint> ConnectionTimeout;
228235
public static readonly MySqlConnectionStringOption<bool> ConvertZeroDateTime;
@@ -324,6 +331,10 @@ static MySqlConnectionStringOption()
324331
keys: new[] { "AllowUserVariables", "Allow User Variables" },
325332
defaultValue: false));
326333

334+
AddOption(BufferResultSets = new MySqlConnectionStringOption<bool>(
335+
keys: new[] { "BufferResultSets", "Buffer Result Sets" },
336+
defaultValue: false));
337+
327338
AddOption(CharacterSet = new MySqlConnectionStringOption<string>(
328339
keys: new[] { "CharSet", "Character Set", "CharacterSet" },
329340
defaultValue: ""));

src/MySqlConnector/MySqlClient/MySqlDataReader.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationTok
6969

7070
private void ActivateResultSet(ResultSet resultSet)
7171
{
72+
if (resultSet.ReadResultSetHeaderException != null)
73+
throw resultSet.ReadResultSetHeaderException;
74+
7275
Command.LastInsertedId = resultSet.LastInsertId;
7376
m_recordsAffected += resultSet.RecordsAffected;
7477
}
@@ -222,6 +225,12 @@ internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, Co
222225
{
223226
var dataReader = new MySqlDataReader(command, behavior);
224227
await dataReader.ReadFirstResultSetAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
228+
command.Connection.ActiveReader = dataReader;
229+
if (command.Connection.BufferResultSets)
230+
{
231+
while (await dataReader.BufferNextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false) != null);
232+
command.Connection.ActiveReader = null;
233+
}
225234
return dataReader;
226235
}
227236

@@ -250,7 +259,8 @@ private void DoClose()
250259
m_nextResultSetBuffer.Clear();
251260

252261
var connection = Command.Connection;
253-
connection.ActiveReader = null;
262+
if (connection.ActiveReader == this)
263+
connection.ActiveReader = null;
254264
Command.ReaderClosed();
255265
if ((m_behavior & CommandBehavior.CloseConnection) != 0)
256266
{

src/MySqlConnector/MySqlClient/Results/ResultSet.cs

Lines changed: 65 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -31,75 +31,85 @@ public async Task<ResultSet> ReadResultSetHeaderAsync(IOBehavior ioBehavior, Can
3131
m_rowBuffered = null;
3232
MySqlException exception = null;
3333

34-
while (true)
34+
try
3535
{
36-
var payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
37-
38-
var firstByte = payload.HeaderByte;
39-
if (firstByte == OkPayload.Signature)
40-
{
41-
var ok = OkPayload.Create(payload);
42-
RecordsAffected = ok.AffectedRowCount;
43-
LastInsertId = ok.LastInsertId;
44-
ColumnDefinitions = null;
45-
State = (ok.ServerStatus & ServerStatus.MoreResultsExist) == 0
46-
? ResultSetState.NoMoreData
47-
: ResultSetState.HasMoreData;
48-
if (State == ResultSetState.NoMoreData)
49-
break;
50-
}
51-
else if (firstByte == LocalInfilePayload.Signature)
36+
while (true)
5237
{
53-
try
38+
var payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
39+
40+
var firstByte = payload.HeaderByte;
41+
if (firstByte == OkPayload.Signature)
42+
{
43+
var ok = OkPayload.Create(payload);
44+
RecordsAffected = ok.AffectedRowCount;
45+
LastInsertId = ok.LastInsertId;
46+
ColumnDefinitions = null;
47+
State = (ok.ServerStatus & ServerStatus.MoreResultsExist) == 0
48+
? ResultSetState.NoMoreData
49+
: ResultSetState.HasMoreData;
50+
if (State == ResultSetState.NoMoreData)
51+
break;
52+
}
53+
else if (firstByte == LocalInfilePayload.Signature)
5454
{
55-
var localInfile = LocalInfilePayload.Create(payload);
56-
using (var stream = localInfile.FileName.StartsWith(MySqlBulkLoader.StreamPrefix, StringComparison.Ordinal) ?
57-
MySqlBulkLoader.GetAndRemoveStream(localInfile.FileName) :
58-
File.OpenRead(localInfile.FileName))
55+
try
5956
{
60-
byte[] readBuffer = new byte[65536];
61-
int byteCount;
62-
while ((byteCount = await stream.ReadAsync(readBuffer, 0, readBuffer.Length).ConfigureAwait(false)) > 0)
57+
var localInfile = LocalInfilePayload.Create(payload);
58+
using (var stream = localInfile.FileName.StartsWith(MySqlBulkLoader.StreamPrefix, StringComparison.Ordinal) ?
59+
MySqlBulkLoader.GetAndRemoveStream(localInfile.FileName) :
60+
File.OpenRead(localInfile.FileName))
6361
{
64-
payload = new PayloadData(new ArraySegment<byte>(readBuffer, 0, byteCount));
65-
await Session.SendReplyAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
62+
byte[] readBuffer = new byte[65536];
63+
int byteCount;
64+
while ((byteCount = await stream.ReadAsync(readBuffer, 0, readBuffer.Length).ConfigureAwait(false)) > 0)
65+
{
66+
payload = new PayloadData(new ArraySegment<byte>(readBuffer, 0, byteCount));
67+
await Session.SendReplyAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
68+
}
6669
}
6770
}
71+
catch (Exception ex)
72+
{
73+
// store the exception, to be thrown after reading the response packet from the server
74+
exception = new MySqlException("Error during LOAD DATA LOCAL INFILE", ex);
75+
}
76+
77+
await Session.SendReplyAsync(EmptyPayload.Create(), ioBehavior, cancellationToken).ConfigureAwait(false);
6878
}
69-
catch (Exception ex)
79+
else
7080
{
71-
// store the exception, to be thrown after reading the response packet from the server
72-
exception = new MySqlException("Error during LOAD DATA LOCAL INFILE", ex);
73-
}
81+
var reader = new ByteArrayReader(payload.ArraySegment);
82+
var columnCount = (int) reader.ReadLengthEncodedInteger();
83+
ColumnDefinitions = new ColumnDefinitionPayload[columnCount];
84+
m_dataOffsets = new int[columnCount];
85+
m_dataLengths = new int[columnCount];
7486

75-
await Session.SendReplyAsync(EmptyPayload.Create(), ioBehavior, cancellationToken).ConfigureAwait(false);
76-
}
77-
else
78-
{
79-
var reader = new ByteArrayReader(payload.ArraySegment);
80-
var columnCount = (int) reader.ReadLengthEncodedInteger();
81-
ColumnDefinitions = new ColumnDefinitionPayload[columnCount];
82-
m_dataOffsets = new int[columnCount];
83-
m_dataLengths = new int[columnCount];
87+
for (var column = 0; column < ColumnDefinitions.Length; column++)
88+
{
89+
payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
90+
ColumnDefinitions[column] = ColumnDefinitionPayload.Create(payload);
91+
}
8492

85-
for (var column = 0; column < ColumnDefinitions.Length; column++)
86-
{
8793
payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
88-
ColumnDefinitions[column] = ColumnDefinitionPayload.Create(payload);
89-
}
94+
EofPayload.Create(payload);
9095

91-
payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
92-
EofPayload.Create(payload);
93-
94-
LastInsertId = -1;
95-
State = ResultSetState.ReadResultSetHeader;
96-
break;
96+
LastInsertId = -1;
97+
State = ResultSetState.ReadResultSetHeader;
98+
break;
99+
}
97100
}
98-
}
99-
BufferState = State;
100101

101-
if (exception != null)
102-
throw exception;
102+
if (exception != null)
103+
throw exception;
104+
}
105+
catch (Exception ex)
106+
{
107+
ReadResultSetHeaderException = ex;
108+
}
109+
finally
110+
{
111+
BufferState = State;
112+
}
103113

104114
return this;
105115
}
@@ -384,6 +394,7 @@ public Row GetCurrentRow()
384394
}
385395

386396
public readonly MySqlDataReader DataReader;
397+
public Exception ReadResultSetHeaderException { get; private set; }
387398
public MySqlCommand Command => DataReader.Command;
388399
public MySqlConnection Connection => DataReader.Connection;
389400
public MySqlSession Session => DataReader.Session;

src/MySqlConnector/MySqlClient/Results/Row.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public void ClearData()
5252
ArrayPool<int>.Shared.Return(m_dataLengths);
5353
ArrayPool<int>.Shared.Return(m_dataOffsets);
5454
ArrayPool<byte>.Shared.Return(m_payload);
55+
m_buffered = false;
5556
}
5657
m_dataLengths = null;
5758
m_dataOffsets = null;

src/MySqlConnector/Serialization/ConnectionSettings.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public ConnectionSettings(MySqlConnectionStringBuilder csb)
4444

4545
// Other Options
4646
AllowUserVariables = csb.AllowUserVariables;
47+
BufferResultSets = csb.BufferResultSets;
4748
ConnectionTimeout = (int)csb.ConnectionTimeout;
4849
ConvertZeroDateTime = csb.ConvertZeroDateTime;
4950
ForceSynchronous = csb.ForceSynchronous;
@@ -82,6 +83,7 @@ private ConnectionSettings(ConnectionSettings other, bool? useCompression)
8283

8384
// Other Options
8485
AllowUserVariables = other.AllowUserVariables;
86+
BufferResultSets = other.BufferResultSets;
8587
ConnectionTimeout = other.ConnectionTimeout;
8688
ConvertZeroDateTime = other.ConvertZeroDateTime;
8789
ForceSynchronous = other.ForceSynchronous;
@@ -116,6 +118,7 @@ private ConnectionSettings(ConnectionSettings other, bool? useCompression)
116118

117119
// Other Options
118120
internal readonly bool AllowUserVariables;
121+
internal readonly bool BufferResultSets;
119122
internal readonly int ConnectionTimeout;
120123
internal readonly bool ConvertZeroDateTime;
121124
internal readonly bool ForceSynchronous;

0 commit comments

Comments
 (0)