|
2 | 2 | from gevent.event import Event |
3 | 3 |
|
4 | 4 | from collections import defaultdict |
5 | | -from distutils.util import strtobool |
6 | | -from os import environ |
7 | 5 | import logging |
8 | 6 |
|
9 | 7 | from honeybadgerbft.exceptions import RedundantMessageError, AbandonedNodeError |
10 | 8 |
|
11 | 9 |
|
12 | 10 | logger = logging.getLogger(__name__) |
13 | | -CONF_PHASE = strtobool(environ.get('CONF_PHASE', '1')) |
| 11 | + |
| 12 | + |
| 13 | +def handle_conf_messages(*, sender, message, conf_values, pid, bv_signal): |
| 14 | + _, r, v = message |
| 15 | + assert v in ((0,), (1,), (0, 1)) |
| 16 | + if sender in conf_values[r][v]: |
| 17 | + logger.warn(f'Redundant CONF received {message} by {sender}', |
| 18 | + extra={'nodeid': pid, 'epoch': r}) |
| 19 | + # FIXME: Raise for now to simplify things & be consistent |
| 20 | + # with how other TAGs are handled. Will replace the raise |
| 21 | + # with a continue statement as part of |
| 22 | + # https://github.com/initc3/HoneyBadgerBFT-Python/issues/10 |
| 23 | + raise RedundantMessageError( |
| 24 | + 'Redundant CONF received {}'.format(message)) |
| 25 | + |
| 26 | + conf_values[r][v].add(sender) |
| 27 | + logger.debug( |
| 28 | + f'add v = {v} to conf_value[{r}] = {conf_values[r]}', |
| 29 | + extra={'nodeid': pid, 'epoch': r}, |
| 30 | + ) |
| 31 | + |
| 32 | + bv_signal.set() |
| 33 | + |
| 34 | + |
| 35 | +def wait_for_conf_values(*, pid, N, f, epoch, conf_sent, bin_values, |
| 36 | + values, conf_values, bv_signal, broadcast): |
| 37 | + conf_sent[epoch][tuple(values)] = True |
| 38 | + logger.debug(f"broadcast {('CONF', epoch, tuple(values))}", |
| 39 | + extra={'nodeid': pid, 'epoch': epoch}) |
| 40 | + broadcast(('CONF', epoch, tuple(bin_values[epoch]))) |
| 41 | + while True: |
| 42 | + logger.debug( |
| 43 | + f'looping ... conf_values[epoch] is: {conf_values[epoch]}', |
| 44 | + extra={'nodeid': pid, 'epoch': epoch}, |
| 45 | + ) |
| 46 | + if 1 in bin_values[epoch] and len(conf_values[epoch][(1,)]) >= N - f: |
| 47 | + return set((1,)) |
| 48 | + if 0 in bin_values[epoch] and len(conf_values[epoch][(0,)]) >= N - f: |
| 49 | + return set((0,)) |
| 50 | + if (sum(len(senders) for conf_value, senders in |
| 51 | + conf_values[epoch].items() if senders and |
| 52 | + set(conf_value).issubset(bin_values[epoch])) >= N - f): |
| 53 | + return set((0, 1)) |
| 54 | + |
| 55 | + bv_signal.clear() |
| 56 | + bv_signal.wait() |
14 | 57 |
|
15 | 58 |
|
16 | 59 | def binaryagreement(sid, pid, N, f, coin, input, decide, broadcast, receive): |
@@ -105,27 +148,15 @@ def _recv(): |
105 | 148 |
|
106 | 149 | bv_signal.set() |
107 | 150 |
|
108 | | - elif msg[0] == 'CONF' and CONF_PHASE: |
109 | | - _, r, v = msg |
110 | | - assert v in ((0,), (1,), (0, 1)) |
111 | | - if sender in conf_values[r][v]: |
112 | | - logger.warn(f'Redundant CONF received {msg} by {sender}', |
113 | | - extra={'nodeid': pid, 'epoch': r}) |
114 | | - # FIXME: Raise for now to simplify things & be consistent |
115 | | - # with how other TAGs are handled. Will replace the raise |
116 | | - # with a continue statement as part of |
117 | | - # https://github.com/initc3/HoneyBadgerBFT-Python/issues/10 |
118 | | - raise RedundantMessageError( |
119 | | - 'Redundant CONF received {}'.format(msg)) |
120 | | - |
121 | | - conf_values[r][v].add(sender) |
122 | | - logger.debug( |
123 | | - f'add v = {v} to conf_value[{r}] = {conf_values[r]}', |
124 | | - extra={'nodeid': pid, 'epoch': r}, |
| 151 | + elif msg[0] == 'CONF': |
| 152 | + handle_conf_messages( |
| 153 | + sender=sender, |
| 154 | + message=msg, |
| 155 | + conf_values=conf_values, |
| 156 | + pid=pid, |
| 157 | + bv_signal=bv_signal, |
125 | 158 | ) |
126 | 159 |
|
127 | | - bv_signal.set() |
128 | | - |
129 | 160 | # Translate mmr14 broadcast into coin.broadcast |
130 | 161 | # _coin_broadcast = lambda (r, sig): broadcast(('COIN', r, sig)) |
131 | 162 | # _coin_recv = Queue() |
@@ -186,35 +217,23 @@ def _recv(): |
186 | 217 | logger.debug(f'Completed AUX phase with values = {values}', |
187 | 218 | extra={'nodeid': pid, 'epoch': r}) |
188 | 219 |
|
189 | | - # XXX CONF phase |
| 220 | + # CONF phase |
190 | 221 | logger.debug( |
191 | 222 | f'block until at least N-f ({N-f}) CONF values are received', |
192 | 223 | extra={'nodeid': pid, 'epoch': r}) |
193 | | - if CONF_PHASE and not conf_sent[r][tuple(values)]: |
194 | | - conf_sent[r][tuple(values)] = True |
195 | | - logger.debug(f"broadcast {('CONF', r, tuple(values))}", |
196 | | - extra={'nodeid': pid, 'epoch': r}) |
197 | | - broadcast(('CONF', r, tuple(bin_values[r]))) |
198 | | - while True: |
199 | | - logger.debug( |
200 | | - f'looping ... conf_values[r] is: {conf_values[r]}', |
201 | | - extra={'nodeid': pid, 'epoch': r}, |
202 | | - ) |
203 | | - if 1 in bin_values[r] and len(conf_values[r][(1,)]) >= N - f: |
204 | | - values = set((1,)) |
205 | | - break |
206 | | - if 0 in bin_values[r] and len(conf_values[r][(0,)]) >= N - f: |
207 | | - values = set((0,)) |
208 | | - break |
209 | | - if (sum(len(senders) for conf_value, senders in |
210 | | - conf_values[r].items() if senders and |
211 | | - set(conf_value).issubset(bin_values[r])) >= N - f): |
212 | | - values = set((0, 1)) |
213 | | - break |
214 | | - |
215 | | - bv_signal.clear() |
216 | | - bv_signal.wait() |
217 | | - |
| 224 | + if not conf_sent[r][tuple(values)]: |
| 225 | + values = wait_for_conf_values( |
| 226 | + pid=pid, |
| 227 | + N=N, |
| 228 | + f=f, |
| 229 | + epoch=r, |
| 230 | + conf_sent=conf_sent, |
| 231 | + bin_values=bin_values, |
| 232 | + values=values, |
| 233 | + conf_values=conf_values, |
| 234 | + bv_signal=bv_signal, |
| 235 | + broadcast=broadcast, |
| 236 | + ) |
218 | 237 | logger.debug(f'Completed CONF phase with values = {values}', |
219 | 238 | extra={'nodeid': pid, 'epoch': r}) |
220 | 239 |
|
|
0 commit comments