Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 187 additions & 14 deletions plugins/AnnounceShare/AnnounceSharePlugin.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import random
import time
import os
import logging
import json
import atexit
import re

import gevent

Expand All @@ -13,20 +15,118 @@

class TrackerStorage(object):
def __init__(self):
self.site_announcer = None
self.log = logging.getLogger("TrackerStorage")

self.working_tracker_time_interval = 60 * 60
self.tracker_down_time_interval = 60 * 60
self.tracker_discover_time_interval = 5 * 60

self.working_shared_trackers_limit_per_protocol = {}
self.working_shared_trackers_limit_per_protocol["other"] = 2

self.file_path = "%s/trackers.json" % config.data_dir
self.load()
self.time_discover = 0.0
self.time_success = 0.0
atexit.register(self.save)

def initTrackerLimitForProtocol(self):
for s in re.split("[,;]", config.working_shared_trackers_limit_per_protocol):
x = s.split("=", 1)
if len(x) == 1:
x = ["other", x[0]]
try:
self.working_shared_trackers_limit_per_protocol[x[0]] = int(x[1])
except ValueError:
pass
self.log.debug("Limits per protocol: %s" % self.working_shared_trackers_limit_per_protocol)

def getTrackerLimitForProtocol(self, protocol):
l = self.working_shared_trackers_limit_per_protocol
return l.get(protocol, l.get("other"))

def setSiteAnnouncer(self, site_announcer):
if self.site_announcer:
return
self.site_announcer = site_announcer
self.initTrackerLimitForProtocol()
self.recheckValidTrackers()

def isTrackerAddressValid(self, tracker_address):
if not self.site_announcer: # Not completely initialized, skip check
return True

address_parts = self.site_announcer.getAddressParts(tracker_address)
if not address_parts:
self.log.debug("Invalid tracker address: %s" % tracker_address)
return False

handler = self.site_announcer.getTrackerHandler(address_parts["protocol"])
if not handler:
self.log.debug("Invalid tracker address: Unknown protocol %s: %s" % (address_parts["protocol"], tracker_address))
return False

return True

def recheckValidTrackers(self):
trackers = self.getTrackers()
for address, tracker in list(trackers.items()):
if not self.isTrackerAddressValid(address):
del trackers[address]

def getNormalizedTrackerProtocol(self, tracker_address):
if not self.site_announcer:
return None

address_parts = self.site_announcer.getAddressParts(tracker_address)
if not address_parts:
return None

protocol = address_parts["protocol"]
if protocol == "https":
protocol = "http"

return protocol

def getSupportedProtocols(self):
if not self.site_announcer:
return None

supported_trackers = self.site_announcer.getSupportedTrackers()

# If a tracker is in our list, but is absent from the results of getSupportedTrackers(),
# it seems to be supported by software, but forbidden by the settings or network configuration.
# We check and remove thoose trackers here, since onTrackerError() is never emitted for them.
trackers = self.getTrackers()
for tracker_address, tracker in list(trackers.items()):
t = max(trackers[tracker_address]["time_added"],
trackers[tracker_address]["time_success"])
if tracker_address not in supported_trackers and t < time.time() - self.tracker_down_time_interval:
self.log.debug("Tracker %s looks unused, removing." % tracker_address)
del trackers[tracker_address]

protocols = set()
for tracker_address in supported_trackers:
protocol = self.getNormalizedTrackerProtocol(tracker_address)
if not protocol:
continue
protocols.add(protocol)

protocols = list(protocols)

self.log.debug("Supported tracker protocols: %s" % protocols)

return protocols

def getDefaultFile(self):
return {"shared": {}}

def onTrackerFound(self, tracker_address, type="shared", my=False):
if not tracker_address.startswith("zero://"):
if not self.isTrackerAddressValid(tracker_address):
return False

trackers = self.getTrackers()
trackers = self.getTrackers(type)
added = False
if tracker_address not in trackers:
trackers[tracker_address] = {
Expand All @@ -52,6 +152,8 @@ def onTrackerSuccess(self, tracker_address, latency):
trackers[tracker_address]["time_success"] = time.time()
trackers[tracker_address]["num_error"] = 0

self.time_success = time.time()

def onTrackerError(self, tracker_address):
trackers = self.getTrackers()
if tracker_address not in trackers:
Expand All @@ -60,23 +162,60 @@ def onTrackerError(self, tracker_address):
trackers[tracker_address]["time_error"] = time.time()
trackers[tracker_address]["num_error"] += 1

if len(self.getWorkingTrackers()) >= config.working_shared_trackers_limit:
error_limit = 5
else:
error_limit = 30
error_limit
if self.time_success < time.time() - self.tracker_down_time_interval / 2:
# Don't drop trackers from the list, if there haven't been any successful announces recently.
# There may be network connectivity issues.
return

protocol = self.getNormalizedTrackerProtocol(tracker_address) or ""

nr_working_trackers_for_protocol = len(self.getTrackersPerProtocol(working_only=True).get(protocol, []))
nr_working_trackers = len(self.getWorkingTrackers())

error_limit = 30
if nr_working_trackers_for_protocol >= self.getTrackerLimitForProtocol(protocol):
error_limit = 10
if nr_working_trackers >= config.working_shared_trackers_limit:
error_limit = 5

if trackers[tracker_address]["num_error"] > error_limit and trackers[tracker_address]["time_success"] < time.time() - 60 * 60:
if trackers[tracker_address]["num_error"] > error_limit and trackers[tracker_address]["time_success"] < time.time() - self.tracker_down_time_interval:
self.log.debug("Tracker %s looks down, removing." % tracker_address)
del trackers[tracker_address]

def isTrackerWorking(self, tracker_address, type="shared"):
trackers = self.getTrackers(type)
tracker = trackers[tracker_address]
if not tracker:
return False

if tracker["time_success"] > time.time() - self.working_tracker_time_interval:
return True

return False

def getTrackers(self, type="shared"):
return self.file_content.setdefault(type, {})

def getTrackersPerProtocol(self, type="shared", working_only=False):
if not self.site_announcer:
return None

trackers = self.getTrackers(type)

trackers_per_protocol = {}
for tracker_address in trackers:
protocol = self.getNormalizedTrackerProtocol(tracker_address)
if not protocol:
continue
if not working_only or self.isTrackerWorking(tracker_address, type=type):
trackers_per_protocol.setdefault(protocol, []).append(tracker_address)

return trackers_per_protocol

def getWorkingTrackers(self, type="shared"):
trackers = {
key: tracker for key, tracker in self.getTrackers(type).items()
if tracker["time_success"] > time.time() - 60 * 60
if self.isTrackerWorking(key, type)
}
return trackers

Expand All @@ -97,17 +236,45 @@ def load(self):
self.log.debug("Loaded %s shared trackers" % len(trackers))
for address, tracker in list(trackers.items()):
tracker["num_error"] = 0
if not address.startswith("zero://"):
del trackers[address]
self.recheckValidTrackers()

def save(self):
s = time.time()
helper.atomicWrite(self.file_path, json.dumps(self.file_content, indent=2, sort_keys=True).encode("utf8"))
self.log.debug("Saved in %.3fs" % (time.time() - s))

def enoughWorkingTrackers(self, type="shared"):
supported_protocols = self.getSupportedProtocols()
if not supported_protocols:
return False

trackers_per_protocol = self.getTrackersPerProtocol(type="shared", working_only=True)
if not trackers_per_protocol:
return False

total_nr = 0

for protocol in supported_protocols:
trackers = trackers_per_protocol.get(protocol, [])
if len(trackers) < self.getTrackerLimitForProtocol(protocol):
self.log.debug("Not enough working trackers for protocol %s: %s < %s" % (
protocol, len(trackers), self.getTrackerLimitForProtocol(protocol)))
return False
total_nr += len(trackers)

if total_nr < config.working_shared_trackers_limit:
self.log.debug("Not enough working trackers: %s < %s" % (
total_nr, config.working_shared_trackers_limit))
return False

return True

def discoverTrackers(self, peers):
if len(self.getWorkingTrackers()) > config.working_shared_trackers_limit:
if self.enoughWorkingTrackers(type="shared"):
return False

self.log.debug("Discovering trackers from %s peers..." % len(peers))

s = time.time()
num_success = 0
for peer in peers:
Expand All @@ -119,6 +286,8 @@ def discoverTrackers(self, peers):
continue

num_success += 1

random.shuffle(res["trackers"])
for tracker_address in res["trackers"]:
if type(tracker_address) is bytes: # Backward compatibilitys
tracker_address = tracker_address.decode("utf8")
Expand All @@ -142,7 +311,8 @@ def discoverTrackers(self, peers):
@PluginManager.registerTo("SiteAnnouncer")
class SiteAnnouncerPlugin(object):
def getTrackers(self):
if tracker_storage.time_discover < time.time() - 5 * 60:
tracker_storage.setSiteAnnouncer(self)
if tracker_storage.time_discover < time.time() - tracker_storage.tracker_discover_time_interval:
tracker_storage.time_discover = time.time()
gevent.spawn(tracker_storage.discoverTrackers, self.site.getConnectedPeers())
trackers = super(SiteAnnouncerPlugin, self).getTrackers()
Expand All @@ -153,6 +323,7 @@ def getTrackers(self):
return trackers

def announceTracker(self, tracker, *args, **kwargs):
tracker_storage.setSiteAnnouncer(self)
res = super(SiteAnnouncerPlugin, self).announceTracker(tracker, *args, **kwargs)
if res:
latency = res
Expand All @@ -167,6 +338,7 @@ def announceTracker(self, tracker, *args, **kwargs):
class FileRequestPlugin(object):
def actionGetTrackers(self, params):
shared_trackers = list(tracker_storage.getWorkingTrackers("shared").keys())
random.shuffle(shared_trackers)
self.response({"trackers": shared_trackers})


Expand All @@ -185,6 +357,7 @@ def portCheck(self, *args, **kwargs):
class ConfigPlugin(object):
def createArguments(self):
group = self.parser.add_argument_group("AnnounceShare plugin")
group.add_argument('--working_shared_trackers_limit', help='Stop discovering new shared trackers after this number of shared trackers reached', default=5, type=int, metavar='limit')
group.add_argument('--working_shared_trackers_limit', help='Stop discovering new shared trackers after this number of shared trackers reached (total)', default=10, type=int, metavar='limit')
group.add_argument('--working_shared_trackers_limit_per_protocol', help='Stop discovering new shared trackers after this number of shared trackers reached per each supported protocol', default="zero=5,other=2", metavar='limit')

return super(ConfigPlugin, self).createArguments()