Skip to content

Commit a71b44e

Browse files
committed
moved dandelion_enabled from state to Dandelion class as enabled attr
1 parent a209d65 commit a71b44e

File tree

13 files changed

+76
-68
lines changed

13 files changed

+76
-68
lines changed

src/bitmessagemain.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import os
1313
import sys
1414

15+
1516
try:
1617
import pathmagic
1718
except ImportError:
@@ -156,13 +157,6 @@ def start(self):
156157

157158
set_thread_name("PyBitmessage")
158159

159-
state.dandelion_enabled = config.safeGetInt('network', 'dandelion')
160-
# dandelion requires outbound connections, without them,
161-
# stem objects will get stuck forever
162-
if state.dandelion_enabled and not config.safeGetBoolean(
163-
'bitmessagesettings', 'sendoutgoingconnections'):
164-
state.dandelion_enabled = 0
165-
166160
if state.testmode or config.safeGetBoolean(
167161
'bitmessagesettings', 'extralowdifficulty'):
168162
defaults.networkDefaultProofOfWorkNonceTrialsPerByte = int(

src/network/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
"""
22
Network subsystem package
33
"""
4-
4+
from .dandelion import Dandelion
55
from .threads import StoppableThread
66

7+
dandelion_ins = Dandelion()
78

89
__all__ = ["StoppableThread"]
910

@@ -21,6 +22,11 @@ def start(config, state):
2122
from .receivequeuethread import ReceiveQueueThread
2223
from .uploadthread import UploadThread
2324

25+
# check and set dandelion enabled value at network startup
26+
dandelion_ins.init_dandelion_enabled(config)
27+
# pass pool instance into dandelion class instance
28+
dandelion_ins.init_pool(connectionpool.pool)
29+
2430
readKnownNodes()
2531
connectionpool.pool.connectToStream(1)
2632
for thread in (

src/network/bmobject.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
import protocol
88
import state
9-
import dandelion
109
import connectionpool
10+
from network import dandelion_ins
1111
from highlevelcrypto import calculateInventoryHash
1212

1313
logger = logging.getLogger('default')
@@ -113,7 +113,7 @@ def checkAlreadyHave(self):
113113
or advertise it unnecessarily)
114114
"""
115115
# if it's a stem duplicate, pretend we don't have it
116-
if dandelion.instance.hasHash(self.inventoryHash):
116+
if dandelion_ins.hasHash(self.inventoryHash):
117117
return
118118
if self.inventoryHash in state.Inventory:
119119
raise BMObjectAlreadyHaveError()

src/network/bmproto.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import knownnodes
1616
import protocol
1717
import state
18-
import dandelion
1918
import connectionpool
2019
from bmconfigparser import config
2120
from queues import invQueue, objectProcessorQueue, portCheckerQueue
@@ -27,7 +26,7 @@
2726
BMObjectUnwantedStreamError
2827
)
2928
from network.proxy import ProxyError
30-
29+
from network import dandelion_ins
3130
from node import Node, Peer
3231
from objectracker import ObjectTracker, missingObjects
3332

@@ -351,14 +350,14 @@ def _command_inv(self, extend_dandelion_stem=False):
351350
raise BMProtoExcessiveDataError()
352351

353352
# ignore dinv if dandelion turned off
354-
if extend_dandelion_stem and not state.dandelion_enabled:
353+
if extend_dandelion_stem and not dandelion_ins.enabled:
355354
return True
356355

357356
for i in map(str, items):
358-
if i in state.Inventory and not dandelion.instance.hasHash(i):
357+
if i in state.Inventory and not dandelion_ins.hasHash(i):
359358
continue
360-
if extend_dandelion_stem and not dandelion.instance.hasHash(i):
361-
dandelion.instance.addHash(i, self)
359+
if extend_dandelion_stem and not dandelion_ins.hasHash(i):
360+
dandelion_ins.addHash(i, self)
362361
self.handleReceivedInventory(i)
363362

364363
return True
@@ -420,9 +419,9 @@ def bm_command_object(self):
420419
except KeyError:
421420
pass
422421

423-
if self.object.inventoryHash in state.Inventory and dandelion.instance.hasHash(
422+
if self.object.inventoryHash in state.Inventory and dandelion_ins.hasHash(
424423
self.object.inventoryHash):
425-
dandelion.instance.removeHash(
424+
dandelion_ins.removeHash(
426425
self.object.inventoryHash, "cycle detection")
427426

428427
state.Inventory[self.object.inventoryHash] = (
@@ -541,7 +540,7 @@ def bm_command_version(self):
541540
if not self.isOutbound:
542541
self.append_write_buf(protocol.assembleVersionMessage(
543542
self.destination.host, self.destination.port,
544-
connectionpool.pool.streams, True,
543+
connectionpool.pool.streams, dandelion_ins.enabled, True,
545544
nodeid=self.nodeid))
546545
logger.debug(
547546
'%(host)s:%(port)i sending version',

src/network/dandelion.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
from threading import RLock
88
from time import time
99

10-
import connectionpool
11-
import state
12-
from queues import invQueue
1310

1411
# randomise routes after 600 seconds
1512
REASSIGN_INTERVAL = 600
@@ -37,6 +34,8 @@ def __init__(self):
3734
# when to rerandomise routes
3835
self.refresh = time() + REASSIGN_INTERVAL
3936
self.lock = RLock()
37+
self.enabled = None
38+
self.pool = None
4039

4140
@staticmethod
4241
def poissonTimeout(start=None, average=0):
@@ -47,10 +46,23 @@ def poissonTimeout(start=None, average=0):
4746
average = FLUFF_TRIGGER_MEAN_DELAY
4847
return start + expovariate(1.0 / average) + FLUFF_TRIGGER_FIXED_DELAY
4948

49+
def init_pool(self, pool):
50+
"""pass pool instance"""
51+
self.pool = pool
52+
53+
def init_dandelion_enabled(self, config):
54+
"""Check if Dandelion is enabled and set value in enabled attribute"""
55+
dandelion_enabled = config.safeGetInt('network', 'dandelion')
56+
# dandelion requires outbound connections, without them,
57+
# stem objects will get stuck forever
58+
if not config.safeGetBoolean(
59+
'bitmessagesettings', 'sendoutgoingconnections'):
60+
dandelion_enabled = 0
61+
self.enabled = dandelion_enabled
62+
5063
def addHash(self, hashId, source=None, stream=1):
51-
"""Add inventory vector to dandelion stem"""
52-
if not state.dandelion_enabled:
53-
return
64+
"""Add inventory vector to dandelion stem return status of dandelion enabled"""
65+
assert self.enabled is not None
5466
with self.lock:
5567
self.hashMap[hashId] = Stem(
5668
self.getNodeStem(source),
@@ -89,7 +101,7 @@ def objectChildStem(self, hashId):
89101
"""Child (i.e. next) node for an inventory vector during stem mode"""
90102
return self.hashMap[hashId].child
91103

92-
def maybeAddStem(self, connection):
104+
def maybeAddStem(self, connection, invQueue):
93105
"""
94106
If we had too few outbound connections, add the current one to the
95107
current stem list. Dandelion as designed by the authors should
@@ -163,7 +175,7 @@ def getNodeStem(self, node=None):
163175
self.nodeMap[node] = self.pickStem(node)
164176
return self.nodeMap[node]
165177

166-
def expire(self):
178+
def expire(self, invQueue):
167179
"""Switch expired objects from stem to fluff mode"""
168180
with self.lock:
169181
deadline = time()
@@ -179,19 +191,18 @@ def expire(self):
179191

180192
def reRandomiseStems(self):
181193
"""Re-shuffle stem mapping (parent <-> child pairs)"""
194+
assert self.pool is not None
195+
if self.refresh > time():
196+
return
197+
182198
with self.lock:
183199
try:
184200
# random two connections
185201
self.stem = sample(
186-
connectionpool.BMConnectionPool(
187-
).outboundConnections.values(), MAX_STEMS)
202+
self.pool.outboundConnections.values(), MAX_STEMS)
188203
# not enough stems available
189204
except ValueError:
190-
self.stem = connectionpool.BMConnectionPool(
191-
).outboundConnections.values()
205+
self.stem = self.pool.outboundConnections.values()
192206
self.nodeMap = {}
193207
# hashMap stays to cater for pending stems
194208
self.refresh = time() + REASSIGN_INTERVAL
195-
196-
197-
instance = Dandelion()

src/network/downloadthread.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import addresses
77
import helper_random
88
import protocol
9-
import dandelion
109
import connectionpool
10+
from network import dandelion_ins
1111
from objectracker import missingObjects
1212
from threads import StoppableThread
1313

@@ -60,7 +60,7 @@ def run(self):
6060
payload = bytearray()
6161
chunkCount = 0
6262
for chunk in request:
63-
if chunk in state.Inventory and not dandelion.instance.hasHash(chunk):
63+
if chunk in state.Inventory and not dandelion_ins.hasHash(chunk):
6464
try:
6565
del i.objectsNewToMe[chunk]
6666
except KeyError:

src/network/invthread.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import addresses
99
import protocol
1010
import state
11-
import dandelion
1211
import connectionpool
12+
from network import dandelion_ins
1313
from queues import invQueue
1414
from threads import StoppableThread
1515

@@ -40,10 +40,10 @@ class InvThread(StoppableThread):
4040
@staticmethod
4141
def handleLocallyGenerated(stream, hashId):
4242
"""Locally generated inventory items require special handling"""
43-
dandelion.instance.addHash(hashId, stream=stream)
43+
dandelion_ins.addHash(hashId, stream=stream)
4444
for connection in connectionpool.pool.connections():
45-
if state.dandelion_enabled and connection != \
46-
dandelion.instance.objectChildStem(hashId):
45+
if dandelion_ins.enabled and connection != \
46+
dandelion_ins.objectChildStem(hashId):
4747
continue
4848
connection.objectsNewToThem[hashId] = time()
4949

@@ -52,7 +52,7 @@ def run(self): # pylint: disable=too-many-branches
5252
chunk = []
5353
while True:
5454
# Dandelion fluff trigger by expiration
55-
handleExpiredDandelion(dandelion.instance.expire())
55+
handleExpiredDandelion(dandelion_ins.expire(invQueue))
5656
try:
5757
data = invQueue.get(False)
5858
chunk.append((data[0], data[1]))
@@ -75,10 +75,10 @@ def run(self): # pylint: disable=too-many-branches
7575
except KeyError:
7676
continue
7777
try:
78-
if connection == dandelion.instance.objectChildStem(inv[1]):
78+
if connection == dandelion_ins.objectChildStem(inv[1]):
7979
# Fluff trigger by RNG
8080
# auto-ignore if config set to 0, i.e. dandelion is off
81-
if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311
81+
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
8282
fluffs.append(inv[1])
8383
# send a dinv only if the stem node supports dandelion
8484
elif connection.services & protocol.NODE_DANDELION > 0:
@@ -105,7 +105,6 @@ def run(self): # pylint: disable=too-many-branches
105105
for _ in range(len(chunk)):
106106
invQueue.task_done()
107107

108-
if dandelion.instance.refresh < time():
109-
dandelion.instance.reRandomiseStems()
108+
dandelion_ins.reRandomiseStems()
110109

111110
self.stop.wait(1)

src/network/objectracker.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
import time
55
from threading import RLock
66

7-
import dandelion
87
import connectionpool
8+
from network import dandelion_ins
99
from randomtrackingdict import RandomTrackingDict
1010

1111
haveBloom = False
@@ -107,14 +107,14 @@ def handleReceivedObject(self, streamNumber, hashid):
107107
del i.objectsNewToMe[hashid]
108108
except KeyError:
109109
if streamNumber in i.streams and (
110-
not dandelion.instance.hasHash(hashid)
111-
or dandelion.instance.objectChildStem(hashid) == i):
110+
not dandelion_ins.hasHash(hashid)
111+
or dandelion_ins.objectChildStem(hashid) == i):
112112
with i.objectsNewToThemLock:
113113
i.objectsNewToThem[hashid] = time.time()
114114
# update stream number,
115115
# which we didn't have when we just received the dinv
116116
# also resets expiration of the stem mode
117-
dandelion.instance.setHashStream(hashid, streamNumber)
117+
dandelion_ins.setHashStream(hashid, streamNumber)
118118

119119
if i == self:
120120
try:

src/network/tcp.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
import l10n
1616
import protocol
1717
import state
18-
import dandelion
1918
import connectionpool
2019
from bmconfigparser import config
2120
from highlevelcrypto import randomBytes
21+
from network import dandelion_ins
2222
from queues import invQueue, receiveDataQueue, UISignalQueue
2323
from tr import _translate
2424

@@ -169,7 +169,7 @@ def set_connection_fully_established(self):
169169
knownnodes.increaseRating(self.destination)
170170
knownnodes.addKnownNode(
171171
self.streams, self.destination, time.time())
172-
dandelion.instance.maybeAddStem(self)
172+
dandelion_ins.maybeAddStem(self, invQueue)
173173
self.sendAddr()
174174
self.sendBigInv()
175175

@@ -231,7 +231,7 @@ def sendChunk():
231231
with self.objectsNewToThemLock:
232232
for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
233233
# don't advertise stem objects on bigInv
234-
if dandelion.instance.hasHash(objHash):
234+
if dandelion_ins.hasHash(objHash):
235235
continue
236236
bigInvList[objHash] = 0
237237
objectCount = 0
@@ -268,7 +268,7 @@ def handle_connect(self):
268268
self.append_write_buf(
269269
protocol.assembleVersionMessage(
270270
self.destination.host, self.destination.port,
271-
connectionpool.pool.streams,
271+
connectionpool.pool.streams, dandelion_ins.enabled,
272272
False, nodeid=self.nodeid))
273273
self.connectedAt = time.time()
274274
receiveDataQueue.put(self.destination)
@@ -293,7 +293,7 @@ def handle_close(self):
293293
if host_is_global:
294294
knownnodes.addKnownNode(
295295
self.streams, self.destination, time.time())
296-
dandelion.instance.maybeRemoveStem(self)
296+
dandelion_ins.maybeRemoveStem(self)
297297
else:
298298
self.checkTimeOffsetNotification()
299299
if host_is_global:
@@ -319,7 +319,7 @@ def state_proxy_handshake_done(self):
319319
self.append_write_buf(
320320
protocol.assembleVersionMessage(
321321
self.destination.host, self.destination.port,
322-
connectionpool.pool.streams,
322+
connectionpool.pool.streams, dandelion_ins.enabled,
323323
False, nodeid=self.nodeid))
324324
self.set_state("bm_header", expectBytes=protocol.Header.size)
325325
return True
@@ -343,7 +343,7 @@ def state_proxy_handshake_done(self):
343343
self.append_write_buf(
344344
protocol.assembleVersionMessage(
345345
self.destination.host, self.destination.port,
346-
connectionpool.pool.streams,
346+
connectionpool.pool.streams, dandelion_ins.enabled,
347347
False, nodeid=self.nodeid))
348348
self.set_state("bm_header", expectBytes=protocol.Header.size)
349349
return True

0 commit comments

Comments
 (0)