36
36
using System . Threading ;
37
37
using System . Threading . Tasks ;
38
38
using RabbitMQ . Client . client . framing ;
39
+ using RabbitMQ . Client . Events ;
39
40
using RabbitMQ . Client . Exceptions ;
40
41
using RabbitMQ . Client . Framing . Impl ;
41
42
using RabbitMQ . Client . Logging ;
@@ -58,13 +59,13 @@ protected SessionBase(Connection connection, ushort channelNumber)
58
59
RabbitMqClientEventSource . Log . ChannelOpened ( ) ;
59
60
}
60
61
61
- public event EventHandler < ShutdownEventArgs > SessionShutdown
62
+ public event AsyncEventHandler < ShutdownEventArgs > SessionShutdownAsync
62
63
{
63
64
add
64
65
{
65
66
if ( CloseReason is null )
66
67
{
67
- _sessionShutdownWrapper . AddHandler ( value ) ;
68
+ _sessionShutdownAsyncWrapper . AddHandler ( value ) ;
68
69
}
69
70
else
70
71
{
@@ -73,10 +74,10 @@ public event EventHandler<ShutdownEventArgs> SessionShutdown
73
74
}
74
75
remove
75
76
{
76
- _sessionShutdownWrapper . RemoveHandler ( value ) ;
77
+ _sessionShutdownAsyncWrapper . RemoveHandler ( value ) ;
77
78
}
78
79
}
79
- private EventingWrapper < ShutdownEventArgs > _sessionShutdownWrapper ;
80
+ private AsyncEventingWrapper < ShutdownEventArgs > _sessionShutdownAsyncWrapper ;
80
81
81
82
public ushort ChannelNumber { get ; }
82
83
@@ -86,29 +87,17 @@ public event EventHandler<ShutdownEventArgs> SessionShutdown
86
87
[ MemberNotNullWhen ( false , nameof ( CloseReason ) ) ]
87
88
public bool IsOpen => CloseReason is null ;
88
89
89
- public Task OnConnectionShutdownAsync ( object ? conn , ShutdownEventArgs reason )
90
- {
91
- Close ( reason ) ;
92
- return Task . CompletedTask ;
93
- }
94
-
95
- public void OnSessionShutdown ( ShutdownEventArgs reason )
96
- {
97
- Connection . ConnectionShutdownAsync -= OnConnectionShutdownAsync ;
98
- _sessionShutdownWrapper . Invoke ( this , reason ) ;
99
- }
100
-
101
90
public override string ToString ( )
102
91
{
103
92
return $ "{ GetType ( ) . Name } #{ ChannelNumber } :{ Connection } ";
104
93
}
105
94
106
- public void Close ( ShutdownEventArgs reason )
95
+ public Task CloseAsync ( ShutdownEventArgs reason , CancellationToken cancellationToken )
107
96
{
108
- Close ( reason , true ) ;
97
+ return CloseAsync ( reason , true , cancellationToken ) ;
109
98
}
110
99
111
- public void Close ( ShutdownEventArgs reason , bool notify )
100
+ public Task CloseAsync ( ShutdownEventArgs reason , bool notify , CancellationToken cancellationToken )
112
101
{
113
102
if ( Interlocked . CompareExchange ( ref _closeReason , reason , null ) is null )
114
103
{
@@ -117,23 +106,25 @@ public void Close(ShutdownEventArgs reason, bool notify)
117
106
118
107
if ( notify )
119
108
{
120
- OnSessionShutdown ( CloseReason ! ) ;
109
+ return OnSessionShutdownAsync ( CloseReason ! ) ;
121
110
}
111
+
112
+ return Task . CompletedTask ;
122
113
}
123
114
124
115
public abstract Task HandleFrameAsync ( InboundFrame frame , CancellationToken cancellationToken ) ;
125
116
126
- public void Notify ( )
117
+ public Task NotifyAsync ( CancellationToken cancellationToken )
127
118
{
128
119
// Ensure that we notify only when session is already closed
129
120
// If not, throw exception, since this is a serious bug in the library
130
121
ShutdownEventArgs ? reason = CloseReason ;
131
122
if ( reason is null )
132
123
{
133
- throw new InvalidOperationException ( "Internal Error in SessionBase.Notify " ) ;
124
+ throw new InvalidOperationException ( "Internal Error in SessionBase.NotifyAsync " ) ;
134
125
}
135
126
136
- OnSessionShutdown ( reason ) ;
127
+ return OnSessionShutdownAsync ( reason ) ;
137
128
}
138
129
139
130
public virtual ValueTask TransmitAsync < T > ( in T cmd , CancellationToken cancellationToken ) where T : struct , IOutgoingAmqpMethod
@@ -162,6 +153,17 @@ public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader head
162
153
return Connection . WriteAsync ( bytes , cancellationToken ) ;
163
154
}
164
155
156
+ private Task OnConnectionShutdownAsync ( object ? conn , ShutdownEventArgs reason )
157
+ {
158
+ return CloseAsync ( reason , CancellationToken . None ) ;
159
+ }
160
+
161
+ private Task OnSessionShutdownAsync ( ShutdownEventArgs reason )
162
+ {
163
+ Connection . ConnectionShutdownAsync -= OnConnectionShutdownAsync ;
164
+ return _sessionShutdownAsyncWrapper . InvokeAsync ( this , reason ) ;
165
+ }
166
+
165
167
private void ThrowAlreadyClosedException ( )
166
168
=> throw new AlreadyClosedException ( CloseReason ! ) ;
167
169
}
0 commit comments