11import gevent
22from gevent .event import Event
3+
34from collections import defaultdict
5+ import logging
46
57from honeybadgerbft .exceptions import RedundantMessageError , AbandonedNodeError
68
79
10+ logger = logging .getLogger (__name__ )
11+
12+
13+ def handle_conf_messages (* , sender , message , conf_values , pid , bv_signal ):
14+ _ , r , v = message
15+ assert v in ((0 ,), (1 ,), (0 , 1 ))
16+ if sender in conf_values [r ][v ]:
17+ logger .warn (f'Redundant CONF received { message } by { sender } ' ,
18+ extra = {'nodeid' : pid , 'epoch' : r })
19+ # FIXME: Raise for now to simplify things & be consistent
20+ # with how other TAGs are handled. Will replace the raise
21+ # with a continue statement as part of
22+ # https://github.com/initc3/HoneyBadgerBFT-Python/issues/10
23+ raise RedundantMessageError (
24+ 'Redundant CONF received {}' .format (message ))
25+
26+ conf_values [r ][v ].add (sender )
27+ logger .debug (
28+ f'add v = { v } to conf_value[{ r } ] = { conf_values [r ]} ' ,
29+ extra = {'nodeid' : pid , 'epoch' : r },
30+ )
31+
32+ bv_signal .set ()
33+
34+
35+ def wait_for_conf_values (* , pid , N , f , epoch , conf_sent , bin_values ,
36+ values , conf_values , bv_signal , broadcast ):
37+ conf_sent [epoch ][tuple (values )] = True
38+ logger .debug (f"broadcast { ('CONF' , epoch , tuple (values ))} " ,
39+ extra = {'nodeid' : pid , 'epoch' : epoch })
40+ broadcast (('CONF' , epoch , tuple (bin_values [epoch ])))
41+ while True :
42+ logger .debug (
43+ f'looping ... conf_values[epoch] is: { conf_values [epoch ]} ' ,
44+ extra = {'nodeid' : pid , 'epoch' : epoch },
45+ )
46+ if 1 in bin_values [epoch ] and len (conf_values [epoch ][(1 ,)]) >= N - f :
47+ return set ((1 ,))
48+ if 0 in bin_values [epoch ] and len (conf_values [epoch ][(0 ,)]) >= N - f :
49+ return set ((0 ,))
50+ if (sum (len (senders ) for conf_value , senders in
51+ conf_values [epoch ].items () if senders and
52+ set (conf_value ).issubset (bin_values [epoch ])) >= N - f ):
53+ return set ((0 , 1 ))
54+
55+ bv_signal .clear ()
56+ bv_signal .wait ()
57+
58+
859def binaryagreement (sid , pid , N , f , coin , input , decide , broadcast , receive ):
960 """Binary consensus from [MMR14]. It takes an input ``vi`` and will
1061 finally write the decided value into ``decide`` channel.
@@ -23,7 +74,9 @@ def binaryagreement(sid, pid, N, f, coin, input, decide, broadcast, receive):
2374 # Messages received are routed to either a shared coin, the broadcast, or AUX
2475 est_values = defaultdict (lambda : [set (), set ()])
2576 aux_values = defaultdict (lambda : [set (), set ()])
77+ conf_values = defaultdict (lambda : {(0 ,): set (), (1 ,): set (), (0 , 1 ): set ()})
2678 est_sent = defaultdict (lambda : [False , False ])
79+ conf_sent = defaultdict (lambda : {(0 ,): False , (1 ,): False , (0 , 1 ): False })
2780 bin_values = defaultdict (set )
2881
2982 # This event is triggered whenever bin_values or aux_values changes
@@ -32,6 +85,8 @@ def binaryagreement(sid, pid, N, f, coin, input, decide, broadcast, receive):
3285 def _recv ():
3386 while True : # not finished[pid]:
3487 (sender , msg ) = receive ()
88+ logger .debug (f'receive { msg } from node { sender } ' ,
89+ extra = {'nodeid' : pid , 'epoch' : msg [1 ]})
3590 assert sender in range (N )
3691 if msg [0 ] == 'EST' :
3792 # BV_Broadcast message
@@ -41,7 +96,11 @@ def _recv():
4196 # FIXME: raise or continue? For now will raise just
4297 # because it appeared first, but maybe the protocol simply
4398 # needs to continue.
44- print ('Redundant EST received' , msg )
99+ print (f'Redundant EST received by { sender } ' , msg )
100+ logger .warn (
101+ f'Redundant EST message received by { sender } : { msg } ' ,
102+ extra = {'nodeid' : pid , 'epoch' : msg [1 ]}
103+ )
45104 raise RedundantMessageError (
46105 'Redundant EST received {}' .format (msg ))
47106 # continue
@@ -51,10 +110,18 @@ def _recv():
51110 if len (est_values [r ][v ]) >= f + 1 and not est_sent [r ][v ]:
52111 est_sent [r ][v ] = True
53112 broadcast (('EST' , r , v ))
113+ logger .debug (f"broadcast { ('EST' , r , v )} " ,
114+ extra = {'nodeid' : pid , 'epoch' : r })
54115
55116 # Output after reaching second threshold
56117 if len (est_values [r ][v ]) >= 2 * f + 1 :
118+ logger .debug (
119+ f'add v = { v } to bin_value[{ r } ] = { bin_values [r ]} ' ,
120+ extra = {'nodeid' : pid , 'epoch' : r },
121+ )
57122 bin_values [r ].add (v )
123+ logger .debug (f'bin_values[{ r } ] is now: { bin_values [r ]} ' ,
124+ extra = {'nodeid' : pid , 'epoch' : r })
58125 bv_signal .set ()
59126
60127 elif msg [0 ] == 'AUX' :
@@ -68,11 +135,28 @@ def _recv():
68135 print ('Redundant AUX received' , msg )
69136 raise RedundantMessageError (
70137 'Redundant AUX received {}' .format (msg ))
71- # continue
72138
139+ logger .debug (
140+ f'add sender = { sender } to aux_value[{ r } ][{ v } ] = { aux_values [r ][v ]} ' ,
141+ extra = {'nodeid' : pid , 'epoch' : r },
142+ )
73143 aux_values [r ][v ].add (sender )
144+ logger .debug (
145+ f'aux_value[{ r } ][{ v } ] is now: { aux_values [r ][v ]} ' ,
146+ extra = {'nodeid' : pid , 'epoch' : r },
147+ )
148+
74149 bv_signal .set ()
75150
151+ elif msg [0 ] == 'CONF' :
152+ handle_conf_messages (
153+ sender = sender ,
154+ message = msg ,
155+ conf_values = conf_values ,
156+ pid = pid ,
157+ bv_signal = bv_signal ,
158+ )
159+
76160 # Translate mmr14 broadcast into coin.broadcast
77161 # _coin_broadcast = lambda (r, sig): broadcast(('COIN', r, sig))
78162 # _coin_recv = Queue()
@@ -88,6 +172,9 @@ def _recv():
88172 r = 0
89173 already_decided = None
90174 while True : # Unbounded number of rounds
175+ logger .info (f'Starting with est = { est } ' ,
176+ extra = {'nodeid' : pid , 'epoch' : r })
177+
91178 if not est_sent [r ][est ]:
92179 est_sent [r ][est ] = True
93180 broadcast (('EST' , r , est ))
@@ -98,10 +185,19 @@ def _recv():
98185 bv_signal .wait ()
99186
100187 w = next (iter (bin_values [r ])) # take an element
188+ logger .debug (f"broadcast { ('AUX' , r , w )} " ,
189+ extra = {'nodeid' : pid , 'epoch' : r })
101190 broadcast (('AUX' , r , w ))
102191
103192 values = None
193+ logger .debug (
194+ f'block until at least N-f ({ N - f } ) AUX values are received' ,
195+ extra = {'nodeid' : pid , 'epoch' : r })
104196 while True :
197+ logger .debug (f'bin_values[{ r } ]: { bin_values [r ]} ' ,
198+ extra = {'nodeid' : pid , 'epoch' : r })
199+ logger .debug (f'aux_values[{ r } ]: { aux_values [r ]} ' ,
200+ extra = {'nodeid' : pid , 'epoch' : r })
105201 # Block until at least N-f AUX values are received
106202 if 1 in bin_values [r ] and len (aux_values [r ][1 ]) >= N - f :
107203 values = set ((1 ,))
@@ -118,8 +214,37 @@ def _recv():
118214 bv_signal .clear ()
119215 bv_signal .wait ()
120216
217+ logger .debug (f'Completed AUX phase with values = { values } ' ,
218+ extra = {'nodeid' : pid , 'epoch' : r })
219+
220+ # CONF phase
221+ logger .debug (
222+ f'block until at least N-f ({ N - f } ) CONF values are received' ,
223+ extra = {'nodeid' : pid , 'epoch' : r })
224+ if not conf_sent [r ][tuple (values )]:
225+ values = wait_for_conf_values (
226+ pid = pid ,
227+ N = N ,
228+ f = f ,
229+ epoch = r ,
230+ conf_sent = conf_sent ,
231+ bin_values = bin_values ,
232+ values = values ,
233+ conf_values = conf_values ,
234+ bv_signal = bv_signal ,
235+ broadcast = broadcast ,
236+ )
237+ logger .debug (f'Completed CONF phase with values = { values } ' ,
238+ extra = {'nodeid' : pid , 'epoch' : r })
239+
240+ logger .debug (
241+ f'Block until receiving the common coin value' ,
242+ extra = {'nodeid' : pid , 'epoch' : r },
243+ )
121244 # Block until receiving the common coin value
122245 s = coin (r )
246+ logger .info (f'Received coin with value = { s } ' ,
247+ extra = {'nodeid' : pid , 'epoch' : r })
123248
124249 try :
125250 est , already_decided = set_new_estimate (
@@ -130,6 +255,8 @@ def _recv():
130255 )
131256 except AbandonedNodeError :
132257 # print('[sid:%s] [pid:%d] QUITTING in round %d' % (sid,pid,r)))
258+ logger .debug (f'QUIT!' ,
259+ extra = {'nodeid' : pid , 'epoch' : r })
133260 _thread_recv .kill ()
134261 return
135262
0 commit comments