-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlock-example.groovy
More file actions
106 lines (78 loc) · 3.07 KB
/
lock-example.groovy
File metadata and controls
106 lines (78 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
@Grab('org.apache.zookeeper:zookeeper:3.5.4-beta')
import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher
import org.apache.zookeeper.ZooKeeper
import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL
import static org.apache.zookeeper.CreateMode.PERSISTENT
import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE
ResourceLockingClient client = new ResourceLockingClient('localhost:2181', '/_lockparent_')
client.lock();
println 'Resource is locked'
println 'Press enter to unlock resource'
System.in.newReader().readLine()
client.unlock();
println 'Resource unlocked'
/**
* Client class which is used to lock or unlock a resource
*/
class ResourceLockingClient {
private final String name;
private final ZooKeeper zk
private String myLockNode;
ResourceLockingClient(String connectionString, String resourcename) {
def watcher = new WaitingWatcher();
zk = new ZooKeeper(connectionString, 1000, watcher);
watcher.waitForWatchEvent();
if (!zk.exists(resourcename, null)) {
zk.create(resourcename, new byte[0], OPEN_ACL_UNSAFE, PERSISTENT);
}
this.name = resourcename;
}
void lock() {
def res = zk.create("$name/lock-", new byte[0], OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL);
println("\tCreated znode '$res'")
myLockNode = res.split('/').last();
while (true) {
List<String> children = zk.getChildren(name, false);
//sort, so that all nodes are sorted with increasing sequential numbers
children.sort()
//check if client has current lock
if (children.get(0).equals(myLockNode)) {
println("\tLock received! (with znode '$name/$myLockNode')")
return;
}
println("\tResource is already locked. Wait until its released!");
//find predecessor to watch its node
int ourIndex = children.findIndexOf { it.equals(myLockNode) }
String predecessor = children.get(ourIndex - 1)
println("\tPredecessor znode identified: '$name/$predecessor'")
def watcher = new WaitingWatcher();
boolean predecessorExists = zk.exists("$name/$predecessor", watcher)
if (predecessorExists) {
println("\tWatcher on predecessor znode '$name/$predecessor' created.")
watcher.waitForWatchEvent();
println("\tWatch event for predecessor znode '$name/$predecessor' received.")
}
}
}
void unlock() {
zk.delete("$name/$myLockNode", -1)
println("\tznode '$name/$myLockNode' deleted.")
myLockNode = null;
}
}
/**
* Pseudo watcher which helps to make asynchronous calls synchronous. (Blocks until watch event has been received)
*/
class WaitingWatcher implements Watcher {
private boolean received = false;
@Override
void process(WatchedEvent watchedEvent) {
received = true;
}
void waitForWatchEvent() {
while (!received) {
sleep(50);
}
}
}