Skip to content

Commit 6574755

Browse files
committed
支持短暂类型节点的恢复。
1 parent bae01cf commit 6574755

File tree

2 files changed

+158
-25
lines changed

2 files changed

+158
-25
lines changed

src/Rabbit.Zookeeper/Implementation/NodeEntry.cs

Lines changed: 150 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ internal class NodeEntry
2323
/// </summary>
2424
private NodeChildrenChangeHandler _childrenChangeHandler;
2525

26+
/// <summary>
27+
/// 节点的快照。
28+
/// </summary>
29+
private NodeSnapshot _localSnapshot = default(NodeSnapshot);
30+
2631
#endregion Field
2732

2833
#region Property
@@ -48,6 +53,8 @@ public async Task<IEnumerable<byte>> GetDataAsync(bool watch = false)
4853
var zookeeper = _client.ZooKeeper;
4954
var data = await zookeeper.getDataAsync(Path, watch);
5055

56+
_localSnapshot.SetData(data?.Data);
57+
5158
return data?.Data;
5259
}
5360

@@ -56,6 +63,8 @@ public async Task<IEnumerable<string>> GetChildrenAsync(bool watch = false)
5663
var zookeeper = _client.ZooKeeper;
5764
var data = await zookeeper.getChildrenAsync(Path, watch);
5865

66+
_localSnapshot.SetChildrens(data?.Children);
67+
5968
return data?.Children;
6069
}
6170

@@ -64,25 +73,39 @@ public async Task<bool> ExistsAsync(bool watch = false)
6473
var zookeeper = _client.ZooKeeper;
6574
var data = await zookeeper.existsAsync(Path, watch);
6675

67-
return data != null;
76+
var exists = data != null;
77+
78+
_localSnapshot.SetExists(exists);
79+
80+
return exists;
6881
}
6982

7083
public async Task<string> CreateAsync(byte[] data, List<ACL> acls, CreateMode createMode)
7184
{
7285
var zooKeeper = _client.ZooKeeper;
73-
return await zooKeeper.createAsync(Path, data, acls, createMode);
86+
var path = await zooKeeper.createAsync(Path, data, acls, createMode);
87+
88+
_localSnapshot.Create(createMode, data, acls);
89+
90+
return path;
7491
}
7592

7693
public Task<Stat> SetDataAsync(byte[] data, int version = -1)
7794
{
7895
var zooKeeper = _client.ZooKeeper;
79-
return zooKeeper.setDataAsync(Path, data, version);
96+
var stat = zooKeeper.setDataAsync(Path, data, version);
97+
98+
_localSnapshot.Update(data, version);
99+
100+
return stat;
80101
}
81102

82-
public Task DeleteAsync(int version = -1)
103+
public async Task DeleteAsync(int version = -1)
83104
{
84105
var zookeeper = _client.ZooKeeper;
85-
return zookeeper.deleteAsync(Path, version);
106+
await zookeeper.deleteAsync(Path, version);
107+
108+
_localSnapshot.Delete();
86109
}
87110

88111
#region Listener
@@ -175,6 +198,26 @@ internal async Task OnChange(WatchedEvent watchedEvent, bool isFirstConnection)
175198
/// </summary>
176199
private bool HasChildrenChangeHandler => HasHandler(_childrenChangeHandler);
177200

201+
/// <summary>
202+
/// 状态变更处理。
203+
/// </summary>
204+
/// <param name="watchedEvent"></param>
205+
/// <param name="isFirstConnection">是否是zk第一次连接上服务器。</param>
206+
private async Task OnStatusChangeHandle(WatchedEvent watchedEvent, bool isFirstConnection)
207+
{
208+
//第一次连接zk不进行通知
209+
if (isFirstConnection)
210+
return;
211+
212+
//尝试恢复节点
213+
await RestoreEphemeral();
214+
215+
if (HasDataChangeHandler)
216+
await OnDataChangeHandle(watchedEvent);
217+
if (HasChildrenChangeHandler)
218+
await OnChildrenChangeHandle(watchedEvent);
219+
}
220+
178221
private async Task OnDataChangeHandle(WatchedEvent watchedEvent)
179222
{
180223
if (!HasDataChangeHandler)
@@ -220,23 +263,6 @@ private async Task OnDataChangeHandle(WatchedEvent watchedEvent)
220263
await WatchDataChange();
221264
}
222265

223-
/// <summary>
224-
/// 状态变更处理。
225-
/// </summary>
226-
/// <param name="watchedEvent"></param>
227-
/// <param name="isFirstConnection">是否是zk第一次连接上服务器。</param>
228-
private async Task OnStatusChangeHandle(WatchedEvent watchedEvent, bool isFirstConnection)
229-
{
230-
//第一次连接zk不进行通知
231-
if (isFirstConnection)
232-
return;
233-
234-
if (HasDataChangeHandler)
235-
await OnDataChangeHandle(watchedEvent);
236-
if (HasChildrenChangeHandler)
237-
await OnChildrenChangeHandle(watchedEvent);
238-
}
239-
240266
private async Task OnChildrenChangeHandle(WatchedEvent watchedEvent)
241267
{
242268
if (!HasChildrenChangeHandler)
@@ -261,16 +287,18 @@ private async Task OnChildrenChangeHandle(WatchedEvent watchedEvent)
261287
switch (watchedEvent.get_Type())
262288
{
263289
case Watcher.Event.EventType.NodeCreated:
264-
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeCreated, await getCurrentChildrens());
290+
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeCreated,
291+
await getCurrentChildrens());
265292
break;
266293

267294
case Watcher.Event.EventType.NodeDeleted:
268295
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeDeleted, null);
269296
break;
270297

271298
case Watcher.Event.EventType.NodeChildrenChanged:
272-
case Watcher.Event.EventType.None://重连时触发
273-
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeChildrenChanged, await getCurrentChildrens());
299+
case Watcher.Event.EventType.None: //重连时触发
300+
args = new NodeChildrenChangeArgs(Path, Watcher.Event.EventType.NodeChildrenChanged,
301+
await getCurrentChildrens());
274302
break;
275303

276304
default:
@@ -309,6 +337,103 @@ private static bool HasHandler(MulticastDelegate multicast)
309337
return multicast != null && multicast.GetInvocationList().Any();
310338
}
311339

340+
private async Task RestoreEphemeral()
341+
{
342+
//没有开启恢复
343+
if (!_client.Options.EnableEphemeralNodeRestore)
344+
return;
345+
346+
//节点不存在
347+
if (!_localSnapshot.IsExist)
348+
return;
349+
350+
//不是短暂的节点
351+
if (_localSnapshot.Mode != CreateMode.EPHEMERAL && _localSnapshot.Mode != CreateMode.EPHEMERAL_SEQUENTIAL)
352+
return;
353+
354+
try
355+
{
356+
await _client.RetryUntilConnected(async () =>
357+
{
358+
try
359+
{
360+
return await CreateAsync(_localSnapshot.Data?.ToArray(), _localSnapshot.Acls, _localSnapshot.Mode);
361+
}
362+
catch (KeeperException.NodeExistsException) //节点已经存在则忽略
363+
{
364+
return Path;
365+
}
366+
});
367+
}
368+
catch (Exception exception)
369+
{
370+
Console.WriteLine($"恢复节点失败,异常:{exception.Message}");
371+
}
372+
}
373+
312374
#endregion Private Method
375+
376+
#region Help Type
377+
378+
public struct NodeSnapshot
379+
{
380+
public bool IsExist { get; set; }
381+
public CreateMode Mode { get; set; }
382+
public IEnumerable<byte> Data { get; set; }
383+
public int? Version { get; set; }
384+
public List<ACL> Acls { get; set; }
385+
public IEnumerable<string> Childrens { get; set; }
386+
387+
public void Create(CreateMode mode, byte[] data, List<ACL> acls)
388+
{
389+
IsExist = true;
390+
Mode = mode;
391+
Data = data;
392+
Version = -1;
393+
Acls = acls;
394+
Childrens = null;
395+
}
396+
397+
public void Update(IEnumerable<byte> data, int version)
398+
{
399+
IsExist = true;
400+
Data = data;
401+
Version = version;
402+
}
403+
404+
public void Delete()
405+
{
406+
IsExist = false;
407+
Mode = null;
408+
Data = null;
409+
Version = null;
410+
Acls = null;
411+
Childrens = null;
412+
}
413+
414+
public void SetData(IEnumerable<byte> data)
415+
{
416+
IsExist = true;
417+
Data = data;
418+
}
419+
420+
public void SetChildrens(IEnumerable<string> childrens)
421+
{
422+
IsExist = true;
423+
Childrens = childrens;
424+
}
425+
426+
public void SetExists(bool exists)
427+
{
428+
if (!exists)
429+
{
430+
Delete();
431+
return;
432+
}
433+
IsExist = true;
434+
}
435+
}
436+
437+
#endregion Help Type
313438
}
314439
}

src/Rabbit.Zookeeper/ZookeeperClientOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public class ZookeeperClientOptions
1818
/// <see cref="SessionId"/> 为0。
1919
/// <see cref="SessionPasswd"/> 为null。
2020
/// <see cref="BasePath"/> 为null。
21+
/// <see cref="EnableEphemeralNodeRestore"/> 为true。
2122
/// </remarks>
2223
public ZookeeperClientOptions()
2324
{
@@ -27,6 +28,7 @@ public ZookeeperClientOptions()
2728
ReadOnly = false;
2829
SessionId = 0;
2930
SessionPasswd = null;
31+
EnableEphemeralNodeRestore = true;
3032
}
3133

3234
/// <summary>
@@ -42,6 +44,7 @@ public ZookeeperClientOptions()
4244
/// <see cref="SessionId"/> 为0。
4345
/// <see cref="SessionPasswd"/> 为null。
4446
/// <see cref="BasePath"/> 为null。
47+
/// <see cref="EnableEphemeralNodeRestore"/> 为true。
4548
/// </remarks>
4649
public ZookeeperClientOptions(string connectionString) : this()
4750
{
@@ -90,5 +93,10 @@ public ZookeeperClientOptions(string connectionString) : this()
9093
/// 基础路径,会在所有的zk操作节点路径上加入此基础路径。
9194
/// </summary>
9295
public string BasePath { get; set; }
96+
97+
/// <summary>
98+
/// 是否启用短暂类型节点的恢复。
99+
/// </summary>
100+
public bool EnableEphemeralNodeRestore { get; set; }
93101
}
94102
}

0 commit comments

Comments
 (0)