1
+ using org . apache . zookeeper ;
2
+ using org . apache . zookeeper . data ;
3
+ using System ;
4
+ using System . Collections . Generic ;
5
+ using System . Threading . Tasks ;
6
+
7
+ namespace Rabbit . Zookeeper
8
+ {
9
+ public interface IZookeeperClient : IDisposable
10
+ {
11
+ /// <summary>
12
+ /// 具体的zookeeper连接。
13
+ /// </summary>
14
+ ZooKeeper ZooKeeper { get ; }
15
+
16
+ /// <summary>
17
+ /// 客户端选项。
18
+ /// </summary>
19
+ ZookeeperClientOptions Options { get ; }
20
+
21
+ /// <summary>
22
+ /// 等待zk连接到具体的某一个状态。
23
+ /// </summary>
24
+ /// <param name="states">希望达到的状态。</param>
25
+ /// <param name="timeout">最长等待时间。</param>
26
+ /// <returns>如果成功则返回true,否则返回false。</returns>
27
+ bool WaitForKeeperState ( Watcher . Event . KeeperState states , TimeSpan timeout ) ;
28
+
29
+ /// <summary>
30
+ /// 重试直到zk连接上。
31
+ /// </summary>
32
+ /// <typeparam name="T">返回类型。</typeparam>
33
+ /// <param name="callable">执行的zk操作。</param>
34
+ /// <returns>执行结果。</returns>
35
+ Task < T > RetryUntilConnected < T > ( Func < Task < T > > callable ) ;
36
+
37
+ Task < IEnumerable < byte > > GetDataAsync ( string path ) ;
38
+
39
+ Task < IEnumerable < string > > GetChildrenAsync ( string path ) ;
40
+
41
+ Task < bool > ExistsAsync ( string path ) ;
42
+
43
+ Task CreateAsync ( string path , byte [ ] data , List < ACL > acls , CreateMode createMode ) ;
44
+
45
+ Task < Stat > SetDataAsync ( string path , byte [ ] data , int version = - 1 ) ;
46
+
47
+ Task DeleteAsync ( string path , int version = - 1 ) ;
48
+
49
+ Task SubscribeDataChange ( string path , NodeDataChangeHandler listener ) ;
50
+
51
+ void UnSubscribeDataChange ( string path , NodeDataChangeHandler listener ) ;
52
+
53
+ void SubscribeStatusChange ( ConnectionStateChangeHandler listener ) ;
54
+
55
+ void UnSubscribeStatusChange ( ConnectionStateChangeHandler listener ) ;
56
+
57
+ Task < IEnumerable < string > > SubscribeChildrenChange ( string path , NodeChildrenChangeHandler listener ) ;
58
+
59
+ void UnSubscribeChildrenChange ( string path , NodeChildrenChangeHandler listener ) ;
60
+ }
61
+
62
+ public static class ZookeeperClientExtensions
63
+ {
64
+ public static Task CreateEphemeralAsync ( this IZookeeperClient client , string path , byte [ ] data )
65
+ {
66
+ return client . CreateEphemeralAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE ) ;
67
+ }
68
+
69
+ public static Task CreateEphemeralAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls )
70
+ {
71
+ return client . CreateAsync ( path , data , acls , CreateMode . EPHEMERAL ) ;
72
+ }
73
+
74
+ public static Task CreatePersistentAsync ( this IZookeeperClient client , string path , byte [ ] data )
75
+ {
76
+ return client . CreatePersistentAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE ) ;
77
+ }
78
+
79
+ public static Task CreatePersistentAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls )
80
+ {
81
+ return client . CreateAsync ( path , data , acls , CreateMode . PERSISTENT ) ;
82
+ }
83
+
84
+ public static async Task < bool > DeleteRecursiveAsync ( this IZookeeperClient client , string path )
85
+ {
86
+ IEnumerable < string > children ;
87
+ try
88
+ {
89
+ children = await client . GetChildrenAsync ( path ) ;
90
+ }
91
+ catch ( KeeperException . NoNodeException )
92
+ {
93
+ return true ;
94
+ }
95
+
96
+ foreach ( string subPath in children )
97
+ {
98
+ if ( ! await client . DeleteRecursiveAsync ( path + "/" + subPath ) )
99
+ {
100
+ return false ;
101
+ }
102
+ }
103
+ await client . DeleteAsync ( path ) ;
104
+ return true ;
105
+ }
106
+
107
+ public static Task CreateRecursiveAsync ( this IZookeeperClient client , string path , byte [ ] data , CreateMode createMode )
108
+ {
109
+ return client . CreateRecursiveAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE , createMode ) ;
110
+ }
111
+
112
+ public static async Task CreateRecursiveAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls , CreateMode createMode )
113
+ {
114
+ try
115
+ {
116
+ await client . CreateAsync ( path , data , acls , createMode ) ;
117
+ }
118
+ catch ( KeeperException . NodeExistsException )
119
+ {
120
+ }
121
+ catch ( KeeperException . NoNodeException )
122
+ {
123
+ var parentDir = path . Substring ( 0 , path . LastIndexOf ( '/' ) ) ;
124
+ await CreateRecursiveAsync ( client , parentDir , null , acls , CreateMode . PERSISTENT ) ;
125
+ await client . CreateAsync ( path , data , acls , createMode ) ;
126
+ }
127
+ }
128
+
129
+ /// <summary>
130
+ /// 等待直到zk连接成功,超时时间为zk选项中的操作超时时间配置值。
131
+ /// </summary>
132
+ /// <param name="client">zk客户端。</param>
133
+ public static void WaitForRetry ( this IZookeeperClient client )
134
+ {
135
+ client . WaitUntilConnected ( client . Options . OperatingTimeout ) ;
136
+ }
137
+
138
+ /// <summary>
139
+ /// 等待直到zk连接成功。
140
+ /// </summary>
141
+ /// <param name="client">zk客户端。</param>
142
+ /// <param name="timeout">最长等待时间。</param>
143
+ /// <returns>如果成功则返回true,否则返回false。</returns>
144
+ public static bool WaitUntilConnected ( this IZookeeperClient client , TimeSpan timeout )
145
+ {
146
+ return client . WaitForKeeperState ( Watcher . Event . KeeperState . SyncConnected , timeout ) ;
147
+ }
148
+ }
149
+ }
0 commit comments