@@ -13,51 +13,63 @@ public class MySQLFixture : IDisposable
1313 private const string _username = "root" ;
1414 private const string _password = "root" ;
1515
16- private readonly MySqlConnection _connection ;
17-
1816 public IReplicationClient Client { get ; private set ; }
1917
18+ private SemaphoreSlim _semaphore = new SemaphoreSlim ( 1 , 1 ) ;
19+
2020 public static MySQLFixture Instance { get ; } = new MySQLFixture ( ) ;
2121
2222 private MySQLFixture ( )
2323 {
24- _connection = new MySqlConnection ( $ "Server={ _host } ;Database=garden;Uid={ _username } ;Pwd={ _password } ;") ;
2524 Client = new ReplicationClient ( ) ;
2625 ConnectAsync ( ) . Wait ( ) ;
2726 }
2827
2928 private async Task ConnectAsync ( )
3029 {
31- await _connection . OpenAsync ( ) ;
3230 await Client . ConnectAsync ( _host , _username , _password , 1 ) ;
3331 }
3432
33+ private MySqlConnection GetConnection ( )
34+ {
35+ var connection = new MySqlConnection ( $ "Server={ _host } ;Database=garden;Uid={ _username } ;Pwd={ _password } ;") ;
36+ connection . OpenAsync ( ) . Wait ( ) ;
37+ return connection ;
38+ }
39+
3540 public MySqlCommand CreateCommand ( )
3641 {
37- return _connection . CreateCommand ( ) ;
42+ return GetConnection ( ) . CreateCommand ( ) ;
3843 }
3944
4045 public async Task < TLogEvent > ReceiveAsync < TLogEvent > ( CancellationToken cancellationToken = default )
4146 where TLogEvent : LogEvent
4247 {
43- while ( ! cancellationToken . IsCancellationRequested )
44- {
45- var logEvent = await Client . ReceiveAsync ( ) ;
48+ await _semaphore . WaitAsync ( cancellationToken ) ;
4649
47- if ( logEvent is TLogEvent requiredLogEvent )
50+ try
51+ {
52+ while ( ! cancellationToken . IsCancellationRequested )
4853 {
49- return requiredLogEvent ;
54+ var logEvent = await Client . ReceiveAsync ( ) ;
55+
56+ if ( logEvent is TLogEvent requiredLogEvent )
57+ {
58+ return requiredLogEvent ;
59+ }
5060 }
5161 }
62+ finally
63+ {
64+ _semaphore . Release ( ) ;
65+ }
5266
5367 return default ;
5468 }
5569
5670 public void Dispose ( )
5771 {
5872 Client ? . CloseAsync ( ) . AsTask ( ) . Wait ( ) ;
59- _connection ? . CloseAsync ( ) . Wait ( ) ;
60- _connection ? . Dispose ( ) ;
6173 }
6274 }
6375}
0 commit comments