11using System ;
2+ using System . Collections . Generic ;
23using System . Text ;
34using System . Threading . Tasks ;
45using CanalSharp . Connections . Enums ;
78using Microsoft . Extensions . Logging ;
89using Newtonsoft . Json ;
910using NZookeeper ;
11+ using NZookeeper . ACL ;
1012using NZookeeper . Enums ;
13+ using NZookeeper . Node ;
14+ using org . apache . zookeeper ;
1115
1216namespace CanalSharp . Connections
1317{
1418 /// <summary>
1519 /// Support canal server cluster and client cluster.
1620 /// </summary>
17- public class ClusterCanalConnection : ICanalConnection
21+ public class ClusterCanalConnection
1822 {
1923 private readonly ClusterCanalOptions _options ;
2024 private readonly ILoggerFactory _loggerFactory ;
2125 private readonly ILogger < ClusterCanalConnection > _logger ;
26+ private readonly TaskCompletionSource < int > _completionSource ;
2227 // ReSharper disable once InconsistentNaming
23- private readonly string ZK_RUNNING_NODE ;
28+ private readonly string ZK_SERVER_RUNNING_NODE ;
29+ // ReSharper disable once InconsistentNaming
30+ private readonly string ZK_CLIENT_RUNNING_NODE ;
2431
25- // For reconnection
32+ // For reconnect
2633 private string _lastSubFilter ;
2734 private bool _serverRunningNodeReCreated ;
2835 private SimpleCanalConnection _currentConn ;
2936 private ZkConnection _zk ;
37+ private CanalClientRunningInfo _clientRunningInfo ;
3038
/// <summary>
/// Creates a cluster connection for the destination and client id described by <paramref name="options"/>.
/// Only stores the dependencies and derives the Zookeeper node paths; no network activity happens here.
/// </summary>
/// <param name="options">Cluster options; <c>Destination</c> and <c>ClientId</c> are used to build the node paths.</param>
/// <param name="loggerFactory">Factory used for this class's logger and for the logger of the inner connection.</param>
/// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
public ClusterCanalConnection([NotNull] ClusterCanalOptions options, ILoggerFactory loggerFactory)
{
    _options = options ?? throw new ArgumentNullException(nameof(options));
    _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
    _logger = loggerFactory.CreateLogger<ClusterCanalConnection>();

    // Run continuations asynchronously so that the code awaiting this source in ConnectAsync
    // does not execute inline inside whichever call completes it.
    _completionSource = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

    ZK_SERVER_RUNNING_NODE = $"/otter/canal/destinations/{_options.Destination}/running";
    ZK_CLIENT_RUNNING_NODE = $"/otter/canal/destinations/{_options.Destination}/{_options.ClientId}/running";
}
3848
39-
4049 public async Task ReConnectAsync ( )
4150 {
4251 await DisConnectAsync ( ) ;
@@ -72,17 +81,78 @@ public async Task ReConnectAsync()
7281
/// <summary>
/// Connects to Zookeeper, reads the canal server address from the server running node,
/// connects to that server, and then waits until the client running lock has been obtained.
/// </summary>
/// <remarks>
/// If the client running node already exists, this method does not return until a later
/// lock attempt completes the internal completion source.
/// </remarks>
/// <exception cref="CanalConnectionException">The server running node holds no usable address.</exception>
public async Task ConnectAsync()
{
    //get canal address from zk
    await ConnectToZkAsync();
    var nodeData = await _zk.GetDataAsync(ZK_SERVER_RUNNING_NODE);
    var runningInfo = JsonConvert.DeserializeObject<CanalServerRunningInfo>(Encoding.UTF8.GetString(nodeData));
    if (runningInfo?.Address == null)
    {
        // Fail with a descriptive error instead of a NullReferenceException a few lines below.
        throw new CanalConnectionException($"Node {ZK_SERVER_RUNNING_NODE} does not contain a canal server address.");
    }

    _logger.LogInformation($"get canal address from zookeeper success: {runningInfo.Address}");

    //connect to canal
    _currentConn = new SimpleCanalConnection(CopyOptions(runningInfo),
        _loggerFactory.CreateLogger<SimpleCanalConnection>());
    await _currentConn.ConnectAsync();
    _serverRunningNodeReCreated = false;

    // The local endpoint of the canal connection is published as this client's address.
    var localIp = _currentConn.GetLocalEndPoint().ToString();
    _clientRunningInfo = new CanalClientRunningInfo
    {
        Active = true,
        Address = localIp,
        ClientId = _options.ClientId
    };

    // Await the first attempt (it has no delay when waiting == false) so that a fault raised by it
    // is observed here instead of being lost in a discarded task while this method waits forever.
    await GetZkLockAsync(_clientRunningInfo);

    // Completed by GetZkLockAsync once the lock nodes have been created.
    await _completionSource.Task;

    _logger.LogInformation("Ready to use!");
}
103+
/// <summary>
/// Attempts to take the client running lock by creating <c>ZK_CLIENT_RUNNING_NODE</c>
/// (node type <c>Ephemeral</c>) together with a sibling node named after the client address.
/// On success the completion source awaited by <c>ConnectAsync</c> is completed.
/// </summary>
/// <param name="runningInfo">Client information serialized as JSON into the running node.</param>
/// <param name="waiting">
/// When <c>true</c>, the method first pauses for 60 one-second intervals before trying to take the lock.
/// </param>
/// <remarks>
/// If the running node already exists the method only logs and returns; it does not retry by itself.
/// A <c>NodeExistsException</c> reported for any other path terminates the process.
/// </remarks>
public async Task GetZkLockAsync(CanalClientRunningInfo runningInfo, bool waiting = false)
{
    var times = 0;
    while (waiting && times < 60)
    {
        await Task.Delay(1000);
        times++;
        _logger.LogWarning($"Waiting for get lock {times}-60...");
    }

    if (await _zk.NodeExistsAsync(ZK_CLIENT_RUNNING_NODE))
    {
        _logger.LogInformation($"Node {ZK_CLIENT_RUNNING_NODE} exits, get Zookeeper lock failed. Other instances are running.");
        _logger.LogWarning("Waiting...");
        return;
    }

    // A fresh ACL list per call, exactly as each create call received before.
    List<Acl> OpenAcl() => new List<Acl> { new Acl(AclPerm.All, AclScheme.World, AclId.World()) };

    try
    {
        var clientNodeData = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(runningInfo));

        // Strip only the final path segment. Replacing every "/running" occurrence would also
        // damage the path when the destination or client id itself starts with "running".
        var parentNode = ZK_CLIENT_RUNNING_NODE.Substring(0, ZK_CLIENT_RUNNING_NODE.LastIndexOf('/'));
        if (!await _zk.NodeExistsAsync(parentNode))
        {
            await _zk.CreateNodeAsync(parentNode, null, OpenAcl(), NodeType.Persistent);
        }

        await _zk.CreateNodeAsync(ZK_CLIENT_RUNNING_NODE, clientNodeData, OpenAcl(), NodeType.Ephemeral);
        await _zk.CreateNodeAsync(parentNode + "/" + runningInfo.Address, null, OpenAcl(), NodeType.Ephemeral);

        // NOTE(review): presumably issued to (re)register a watch on the node - confirm against ZkConnection.
        await _zk.NodeExistsAsync(ZK_CLIENT_RUNNING_NODE);

        // TrySetResult: the lock can be taken again later (after the node is deleted), and a second
        // SetResult would throw InvalidOperationException and be logged as a lock failure.
        _completionSource.TrySetResult(0);
        _logger.LogInformation("Get zookeeper lock success.");
    }
    catch (KeeperException.NodeExistsException e)
    {
        if (e.getPath() != ZK_CLIENT_RUNNING_NODE)
        {
            _logger.LogError(e, "Error during get lock.");
            Environment.Exit(-1);
        }

        // Another instance created the running node between the existence check and the create call.
        _logger.LogInformation(
            $"Node {ZK_CLIENT_RUNNING_NODE} exits, get Zookeeper lock failed. Other instances are running.");
        _logger.LogWarning("Waiting...");
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Exception in lock");

        // Hand the failure to a pending ConnectAsync instead of leaving it waiting forever.
        // This is a no-op when the source has already been completed by an earlier success.
        _completionSource.TrySetException(e);
    }
}
87157
88158 private async Task ConnectToZkAsync ( )
@@ -105,33 +175,44 @@ private async Task ConnectToZkAsync()
105175 var times = 0 ;
106176 while ( times < 60 )
107177 {
108- if ( await _zk . NodeExistsAsync ( ZK_RUNNING_NODE ) )
178+ if ( await _zk . NodeExistsAsync ( ZK_SERVER_RUNNING_NODE ) )
109179 {
110180 return ;
111181 }
112182
113183 await Task . Delay ( 1000 ) ;
114184 times ++ ;
115185
116- _logger . LogWarning ( $ "Can not find node { ZK_RUNNING_NODE } on Zookeeper. Retrying... { times } ") ;
186+ _logger . LogWarning ( $ "Can not find node { ZK_SERVER_RUNNING_NODE } on Zookeeper. Retrying... { times } ") ;
117187 }
118188
119- throw new CanalConnectionException ( $ "Can not find node { ZK_RUNNING_NODE } on Zookeeper.") ;
189+ throw new CanalConnectionException ( $ "Can not find node { ZK_SERVER_RUNNING_NODE } on Zookeeper.") ;
120190 }
121191
/// <summary>
/// Handles Zookeeper watch events for the server and client running nodes.
/// </summary>
/// <param name="args">Watch event carrying the node path and the event type.</param>
/// <remarks>
/// Creation of the server node sets the re-created flag. Deletion of the client node starts a
/// delayed attempt to take the client lock. While the Zookeeper connection is up, an existence
/// check is issued for both nodes after every event.
/// </remarks>
private async Task Zk_OnWatch(ZkWatchEventArgs args)
{
    if (args.Path == ZK_SERVER_RUNNING_NODE && args.EventType == WatchEventType.NodeCreated)
    {
        _serverRunningNodeReCreated = true;
        _logger.LogInformation($"Server node {ZK_SERVER_RUNNING_NODE} Created");
    }

    if (args.Path == ZK_SERVER_RUNNING_NODE && args.EventType == WatchEventType.NodeDeleted)
    {
        _logger.LogInformation($"Server node {ZK_SERVER_RUNNING_NODE} Deleted");
    }

    if (args.Path == ZK_CLIENT_RUNNING_NODE && args.EventType == WatchEventType.NodeDeleted)
    {
        // Log the client node path; the message previously printed the server node path.
        _logger.LogInformation($"Client node {ZK_CLIENT_RUNNING_NODE} Deleted");

        // Not awaited: with waiting == true the attempt pauses for about a minute first.
        _ = GetZkLockAsync(_clientRunningInfo, true);
    }

    if (_zk.Connected)
    {
        // NOTE(review): presumably these calls re-register the watches - confirm against ZkConnection.
        await _zk.NodeExistsAsync(ZK_SERVER_RUNNING_NODE);
        await _zk.NodeExistsAsync(ZK_CLIENT_RUNNING_NODE);
    }
}
136217
137218 public Task SubscribeAsync ( string filter = ".*\\ ..*" )
@@ -202,7 +283,7 @@ public Task<Message> GetWithoutAckAsync(int fetchSize, long? timeout,
202283 return _currentConn . GetWithoutAckAsync ( fetchSize , timeout ) ;
203284 }
204285
205- private SimpleCanalOptions CopyOptions ( CanalRunningInfo runningInfo )
286+ private SimpleCanalOptions CopyOptions ( CanalServerRunningInfo runningInfo )
206287 {
207288 var tmpArr = runningInfo . Address . Split ( ":" ) ;
208289 var op = new SimpleCanalOptions ( tmpArr [ 0 ] , int . Parse ( tmpArr [ 1 ] ) , _options . ClientId )
0 commit comments