6
6
7
7
namespace Rabbit . Zookeeper
8
8
{
9
+ /// <summary>
10
+ /// 一个抽象的ZooKeeper客户端。
11
+ /// </summary>
9
12
public interface IZookeeperClient : IDisposable
10
13
{
11
14
/// <summary>
12
- /// 具体的zookeeper连接 。
15
+ /// 具体的ZooKeeper连接 。
13
16
/// </summary>
14
17
ZooKeeper ZooKeeper { get ; }
15
18
@@ -34,53 +37,174 @@ public interface IZookeeperClient : IDisposable
34
37
/// <returns>执行结果。</returns>
35
38
Task < T > RetryUntilConnected < T > ( Func < Task < T > > callable ) ;
36
39
40
+ /// <summary>
41
+ /// 获取指定节点的数据。
42
+ /// </summary>
43
+ /// <param name="path">节点路径。</param>
44
+ /// <returns>节点数据。</returns>
37
45
Task < IEnumerable < byte > > GetDataAsync ( string path ) ;
38
46
47
+ /// <summary>
48
+ /// 获取指定节点下的所有子节点。
49
+ /// </summary>
50
+ /// <param name="path">节点路径。</param>
51
+ /// <returns>子节点集合。</returns>
39
52
Task < IEnumerable < string > > GetChildrenAsync ( string path ) ;
40
53
54
+ /// <summary>
55
+ /// 判断节点是否存在。
56
+ /// </summary>
57
+ /// <param name="path">节点路径。</param>
58
+ /// <returns>如果存在则返回true,否则返回false。</returns>
41
59
Task < bool > ExistsAsync ( string path ) ;
42
60
43
- Task CreateAsync ( string path , byte [ ] data , List < ACL > acls , CreateMode createMode ) ;
61
+ /// <summary>
62
+ /// 创建节点。
63
+ /// </summary>
64
+ /// <param name="path">节点路径。</param>
65
+ /// <param name="data">节点数据。</param>
66
+ /// <param name="acls">权限。</param>
67
+ /// <param name="createMode">创建模式。</param>
68
+ /// <returns>节点路径。</returns>
69
+ /// <remarks>
70
+ /// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
71
+ /// </remarks>
72
+ Task < string > CreateAsync ( string path , byte [ ] data , List < ACL > acls , CreateMode createMode ) ;
44
73
74
+ /// <summary>
75
+ /// 设置节点数据。
76
+ /// </summary>
77
+ /// <param name="path">节点路径。</param>
78
+ /// <param name="data">节点数据。</param>
79
+ /// <param name="version">版本号。</param>
80
+ /// <returns>节点状态。</returns>
45
81
Task < Stat > SetDataAsync ( string path , byte [ ] data , int version = - 1 ) ;
46
82
83
+ /// <summary>
84
+ /// 删除节点。
85
+ /// </summary>
86
+ /// <param name="path">节点路径。</param>
87
+ /// <param name="version">版本号。</param>
47
88
Task DeleteAsync ( string path , int version = - 1 ) ;
48
89
90
+ /// <summary>
91
+ /// 订阅节点数据变更。
92
+ /// </summary>
93
+ /// <param name="path">节点路径。</param>
94
+ /// <param name="listener">监听者。</param>
49
95
Task SubscribeDataChange ( string path , NodeDataChangeHandler listener ) ;
50
96
97
+ /// <summary>
98
+ /// 取消订阅节点数据变更。
99
+ /// </summary>
100
+ /// <param name="path">节点路径。</param>
101
+ /// <param name="listener">监听者。</param>
51
102
void UnSubscribeDataChange ( string path , NodeDataChangeHandler listener ) ;
52
103
104
+ /// <summary>
105
+ /// 订阅连接状态变更。
106
+ /// </summary>
107
+ /// <param name="listener">监听者。</param>
53
108
void SubscribeStatusChange ( ConnectionStateChangeHandler listener ) ;
54
109
110
+ /// <summary>
111
+ /// 取消订阅连接状态变更。
112
+ /// </summary>
113
+ /// <param name="listener">监听者。</param>
55
114
void UnSubscribeStatusChange ( ConnectionStateChangeHandler listener ) ;
56
115
116
+ /// <summary>
117
+ /// 订阅节点子节点变更。
118
+ /// </summary>
119
+ /// <param name="path">节点路径。</param>
120
+ /// <param name="listener">监听者。</param>
57
121
Task < IEnumerable < string > > SubscribeChildrenChange ( string path , NodeChildrenChangeHandler listener ) ;
58
122
123
+ /// <summary>
124
+ /// 取消订阅节点子节点变更。
125
+ /// </summary>
126
+ /// <param name="path">节点路径。</param>
127
+ /// <param name="listener">监听者。</param>
59
128
void UnSubscribeChildrenChange ( string path , NodeChildrenChangeHandler listener ) ;
60
129
}
61
130
131
+ /// <summary>
132
+ /// ZooKeeper客户端扩展方法。
133
+ /// </summary>
62
134
public static class ZookeeperClientExtensions
63
135
{
64
- public static Task CreateEphemeralAsync ( this IZookeeperClient client , string path , byte [ ] data )
136
+ /// <summary>
137
+ /// 创建短暂的节点。
138
+ /// </summary>
139
+ /// <param name="client">ZooKeeper客户端。</param>
140
+ /// <param name="path">节点路径。</param>
141
+ /// <param name="data">节点数据。</param>
142
+ /// <param name="isSequential">是否按顺序创建。</param>
143
+ /// <returns>节点路径。</returns>
144
+ /// <remarks>
145
+ /// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
146
+ /// </remarks>
147
+ public static Task < string > CreateEphemeralAsync ( this IZookeeperClient client , string path , byte [ ] data , bool isSequential = false )
65
148
{
66
- return client . CreateEphemeralAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE ) ;
149
+ return client . CreateEphemeralAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE , isSequential ) ;
67
150
}
68
151
69
- public static Task CreateEphemeralAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls )
152
+ /// <summary>
153
+ /// 创建短暂的节点。
154
+ /// </summary>
155
+ /// <param name="client">ZooKeeper客户端。</param>
156
+ /// <param name="path">节点路径。</param>
157
+ /// <param name="data">节点数据。</param>
158
+ /// <param name="acls">权限。</param>
159
+ /// <param name="isSequential">是否按顺序创建。</param>
160
+ /// <returns>节点路径。</returns>
161
+ /// <remarks>
162
+ /// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
163
+ /// </remarks>
164
+ public static Task < string > CreateEphemeralAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls , bool isSequential = false )
70
165
{
71
- return client . CreateAsync ( path , data , acls , CreateMode . EPHEMERAL ) ;
166
+ return client . CreateAsync ( path , data , acls , isSequential ? CreateMode . EPHEMERAL_SEQUENTIAL : CreateMode . EPHEMERAL ) ;
72
167
}
73
168
74
- public static Task CreatePersistentAsync ( this IZookeeperClient client , string path , byte [ ] data )
169
+ /// <summary>
170
+ /// 创建节点。
171
+ /// </summary>
172
+ /// <param name="client">ZooKeeper客户端。</param>
173
+ /// <param name="path">节点路径。</param>
174
+ /// <param name="data">节点数据。</param>
175
+ /// <param name="isSequential">是否按顺序创建。</param>
176
+ /// <returns>节点路径。</returns>
177
+ /// <remarks>
178
+ /// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
179
+ /// </remarks>
180
+ public static Task < string > CreatePersistentAsync ( this IZookeeperClient client , string path , byte [ ] data , bool isSequential = false )
75
181
{
76
- return client . CreatePersistentAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE ) ;
182
+ return client . CreatePersistentAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE , isSequential ) ;
77
183
}
78
184
79
- public static Task CreatePersistentAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls )
185
+ /// <summary>
186
+ /// 创建节点。
187
+ /// </summary>
188
+ /// <param name="client">ZooKeeper客户端。</param>
189
+ /// <param name="path">节点路径。</param>
190
+ /// <param name="data">节点数据。</param>
191
+ /// <param name="acls">权限。</param>
192
+ /// <param name="isSequential">是否按顺序创建。</param>
193
+ /// <returns>节点路径。</returns>
194
+ /// <remarks>
195
+ /// 因为使用序列方式创建节点zk会修改节点name,所以需要返回真正的节点路径。
196
+ /// </remarks>
197
+ public static Task < string > CreatePersistentAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls , bool isSequential = false )
80
198
{
81
- return client . CreateAsync ( path , data , acls , CreateMode . PERSISTENT ) ;
199
+ return client . CreateAsync ( path , data , acls , isSequential ? CreateMode . PERSISTENT_SEQUENTIAL : CreateMode . PERSISTENT ) ;
82
200
}
83
201
202
+ /// <summary>
203
+ /// 递归删除该节点下的所有子节点和该节点本身。
204
+ /// </summary>
205
+ /// <param name="client">ZooKeeper客户端。</param>
206
+ /// <param name="path">节点路径。</param>
207
+ /// <returns>如果成功则返回true,false。</returns>
84
208
public static async Task < bool > DeleteRecursiveAsync ( this IZookeeperClient client , string path )
85
209
{
86
210
IEnumerable < string > children ;
@@ -93,7 +217,7 @@ public static async Task<bool> DeleteRecursiveAsync(this IZookeeperClient client
93
217
return true ;
94
218
}
95
219
96
- foreach ( string subPath in children )
220
+ foreach ( var subPath in children )
97
221
{
98
222
if ( ! await client . DeleteRecursiveAsync ( path + "/" + subPath ) )
99
223
{
@@ -104,25 +228,52 @@ public static async Task<bool> DeleteRecursiveAsync(this IZookeeperClient client
104
228
return true ;
105
229
}
106
230
107
- public static Task CreateRecursiveAsync ( this IZookeeperClient client , string path , byte [ ] data , CreateMode createMode )
231
+ /// <summary>
232
+ /// 递归创建该节点下的所有子节点和该节点本身。
233
+ /// </summary>
234
+ /// <param name="client">ZooKeeper客户端。</param>
235
+ /// <param name="path">节点路径。</param>
236
+ /// <param name="data">节点数据。</param>
237
+ public static Task CreateRecursiveAsync ( this IZookeeperClient client , string path , byte [ ] data )
238
+ {
239
+ return client . CreateRecursiveAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE ) ;
240
+ }
241
+
242
+ /// <summary>
243
+ /// 递归创建该节点下的所有子节点和该节点本身。
244
+ /// </summary>
245
+ /// <param name="client">ZooKeeper客户端。</param>
246
+ /// <param name="path">节点路径。</param>
247
+ /// <param name="data">节点数据。</param>
248
+ /// <param name="acls">权限。</param>
249
+ public static Task CreateRecursiveAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls )
108
250
{
109
- return client . CreateRecursiveAsync ( path , data , ZooDefs . Ids . OPEN_ACL_UNSAFE , createMode ) ;
251
+ return client . CreateRecursiveAsync ( path , p => data , p => acls ) ;
110
252
}
111
253
112
- public static async Task CreateRecursiveAsync ( this IZookeeperClient client , string path , byte [ ] data , List < ACL > acls , CreateMode createMode )
254
+ /// <summary>
255
+ /// 递归创建该节点下的所有子节点和该节点本身。
256
+ /// </summary>
257
+ /// <param name="client">ZooKeeper客户端。</param>
258
+ /// <param name="path">节点路径。</param>
259
+ /// <param name="getNodeData">获取当前被创建节点数据的委托。</param>
260
+ /// <param name="getNodeAcls">获取当前被创建节点权限的委托。</param>
261
+ public static async Task CreateRecursiveAsync ( this IZookeeperClient client , string path , Func < string , byte [ ] > getNodeData , Func < string , List < ACL > > getNodeAcls )
113
262
{
263
+ var data = getNodeData ( path ) ;
264
+ var acls = getNodeAcls ( path ) ;
114
265
try
115
266
{
116
- await client . CreateAsync ( path , data , acls , createMode ) ;
267
+ await client . CreateAsync ( path , data , acls , CreateMode . PERSISTENT ) ;
117
268
}
118
269
catch ( KeeperException . NodeExistsException )
119
270
{
120
271
}
121
272
catch ( KeeperException . NoNodeException )
122
273
{
123
274
var parentDir = path . Substring ( 0 , path . LastIndexOf ( '/' ) ) ;
124
- await CreateRecursiveAsync ( client , parentDir , null , acls , CreateMode . PERSISTENT ) ;
125
- await client . CreateAsync ( path , data , acls , createMode ) ;
275
+ await CreateRecursiveAsync ( client , parentDir , getNodeData , getNodeAcls ) ;
276
+ await client . CreateAsync ( path , data , acls , CreateMode . PERSISTENT ) ;
126
277
}
127
278
}
128
279
0 commit comments