Skip to content

Commit d926fad

Browse files
committed
Merge branch 'release/v0.15'
2 parents ab527b5 + ea1b3a2 commit d926fad

File tree

6 files changed

+141
-21
lines changed

6 files changed

+141
-21
lines changed

docs/conf.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@
5555
# built documents.
5656
#
5757
# The short X.Y version.
58-
version = '0.14'
58+
version = '0.15'
5959
# The full version, including alpha/beta/rc tags.
60-
release = '0.14'
60+
release = '0.15'
6161

6262
# The language for content autogenerated by Sphinx. Refer to documentation
6363
# for a list of supported languages.

ers/api.py

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
"""
22
ers.api
33
4-
Provides class ERS implementing API to Entity Rgistry System.
4+
Provides class ERS implementing API to Entity Registry System.
55
66
"""
77
import re
88
import sys
9-
import uuid
109

1110
from hashlib import md5
1211
from socket import gethostname
12+
from collections import Counter
13+
from random import randrange
1314

1415
import store
16+
from timeout import TimeoutError
1517

1618
class ERSReadOnly(object):
1719
""" ERS version with read-only methods.
@@ -26,18 +28,30 @@ def __init__(self,
2628
local_only=False):
2729
self._local_only = local_only
2830
self.fixed_peers = [] if self._local_only else list(fixed_peers)
31+
self._timeout_count = Counter()
2932
self.store = store.LocalStore()
3033
self._init_host_urn()
3134

3235
def _init_host_urn(self):
33-
fingerprint = md5(gethostname()).hexdigest()
34-
self.host_urn = "urn:ers:host:{}".format(fingerprint)
36+
# Use uuid provided by CouchDB 1.3+, fallback to hostname fingerprint
37+
try:
38+
uid = self.store.info()['uuid']
39+
except KeyError:
40+
uid = md5(gethostname()).hexdigest()
41+
self.host_urn = "urn:ers:host:{}".format(uid)
3542

3643
def get_machine_uuid(self):
    '''
    @return the identifier of this ERS node: the part of self.host_urn
            that follows its last colon
    '''
    _, _, node_id = self.host_urn.rpartition(':')
    return node_id
48+
49+
def _is_failing(self, url):
50+
"""
51+
Returns True for url's which failed to respond with increasing probability.
52+
Returns False for url's which did not fail.
53+
"""
54+
return randrange(self._timeout_count[url] + 1) != 0
4155

4256
def get_entity(self, entity_name):
4357
'''
@@ -54,15 +68,23 @@ def get_entity(self, entity_name):
5468
entity.add_document(doc, source)
5569

5670
# Get documents out of public/cache of connected peers
71+
# TODO parallelize
5772
for peer in self.get_peers():
73+
url = peer['server_url']
74+
if self._is_failing(url):
75+
continue
76+
5877
remote_docs = []
5978
try:
60-
remote_store = store.RemoteStore(peer['server_url'])
61-
remote_docs.extend(remote_store.docs_by_entity(entity_name))
62-
except:
63-
sys.stderr.write("Warning: failed to query remote peer {0}\n".format(peer))
64-
continue
79+
remote_docs = store.query_remote(url, 'docs_by_entity', entity_name)
80+
except TimeoutError:
81+
self._timeout_count[url] += 1
82+
sys.stderr.write("Incremented timeout count for {0}: {1}\n".format(
83+
url, self._timeout_count[url]))
84+
except Exception as e:
85+
sys.stderr.write("Warning: failed to query remote peer {0}. Error: {1}\n".format(peer, e))
6586
else:
87+
self._timeout_count.pop(url, 0)
6688
for doc in remote_docs:
6789
entity.add_document(doc, 'remote')
6890
return entity
@@ -81,13 +103,21 @@ def search(self, prop, value=None):
81103

82104
# Search peers
83105
for peer in self.get_peers():
106+
url = peer['server_url']
107+
if self._is_failing(url):
108+
continue
109+
84110
remote_result = []
85111
try:
86-
remote_store = store.RemoteStore(peer['server_url'])
87-
remote_result = set(remote_store.by_property_value(prop, value))
88-
except:
89-
sys.stderr.write("Warning: failed to query remote peer {0}\n".format(peer))
112+
remote_result = store.query_remote(url, 'by_property_value', prop, value)
113+
except TimeoutError:
114+
self._timeout_count[url] += 1
115+
sys.stderr.write("Incremented timeout count for {0}: {1}\n".format(
116+
url, self._timeout_count[url]))
117+
except Exception as e:
118+
sys.stderr.write("Warning: failed to query remote peer {0}. Error: {1}\n".format(peer, e))
90119
else:
120+
self._timeout_count.pop(url, 0)
91121
result.update(remote_result)
92122

93123
return list(result)

ers/store.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727

2828
from functools import partial
2929
from itertools import chain
30+
from timeout import timeout
3031

31-
REMOTE_SERVER_TIMEOUT = 300
32+
REMOTE_SERVER_TIMEOUT = 0.3
3233

3334
DEFAULT_STORE_URI = 'http://127.0.0.1:5984'
3435
DEFAULT_AUTH = ['admin', 'admin']
@@ -218,6 +219,12 @@ def update_replicator_docs(self, repl_docs):
218219
RemoteStore = partial(Store, databases=REMOTE_DBS, timeout=REMOTE_SERVER_TIMEOUT)
219220

220221

222+
@timeout(REMOTE_SERVER_TIMEOUT)
def query_remote(uri, method_name, *args, **kwargs):
    """
    Run one query method of a remote store and return its results as a list.

    Looks up ``method_name`` on ``RemoteStore(uri)``, calls it with the
    remaining positional and keyword arguments, and materializes whatever
    it returns with ``list``. The whole call is wrapped by
    ``timeout(REMOTE_SERVER_TIMEOUT)``.
    """
    query = getattr(RemoteStore(uri), method_name)
    return list(query(*args, **kwargs))
227+
221228
def reset_local_store(auth=DEFAULT_AUTH):
222229
user, password = auth
223230
store = LocalStore(filters=[restkit.BasicAuth(user, password)])

ers/timeout.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# Source: http://code.activestate.com/recipes/577853-timeout-decorator-with-multiprocessing/
2+
3+
"""
4+
Code to timeout with processes.
5+
6+
>>> @timeout(.5)
7+
... def sleep(x):
8+
... print "ABOUT TO SLEEP {0} SECONDS".format(x)
9+
... time.sleep(x)
10+
... return x
11+
12+
>>> sleep(1)
13+
Traceback (most recent call last):
14+
...
15+
TimeoutError: timed out after 0 seconds
16+
17+
>>> sleep(.2)
18+
0.2
19+
20+
>>> @timeout(.5)
21+
... def exc():
22+
... raise Exception('Houston we have problems!')
23+
24+
>>> exc()
25+
Traceback (most recent call last):
26+
...
27+
Exception: Houston we have problems!
28+
29+
"""
30+
import multiprocessing
31+
import time
32+
import logging
33+
logger = multiprocessing.log_to_stderr()
34+
logger.setLevel(logging.ERROR)
35+
36+
37+
class TimeoutError(Exception):
    """Raised by ``timeout``-decorated callables that exceed their time budget."""


class RunableProcessing(multiprocessing.Process):
    """
    Child process that runs ``func`` and reports the outcome through a queue.

    The outcome is a single ``(success, payload)`` tuple: ``(True, result)``
    when ``func`` returned, ``(False, exception)`` when it raised.
    """

    def __init__(self, func, *args, **kwargs):
        # One slot is enough: run_func puts exactly one item.
        self.queue = multiprocessing.Queue(maxsize=1)
        args = (func,) + args
        multiprocessing.Process.__init__(self, target=self.run_func, args=args, kwargs=kwargs)

    def run_func(self, func, *args, **kwargs):
        """Call ``func`` and queue ``(True, result)`` or ``(False, exception)``."""
        try:
            result = func(*args, **kwargs)
            self.queue.put((True, result))
        except Exception as e:
            self.queue.put((False, e))

    def done(self):
        """Return whether the queue currently reports itself as full."""
        return self.queue.full()

    def result(self):
        """Block until the outcome tuple is available and return it."""
        return self.queue.get()


def timeout(seconds, force_kill=True):
    """
    Decorator factory: run the decorated callable in a child process and
    stop waiting for it after ``seconds``.

    seconds    -- how long to wait for the outcome (None waits indefinitely)
    force_kill -- terminate the child process when the wait expires

    The wrapped call returns the callable's result, re-raises the exception
    the callable raised, or raises TimeoutError when no outcome arrived in
    time. A child that dies without reporting an outcome is therefore also
    reported as TimeoutError once the wait expires.
    """
    # Function-scope import keeps the module's import block untouched and
    # works on both Python 3 and Python 2.
    try:
        from queue import Empty
    except ImportError:
        from Queue import Empty

    def wrapper(function):
        def inner(*args, **kwargs):
            now = time.time()
            proc = RunableProcessing(function, *args, **kwargs)
            proc.start()
            try:
                # Read the outcome *before* joining. A child that has queued
                # a payload larger than the pipe buffer cannot exit until the
                # parent drains it, so joining first would always run into
                # the timeout for big results.
                outcome = proc.queue.get(True, seconds)
            except Empty:
                outcome = None  # run_func only ever queues tuples
            if outcome is None:
                if force_kill and proc.is_alive():
                    proc.terminate()
                runtime = int(time.time() - now)
                raise TimeoutError('timed out after {0} seconds'.format(runtime))
            # The only queued item has been consumed, so the child can exit.
            proc.join()
            success, result = outcome
            if success:
                return result
            raise result
        return inner
    return wrapper
81+
82+
83+
if __name__ == '__main__':
    # Executed as a script: run the doctests embedded in this module.
    from doctest import testmod
    testmod()

ers/utils.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
21
from collections import defaultdict
32
from StringIO import StringIO
4-
import rdflib
53

6-
# Document model is used to store data in CouchDB. The API is independent from the choice of model.
4+
import rdflib
75

86
def import_nt(registry, file_name, target_graph):
97
""" Import N-Triples file.

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
requirements = open('requirements.txt').read().splitlines()
66
setup(name='ERS',
7-
version='0.14',
7+
version='0.15',
88
description='Entity Registry System',
99
url='https://github.com/ers-devs/ers-node/',
1010
packages=['ers'],

0 commit comments

Comments
 (0)