Skip to content

Commit 96ccf2e

Browse files
authored
Merge pull request #11 from nzlosh/master
Implementation basic locking using consul keystore/session
2 parents ec19a17 + 010db1f commit 96ccf2e

File tree

8 files changed

+237
-4
lines changed

8 files changed

+237
-4
lines changed

CHANGES.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Change Log
22

3+
## [0.6.3] 25 Jan 2018
4+
5+
### Added
6+
- Implementation for basic locking using consul key store/session.
7+
8+
### Changed
9+
- Make acl_list return a list instead of a string.
10+
311
## [0.6.2] 8 Jan 2018
412

513
### Added

actions/custom_lock.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from lib import action
2+
3+
4+
class ConsulCustomLockAction(action.ConsulBaseAction):
5+
"""
6+
:key_prefix: (string) Prefix the lock name which is the path in consul's kvstore.
7+
:max_locks: (integer) Maximum number of concurrent lock holders.
8+
:name: (string) Name of the key that will hold the locking information.
9+
"""
10+
def run(self, key_prefix, max_locks=1, acquire_timeout=10, name="CustomLock",
11+
node=None, checks=None, behavior="release", ttl=None, token=None, dc=None):
12+
self.lock_manager = action.LockManager(self.consul, key_prefix, name, node, token, dc)
13+
return self.lock_manager.lock(max_locks, acquire_timeout, checks, behavior, ttl)

actions/custom_lock.yaml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
name: custom_lock
2+
runner_type: python-script
3+
description: "Create a lock or semaphore. Used for synchronising discret workflows."
4+
enabled: true
5+
entry_point: "custom_lock.py"
6+
parameters:
7+
key_prefix:
8+
type: string
9+
description: KV path to prefix to lock name.
10+
required: true
11+
max_locks:
12+
type: integer
13+
description: Maximum number of concurrent holders for the lock.
14+
default: 1
15+
required: false
16+
acquire_timeout:
17+
type: integer
18+
description: Number of seconds to wait for the lock to be acquired.
19+
default: 10
20+
required: false
21+
name:
22+
type: string
23+
description: The name of the lock.
24+
required: false
25+
node:
26+
type: string
27+
description: The node that will establish and maintain the lock.
28+
required: false
29+
checks:
30+
type: array
31+
description: A list of the health checks used to determine if the lock should be invalidated.
32+
required: false
33+
behavior:
34+
type: string
35+
enum:
36+
- release
37+
- delete
38+
description: The behavior to use when the lock expires (release or delete).
39+
default: release
40+
required: false
41+
ttl:
42+
type: integer
43+
description: Time to Live before the lock expires (between 10 and 86400 seconds).
44+
required: false
45+
token:
46+
type: string
47+
description: An optional ACL token to apply to this request.
48+
required: false
49+
dc:
50+
type: string
51+
description: Optional datacenter that you wish to communicate with.
52+
required: false

actions/custom_unlock.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from lib import action
2+
3+
4+
class ConsulCustomUnLockAction(action.ConsulBaseAction):
5+
"""
6+
:session_id: (string) the session id that holds the lock to a key.
7+
:key_prefix: (string) Prefix the lock name which is the path in consul's kvstore.
8+
:name: (string) Name of the key that will hold the locking information.
9+
"""
10+
def run(self, session_id, key_prefix, name, node=None, token=None, dc=None):
11+
self.lock_manager = action.LockManager(self.consul, key_prefix, name, node, token, dc)
12+
return self.lock_manager.unlock(session_id)

actions/custom_unlock.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
name: custom_unlock
2+
runner_type: python-script
3+
description: "Release a lock or semaphore. Used for synchronising discret workflows."
4+
enabled: true
5+
entry_point: "custom_unlock.py"
6+
parameters:
7+
session_id:
8+
type: string
9+
description: The session id that currently holds the lock on the key.
10+
required: true
11+
key_prefix:
12+
type: string
13+
description: KV path to prefix to lock name.
14+
required: true
15+
name:
16+
type: string
17+
description: The name of the lock.
18+
required: true
19+
node:
20+
type: string
21+
description: The node that will establish and maintain the lock.
22+
required: false
23+
token:
24+
type: string
25+
description: An optional ACL token to apply to this request.
26+
required: false
27+
dc:
28+
type: string
29+
description: Optional datacenter that you wish to communicate with.
30+
required: false

actions/kv_put.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def run(self,
1616
if to_json:
1717
value = self.to_json(value)
1818

19-
return (True, self.consul.kv.put(
19+
return self.consul.kv.put(
2020
key,
2121
value,
2222
cas=cas,
@@ -25,4 +25,4 @@ def run(self,
2525
release=release,
2626
token=token,
2727
dc=dc
28-
))
28+
)

actions/lib/action.py

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1+
import six
12
import json
3+
import random
4+
import time
5+
26
from st2common import log as logging
37
from st2common.runners.base_action import Action
48

59
# http://python-consul.readthedocs.org/en/latest/#
610
import consul
711

12+
LOG = logging.getLogger(__name__)
13+
814

915
class ConsulBaseAction(Action):
1016

1117
def __init__(self, config):
1218
super(ConsulBaseAction, self).__init__(config)
1319
self.consul = self._get_client()
14-
self.logger = logging.getLogger(__name__)
1520

1621
def _get_client(self):
1722
dc = self.config.get('dc')
@@ -41,3 +46,116 @@ def from_json(self, value):
4146
except ValueError:
4247
pass
4348
return value
49+
50+
51+
class LockManager(object):
52+
semaphore = {
53+
"Limit": 0,
54+
"Holders": {}
55+
}
56+
57+
def __init__(self, client, key_prefix, name, node, token, dc):
58+
"""
59+
Initialise object with common variables used to create or destroy a lock.
60+
"""
61+
self.client = client
62+
self.key_prefix = key_prefix
63+
self.name = name
64+
self.node = node
65+
self.token = token
66+
self.dc = dc
67+
68+
def lock(self, max_locks, acquire_timeout, checks, behavior, ttl):
69+
"""
70+
Method called by the Lock action.
71+
"""
72+
self.max_locks = max_locks
73+
self.wait_timeout = acquire_timeout
74+
self.checks = checks
75+
self.behavior = behavior
76+
self.ttl = ttl
77+
78+
if self.max_locks > 1:
79+
result = self.create_semaphore()
80+
else:
81+
result = self.create_lock("/".join([self.key_prefix, self.name, '.lock']))
82+
return result
83+
84+
def unlock(self, session_id):
85+
"""
86+
Method called by the Unlock action.
87+
"""
88+
key = "/".join([self.key_prefix, self.name, '.lock'])
89+
return self.release_lock(key, session_id)
90+
91+
def create_semaphore(self):
92+
raise NotImplementedError
93+
94+
def release_lock(self, key, session_id):
95+
"""
96+
Release the lock on the key and destroy the session.
97+
"""
98+
result = (False, "Failed to release lock.")
99+
if self.client.kv.put(
100+
key=key,
101+
value=None,
102+
release=session_id,
103+
token=self.token,
104+
dc=self.dc
105+
) is True:
106+
if self.destroy_session(session_id) is True:
107+
result = (True, "Lock released and session destroyed.")
108+
else:
109+
result = (True, "Lock released but session could not be destroyed.")
110+
return result
111+
112+
def destroy_session(self, session_id):
113+
self.client.session.destroy(session_id, self.dc)
114+
115+
def create_lock(self, key_name):
116+
result = (False, "")
117+
session_id = self.create_session()
118+
if isinstance(session_id, six.string_types) and len(session_id) > 0:
119+
if self.acquire_lock(session_id, key_name, value=session_id):
120+
result = (True, session_id)
121+
else:
122+
result = (False, "Failed to acquire lock on key.")
123+
self.destroy_session(session_id)
124+
else:
125+
result = (False, "Failed to create a session")
126+
return result
127+
128+
def create_session(self):
129+
"""
130+
https://www.consul.io/docs/internals/sessions.html
131+
"""
132+
return self.client.session.create(
133+
name=self.name,
134+
node=self.node,
135+
checks=self.checks,
136+
lock_delay=5,
137+
behavior=self.behavior,
138+
ttl=self.ttl,
139+
dc=self.dc
140+
)
141+
142+
def acquire_lock(self, session_id, key, cas=None, value=""):
143+
"""
144+
wait_timeout is used to determine when to abandon the lock acquisition.
145+
"""
146+
result = False
147+
timeout = time.time() + self.wait_timeout
148+
LOG.info("Acquire lock timeout: {}".format(timeout))
149+
while time.time() < timeout:
150+
result = self.client.kv.put(
151+
key=key,
152+
value=value,
153+
cas=cas,
154+
acquire=session_id,
155+
token=self.token,
156+
dc=self.dc
157+
)
158+
if result is True:
159+
break
160+
time.sleep(random.random())
161+
return result

pack.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
ref: consul
33
name: consul
44
description: consul
5-
version: 0.6.2
5+
version: 0.6.3
66
author: jfryman
77

0 commit comments

Comments
 (0)