Skip to content

Commit d80e18c

Browse files
committed
implement BufferedResultSets
1 parent c53ba0c commit d80e18c

File tree

7 files changed

+143
-59
lines changed

7 files changed

+143
-59
lines changed

src/MySqlConnector/MySqlClient/CommandExecutors/TextCommandExecutor.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ public virtual async Task<DbDataReader> ExecuteReaderAsync(string commandText, M
5656
var preparer = new MySqlStatementPreparer(commandText, parameterCollection, statementPreparerOptions);
5757
var payload = new PayloadData(preparer.ParseAndBindParameters());
5858
await m_command.Connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
59-
var reader = await MySqlDataReader.CreateAsync(m_command, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
60-
m_command.Connection.ActiveReader = reader;
61-
return reader;
59+
return await MySqlDataReader.CreateAsync(m_command, behavior, ioBehavior, cancellationToken).ConfigureAwait(false);
6260
}
6361

6462
readonly MySqlCommand m_command;

src/MySqlConnector/MySqlClient/MySqlConnection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ internal async Task<CachedProcedure> GetCachedProcedure(IOBehavior ioBehavior, s
226226
internal MySqlTransaction CurrentTransaction { get; set; }
227227
internal MySqlDataReader ActiveReader { get; set; }
228228
internal bool AllowUserVariables => m_connectionSettings.AllowUserVariables;
229+
internal bool BufferResultSets => m_connectionSettings.BufferResultSets;
229230
internal bool ConvertZeroDateTime => m_connectionSettings.ConvertZeroDateTime;
230231
internal bool OldGuids => m_connectionSettings.OldGuids;
231232
internal bool TreatTinyAsBoolean => m_connectionSettings.TreatTinyAsBoolean;

src/MySqlConnector/MySqlClient/MySqlDataReader.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationTok
6969

7070
private void ActivateResultSet(ResultSet resultSet)
7171
{
72+
if (resultSet.ReadResultSetHeaderException != null)
73+
throw resultSet.ReadResultSetHeaderException;
74+
7275
Command.LastInsertedId = resultSet.LastInsertId;
7376
m_recordsAffected += resultSet.RecordsAffected;
7477
}
@@ -222,6 +225,12 @@ internal static async Task<MySqlDataReader> CreateAsync(MySqlCommand command, Co
222225
{
223226
var dataReader = new MySqlDataReader(command, behavior);
224227
await dataReader.ReadFirstResultSetAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
228+
command.Connection.ActiveReader = dataReader;
229+
if (command.Connection.BufferResultSets)
230+
{
231+
while (await dataReader.BufferNextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false) != null);
232+
command.Connection.ActiveReader = null;
233+
}
225234
return dataReader;
226235
}
227236

@@ -250,7 +259,8 @@ private void DoClose()
250259
m_nextResultSetBuffer.Clear();
251260

252261
var connection = Command.Connection;
253-
connection.ActiveReader = null;
262+
if (connection.ActiveReader == this)
263+
connection.ActiveReader = null;
254264
Command.ReaderClosed();
255265
if ((m_behavior & CommandBehavior.CloseConnection) != 0)
256266
{

src/MySqlConnector/MySqlClient/Results/ResultSet.cs

Lines changed: 65 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -31,75 +31,85 @@ public async Task<ResultSet> ReadResultSetHeaderAsync(IOBehavior ioBehavior, Can
3131
m_rowBuffered = null;
3232
MySqlException exception = null;
3333

34-
while (true)
34+
try
3535
{
36-
var payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
37-
38-
var firstByte = payload.HeaderByte;
39-
if (firstByte == OkPayload.Signature)
40-
{
41-
var ok = OkPayload.Create(payload);
42-
RecordsAffected = ok.AffectedRowCount;
43-
LastInsertId = ok.LastInsertId;
44-
ColumnDefinitions = null;
45-
State = (ok.ServerStatus & ServerStatus.MoreResultsExist) == 0
46-
? ResultSetState.NoMoreData
47-
: ResultSetState.HasMoreData;
48-
if (State == ResultSetState.NoMoreData)
49-
break;
50-
}
51-
else if (firstByte == LocalInfilePayload.Signature)
36+
while (true)
5237
{
53-
try
38+
var payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
39+
40+
var firstByte = payload.HeaderByte;
41+
if (firstByte == OkPayload.Signature)
42+
{
43+
var ok = OkPayload.Create(payload);
44+
RecordsAffected = ok.AffectedRowCount;
45+
LastInsertId = ok.LastInsertId;
46+
ColumnDefinitions = null;
47+
State = (ok.ServerStatus & ServerStatus.MoreResultsExist) == 0
48+
? ResultSetState.NoMoreData
49+
: ResultSetState.HasMoreData;
50+
if (State == ResultSetState.NoMoreData)
51+
break;
52+
}
53+
else if (firstByte == LocalInfilePayload.Signature)
5454
{
55-
var localInfile = LocalInfilePayload.Create(payload);
56-
using (var stream = localInfile.FileName.StartsWith(MySqlBulkLoader.StreamPrefix, StringComparison.Ordinal) ?
57-
MySqlBulkLoader.GetAndRemoveStream(localInfile.FileName) :
58-
File.OpenRead(localInfile.FileName))
55+
try
5956
{
60-
byte[] readBuffer = new byte[65536];
61-
int byteCount;
62-
while ((byteCount = await stream.ReadAsync(readBuffer, 0, readBuffer.Length).ConfigureAwait(false)) > 0)
57+
var localInfile = LocalInfilePayload.Create(payload);
58+
using (var stream = localInfile.FileName.StartsWith(MySqlBulkLoader.StreamPrefix, StringComparison.Ordinal) ?
59+
MySqlBulkLoader.GetAndRemoveStream(localInfile.FileName) :
60+
File.OpenRead(localInfile.FileName))
6361
{
64-
payload = new PayloadData(new ArraySegment<byte>(readBuffer, 0, byteCount));
65-
await Session.SendReplyAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
62+
byte[] readBuffer = new byte[65536];
63+
int byteCount;
64+
while ((byteCount = await stream.ReadAsync(readBuffer, 0, readBuffer.Length).ConfigureAwait(false)) > 0)
65+
{
66+
payload = new PayloadData(new ArraySegment<byte>(readBuffer, 0, byteCount));
67+
await Session.SendReplyAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
68+
}
6669
}
6770
}
71+
catch (Exception ex)
72+
{
73+
// store the exception, to be thrown after reading the response packet from the server
74+
exception = new MySqlException("Error during LOAD DATA LOCAL INFILE", ex);
75+
}
76+
77+
await Session.SendReplyAsync(EmptyPayload.Create(), ioBehavior, cancellationToken).ConfigureAwait(false);
6878
}
69-
catch (Exception ex)
79+
else
7080
{
71-
// store the exception, to be thrown after reading the response packet from the server
72-
exception = new MySqlException("Error during LOAD DATA LOCAL INFILE", ex);
73-
}
81+
var reader = new ByteArrayReader(payload.ArraySegment);
82+
var columnCount = (int) reader.ReadLengthEncodedInteger();
83+
ColumnDefinitions = new ColumnDefinitionPayload[columnCount];
84+
m_dataOffsets = new int[columnCount];
85+
m_dataLengths = new int[columnCount];
7486

75-
await Session.SendReplyAsync(EmptyPayload.Create(), ioBehavior, cancellationToken).ConfigureAwait(false);
76-
}
77-
else
78-
{
79-
var reader = new ByteArrayReader(payload.ArraySegment);
80-
var columnCount = (int) reader.ReadLengthEncodedInteger();
81-
ColumnDefinitions = new ColumnDefinitionPayload[columnCount];
82-
m_dataOffsets = new int[columnCount];
83-
m_dataLengths = new int[columnCount];
87+
for (var column = 0; column < ColumnDefinitions.Length; column++)
88+
{
89+
payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
90+
ColumnDefinitions[column] = ColumnDefinitionPayload.Create(payload);
91+
}
8492

85-
for (var column = 0; column < ColumnDefinitions.Length; column++)
86-
{
8793
payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
88-
ColumnDefinitions[column] = ColumnDefinitionPayload.Create(payload);
89-
}
94+
EofPayload.Create(payload);
9095

91-
payload = await Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
92-
EofPayload.Create(payload);
93-
94-
LastInsertId = -1;
95-
State = ResultSetState.ReadResultSetHeader;
96-
break;
96+
LastInsertId = -1;
97+
State = ResultSetState.ReadResultSetHeader;
98+
break;
99+
}
97100
}
98-
}
99-
BufferState = State;
100101

101-
if (exception != null)
102-
throw exception;
102+
if (exception != null)
103+
throw exception;
104+
}
105+
catch (Exception ex)
106+
{
107+
ReadResultSetHeaderException = ex;
108+
}
109+
finally
110+
{
111+
BufferState = State;
112+
}
103113

104114
return this;
105115
}
@@ -384,6 +394,7 @@ public Row GetCurrentRow()
384394
}
385395

386396
public readonly MySqlDataReader DataReader;
397+
public Exception ReadResultSetHeaderException { get; private set; }
387398
public MySqlCommand Command => DataReader.Command;
388399
public MySqlConnection Connection => DataReader.Connection;
389400
public MySqlSession Session => DataReader.Session;

src/MySqlConnector/MySqlClient/Results/Row.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public void ClearData()
5252
ArrayPool<int>.Shared.Return(m_dataLengths);
5353
ArrayPool<int>.Shared.Return(m_dataOffsets);
5454
ArrayPool<byte>.Shared.Return(m_payload);
55+
m_buffered = false;
5556
}
5657
m_dataLengths = null;
5758
m_dataOffsets = null;

tests/SideBySide/Attributes.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ public BulkLoaderLocalTsvFileFactAttribute()
6767
}
6868
}
6969

70+
public class UnbufferedResultSetsFactAttribute : FactAttribute
71+
{
72+
public UnbufferedResultSetsFactAttribute()
73+
{
74+
var csb = AppConfig.CreateConnectionStringBuilder();
75+
if(csb.BufferResultSets == true)
76+
Skip = "Do not run when BufferResultSets are used";
77+
}
78+
}
79+
7080
public class TcpConnectionFactAttribute : FactAttribute
7181
{
7282
public TcpConnectionFactAttribute()

tests/SideBySide/QueryTests.cs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public async Task InvalidSql()
9696
}
9797
}
9898

99-
[Fact]
99+
[UnbufferedResultSetsFact]
100100
public async Task MultipleReaders()
101101
{
102102
using (var cmd = m_database.Connection.CreateCommand())
@@ -144,6 +144,59 @@ public async Task MultipleReaders()
144144
}
145145
}
146146

147+
#if BASELINE
148+
[Fact(Skip = "Does not support BufferResultSets")]
149+
#else
150+
[Fact]
151+
#endif
152+
public async Task MultipleBufferedReaders()
153+
{
154+
var csb = AppConfig.CreateConnectionStringBuilder();
155+
csb.BufferResultSets = true;
156+
157+
using (var connection = new MySqlConnection(csb.ConnectionString))
158+
{
159+
await connection.OpenAsync();
160+
using (var cmd = connection.CreateCommand())
161+
{
162+
cmd.CommandText = @"drop table if exists query_multiple_buffered_readers;
163+
create table query_multiple_buffered_readers(id integer not null primary key auto_increment);
164+
insert into query_multiple_buffered_readers(id) values(1), (2), (3);";
165+
await cmd.ExecuteNonQueryAsync();
166+
}
167+
168+
using (var cmd1 = connection.CreateCommand())
169+
using (var cmd2 = connection.CreateCommand())
170+
{
171+
var commandText = @"select id from query_multiple_buffered_readers order by id ASC;
172+
select id from query_multiple_buffered_readers order by id DESC;";
173+
cmd1.CommandText = commandText;
174+
cmd2.CommandText = commandText;
175+
176+
var readers = new[]{ await cmd1.ExecuteReaderAsync(), await cmd2.ExecuteReaderAsync() };
177+
foreach (var reader in readers){
178+
Assert.Equal(true, await reader.ReadAsync());
179+
Assert.Equal(1, reader.GetInt32(0));
180+
Assert.Equal(true, await reader.ReadAsync());
181+
Assert.Equal(2, reader.GetInt32(0));
182+
Assert.Equal(true, await reader.ReadAsync());
183+
Assert.Equal(3, reader.GetInt32(0));
184+
Assert.Equal(false, await reader.ReadAsync());
185+
Assert.Equal(true, await reader.NextResultAsync());
186+
187+
Assert.Equal(true, await reader.ReadAsync());
188+
Assert.Equal(3, reader.GetInt32(0));
189+
Assert.Equal(true, await reader.ReadAsync());
190+
Assert.Equal(2, reader.GetInt32(0));
191+
Assert.Equal(true, await reader.ReadAsync());
192+
Assert.Equal(1, reader.GetInt32(0));
193+
Assert.Equal(false, await reader.ReadAsync());
194+
Assert.Equal(false, await reader.NextResultAsync());
195+
}
196+
}
197+
}
198+
}
199+
147200
[Fact]
148201
public async Task UndisposedReader()
149202
{

0 commit comments

Comments
 (0)