1
+ using System . Diagnostics ;
1
2
using System . Net . Sockets ;
2
3
using MySqlConnector . Logging ;
3
4
using MySqlConnector . Protocol . Serialization ;
@@ -7,62 +8,71 @@ namespace MySqlConnector.Core;
7
8
8
9
internal static class CommandExecutor
9
10
{
10
- public static async Task < MySqlDataReader > ExecuteReaderAsync ( IReadOnlyList < IMySqlCommand > commands , ICommandPayloadCreator payloadCreator , CommandBehavior behavior , IOBehavior ioBehavior , CancellationToken cancellationToken )
11
+ public static async Task < MySqlDataReader > ExecuteReaderAsync ( IReadOnlyList < IMySqlCommand > commands , ICommandPayloadCreator payloadCreator , CommandBehavior behavior , Activity ? activity , IOBehavior ioBehavior , CancellationToken cancellationToken )
11
12
{
12
- cancellationToken . ThrowIfCancellationRequested ( ) ;
13
- var commandListPosition = new CommandListPosition ( commands ) ;
14
- var command = commands [ 0 ] ;
13
+ try
14
+ {
15
+ cancellationToken . ThrowIfCancellationRequested ( ) ;
16
+ var commandListPosition = new CommandListPosition ( commands ) ;
17
+ var command = commands [ 0 ] ;
15
18
16
- // pre-requisite: Connection is non-null must be checked before calling this method
17
- var connection = command . Connection ! ;
19
+ // pre-requisite: Connection is non-null must be checked before calling this method
20
+ var connection = command . Connection ! ;
18
21
19
- if ( Log . IsTraceEnabled ( ) )
20
- Log . Trace ( "Session{0} ExecuteReader {1} CommandCount: {2}" , connection . Session . Id , ioBehavior , commands . Count ) ;
22
+ if ( Log . IsTraceEnabled ( ) )
23
+ Log . Trace ( "Session{0} ExecuteReader {1} CommandCount: {2}" , connection . Session . Id , ioBehavior , commands . Count ) ;
21
24
22
- Dictionary < string , CachedProcedure ? > ? cachedProcedures = null ;
23
- foreach ( var command2 in commands )
24
- {
25
- if ( command2 . CommandType == CommandType . StoredProcedure )
25
+ Dictionary < string , CachedProcedure ? > ? cachedProcedures = null ;
26
+ foreach ( var command2 in commands )
26
27
{
27
- cachedProcedures ??= new ( ) ;
28
- var commandText = command2 . CommandText ! ;
29
- if ( ! cachedProcedures . ContainsKey ( commandText ) )
28
+ if ( command2 . CommandType == CommandType . StoredProcedure )
30
29
{
31
- cachedProcedures . Add ( commandText , await connection . GetCachedProcedure ( commandText , revalidateMissing : false , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ) ;
30
+ cachedProcedures ??= new ( ) ;
31
+ var commandText = command2 . CommandText ! ;
32
+ if ( ! cachedProcedures . ContainsKey ( commandText ) )
33
+ {
34
+ cachedProcedures . Add ( commandText , await connection . GetCachedProcedure ( commandText , revalidateMissing : false , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ) ;
32
35
33
- // because the connection was used to execute a MySqlDataReader with the connection's DefaultCommandTimeout,
34
- // we need to reapply the command's CommandTimeout (even if some of the time has elapsed)
35
- command . CancellableCommand . ResetCommandTimeout ( ) ;
36
+ // because the connection was used to execute a MySqlDataReader with the connection's DefaultCommandTimeout,
37
+ // we need to reapply the command's CommandTimeout (even if some of the time has elapsed)
38
+ command . CancellableCommand . ResetCommandTimeout ( ) ;
39
+ }
36
40
}
37
41
}
38
- }
39
42
40
- var writer = new ByteBufferWriter ( ) ;
41
- // cachedProcedures will be non-null if there is a stored procedure, which is also the only time it will be read
42
- if ( ! payloadCreator . WriteQueryCommand ( ref commandListPosition , cachedProcedures ! , writer ) )
43
- throw new InvalidOperationException ( "ICommandPayloadCreator failed to write query payload" ) ;
43
+ var writer = new ByteBufferWriter ( ) ;
44
+ // cachedProcedures will be non-null if there is a stored procedure, which is also the only time it will be read
45
+ if ( ! payloadCreator . WriteQueryCommand ( ref commandListPosition , cachedProcedures ! , writer ) )
46
+ throw new InvalidOperationException ( "ICommandPayloadCreator failed to write query payload" ) ;
44
47
45
- cancellationToken . ThrowIfCancellationRequested ( ) ;
48
+ cancellationToken . ThrowIfCancellationRequested ( ) ;
46
49
47
- using var payload = writer . ToPayloadData ( ) ;
48
- connection . Session . StartQuerying ( command . CancellableCommand ) ;
49
- command . SetLastInsertedId ( - 1 ) ;
50
- try
51
- {
52
- await connection . Session . SendAsync ( payload , ioBehavior , CancellationToken . None ) . ConfigureAwait ( false ) ;
53
- return await MySqlDataReader . CreateAsync ( commandListPosition , payloadCreator , cachedProcedures , command , behavior , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
54
- }
55
- catch ( MySqlException ex ) when ( ex . ErrorCode == MySqlErrorCode . QueryInterrupted && cancellationToken . IsCancellationRequested )
56
- {
57
- Log . Info ( "Session{0} query was interrupted" , connection . Session . Id ) ;
58
- throw new OperationCanceledException ( ex . Message , ex , cancellationToken ) ;
50
+ using var payload = writer . ToPayloadData ( ) ;
51
+ connection . Session . StartQuerying ( command . CancellableCommand ) ;
52
+ command . SetLastInsertedId ( - 1 ) ;
53
+ try
54
+ {
55
+ await connection . Session . SendAsync ( payload , ioBehavior , CancellationToken . None ) . ConfigureAwait ( false ) ;
56
+ return await MySqlDataReader . CreateAsync ( commandListPosition , payloadCreator , cachedProcedures , command , behavior , activity , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
57
+ }
58
+ catch ( MySqlException ex ) when ( ex . ErrorCode == MySqlErrorCode . QueryInterrupted && cancellationToken . IsCancellationRequested )
59
+ {
60
+ Log . Info ( "Session{0} query was interrupted" , connection . Session . Id ) ;
61
+ throw new OperationCanceledException ( ex . Message , ex , cancellationToken ) ;
62
+ }
63
+ catch ( Exception ex ) when ( payload . Span . Length > 4_194_304 && ( ex is SocketException or IOException or MySqlProtocolException ) )
64
+ {
65
+ // the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
66
+ // use "decimal megabytes" (to round up) when creating the exception message
67
+ int megabytes = payload . Span . Length / 1_000_000 ;
68
+ throw new MySqlException ( "Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB." . FormatInvariant ( megabytes ) , ex ) ;
69
+ }
59
70
}
60
- catch ( Exception ex ) when ( payload . Span . Length > 4_194_304 && ( ex is SocketException or IOException or MySqlProtocolException ) )
71
+ catch ( Exception ex ) when ( activity is { IsAllDataRequested : true } )
61
72
{
62
- // the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
63
- // use "decimal megabytes" (to round up) when creating the exception message
64
- int megabytes = payload . Span . Length / 1_000_000 ;
65
- throw new MySqlException ( "Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB." . FormatInvariant ( megabytes ) , ex ) ;
73
+ activity . SetException ( ex ) ;
74
+ activity . Stop ( ) ;
75
+ throw ;
66
76
}
67
77
}
68
78
0 commit comments