Skip to content

Commit dab0659

Browse files
committed
Ignore failing nodes with increasing probability
1 parent 98b57c9 commit dab0659

File tree

3 files changed

+36
-9
lines changed

3 files changed

+36
-9
lines changed

ers/api.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010

1111
from hashlib import md5
1212
from socket import gethostname
13+
from collections import Counter
14+
from random import randrange
1315

1416
import store
17+
from timeout import TimeoutError
1518

1619
class ERSReadOnly(object):
1720
""" ERS version with read-only methods.
@@ -26,6 +29,7 @@ def __init__(self,
2629
local_only=False):
2730
self._local_only = local_only
2831
self.fixed_peers = [] if self._local_only else list(fixed_peers)
32+
self._timeout_count = Counter()
2933
self.store = store.LocalStore()
3034
self._init_host_urn()
3135

@@ -39,6 +43,13 @@ def get_machine_uuid(self):
3943
'''
4044
return str(uuid.getnode())
4145

46+
def _is_failing(self, url):
    """
    Decide at random whether the peer at `url` should be skipped.

    The more timeouts are recorded for `url` in `self._timeout_count`,
    the more likely the peer is skipped: with n recorded timeouts the
    result is True with probability n / (n + 1).

    Returns True if the peer should be skipped for this request.
    Returns False otherwise; always False for a url that has no
    recorded timeouts.
    """
    # randrange(n + 1) draws uniformly from 0..n, so it yields 0
    # ("do not skip") with probability 1 / (n + 1).
    return randrange(self._timeout_count[url] + 1) != 0
52+
4253
def get_entity(self, entity_name):
4354
'''
4455
Create an entity object, fill it with all the relevant documents
@@ -56,13 +67,21 @@ def get_entity(self, entity_name):
5667
# Get documents out of public/cache of connected peers
5768
# TODO parallelize
5869
for peer in self.get_peers():
70+
url = peer['server_url']
71+
if self._is_failing(url):
72+
continue
73+
5974
remote_docs = []
6075
try:
61-
remote_docs = store.query_remote(peer['server_url'],
62-
'docs_by_entity', entity_name)
76+
remote_docs = store.query_remote(url, 'docs_by_entity', entity_name)
77+
except TimeoutError:
78+
self._timeout_count[url] += 1
79+
sys.stderr.write("Incremented timeout count for {0}: {1}\n".format(
80+
url, self._timeout_count[url]))
6381
except Exception as e:
6482
sys.stderr.write("Warning: failed to query remote peer {0}. Error: {1}\n".format(peer, e))
6583
else:
84+
self._timeout_count.pop(url, 0)
6685
for doc in remote_docs:
6786
entity.add_document(doc, 'remote')
6887
return entity
@@ -81,13 +100,21 @@ def search(self, prop, value=None):
81100

82101
# Search peers
83102
for peer in self.get_peers():
103+
url = peer['server_url']
104+
if self._is_failing(url):
105+
continue
106+
84107
remote_result = []
85108
try:
86-
remote_result = store.query_remote(peer['server_url'],
87-
'by_property_value', prop, value)
109+
remote_result = store.query_remote(url, 'by_property_value', prop, value)
110+
except TimeoutError:
111+
self._timeout_count[url] += 1
112+
sys.stderr.write("Incremented timeout count for {0}: {1}\n".format(
113+
url, self._timeout_count[url]))
88114
except Exception as e:
89115
sys.stderr.write("Warning: failed to query remote peer {0}. Error: {1}\n".format(peer, e))
90116
else:
117+
self._timeout_count.pop(url, 0)
91118
result.update(remote_result)
92119

93120
return list(result)

ers/store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from itertools import chain
3030
from timeout import timeout
3131

32-
REMOTE_SERVER_TIMEOUT = 1
32+
REMOTE_SERVER_TIMEOUT = 0.3
3333

3434
DEFAULT_STORE_URI = 'http://127.0.0.1:5984'
3535
DEFAULT_AUTH = ['admin', 'admin']

ers/timeout.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
>>> sleep(1)
1313
Traceback (most recent call last):
1414
...
15-
TimeoutException: timed out after 0 seconds
15+
TimeoutError: timed out after 0 seconds
1616
1717
>>> sleep(.2)
1818
0.2
@@ -31,10 +31,10 @@
3131
import time
3232
import logging
3333
logger = multiprocessing.log_to_stderr()
34-
logger.setLevel(logging.INFO)
34+
logger.setLevel(logging.ERROR)
3535

3636

37-
class TimeoutException(Exception):
37+
class TimeoutError(Exception):
3838
pass
3939

4040

@@ -69,7 +69,7 @@ def inner(*args, **kwargs):
6969
if force_kill:
7070
proc.terminate()
7171
runtime = int(time.time() - now)
72-
raise TimeoutException('timed out after {0} seconds'.format(runtime))
72+
raise TimeoutError('timed out after {0} seconds'.format(runtime))
7373
assert proc.done()
7474
success, result = proc.result()
7575
if success:

0 commit comments

Comments
 (0)