Skip to content

Commit 2c68f59

Browse files
committed
Ensure partitionning is consistent accress message bus and queue
1 parent 1dec22f commit 2c68f59

File tree

17 files changed

+192
-130
lines changed

17 files changed

+192
-130
lines changed

docs/source/topics/cluster-setup.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ a common modules and import settings from it in component's modules.
5858
'frontera.contrib.middlewares.fingerprint.DomainFingerprintMiddleware'
5959
])
6060

61-
QUEUE_HOSTNAME_PARTITIONING = True
61+
62+
SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
6263
KAFKA_LOCATION = 'localhost:9092' # your Kafka broker host:port
6364
SCORING_TOPIC = 'frontier-scoring'
6465
URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'

docs/source/topics/frontera-settings.rst

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,28 @@ Default: ``0``
332332

333333
Per-spider setting, pointing spider to it's assigned partition.
334334

335+
.. setting:: SPIDER_FEED_PARTITIONER
336+
337+
SPIDER_LOG_PARTITIONER
338+
-----------------------
339+
340+
Default: ``frontera.contrib.backends.partitioners.FingerprintPartitioner``
341+
342+
Partitioner used to calculate a :term:`spider feed` partition id. This affect the distribution of extracted links to the
343+
spiders. Default value partition based on the request ``fingerprint``. The other available built-in value is
344+
``frontera.contrib.backends.partitioners.Crc32NamePartitioner`` to partition based on the hostname.
345+
346+
.. setting:: SPIDER_FEED_PARTITIONER
347+
348+
SPIDER_FEED_PARTITIONER
349+
-----------------------
350+
351+
Default: ``frontera.contrib.backends.partitioners.FingerprintPartitioner``
352+
353+
Partitioner used to calculate a :term:`spider feed` partition id. This affect the distribution of requests to the
354+
spiders. Default value partition based on the request ``fingerprint``. The other available built-in value is
355+
``frontera.contrib.backends.partitioners.Crc32NamePartitioner`` to partition based on the hostname.
356+
335357
.. setting:: STATE_CACHE_SIZE
336358

337359
STATE_CACHE_SIZE

examples/cluster/bc/config/common.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
#--------------------------------------------------------
1515
# Crawl frontier backend
1616
#--------------------------------------------------------
17-
QUEUE_HOSTNAME_PARTITIONING = True
17+
SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
1818
URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'
1919

2020
#MESSAGE_BUS='frontera.contrib.messagebus.kafkabus.MessageBus'
2121
#KAFKA_LOCATION = 'localhost:9092'
2222
#SCORING_GROUP = 'scrapy-scoring'
23-
#SCORING_TOPIC = 'frontier-score'
23+
#SCORING_TOPIC = 'frontier-score'

frontera/contrib/backends/hbase.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from frontera.core.components import Metadata, Queue, States
66
from frontera.core.models import Request
77
from frontera.contrib.backends.partitioners import Crc32NamePartitioner
8-
from frontera.utils.misc import chunks, get_crc32
8+
from frontera.utils.misc import chunks, get_crc32, load_object
99
from frontera.contrib.backends.remote.codecs.msgpack import Decoder, Encoder
1010

1111
from happybase import Connection
@@ -66,10 +66,9 @@ class HBaseQueue(Queue):
6666

6767
GET_RETRIES = 3
6868

69-
def __init__(self, connection, partitions, table_name, drop=False):
69+
def __init__(self, connection, partitioner, table_name, drop=False):
7070
self.connection = connection
71-
self.partitions = [i for i in range(0, partitions)]
72-
self.partitioner = Crc32NamePartitioner(self.partitions)
71+
self.partitioner = partitioner
7372
self.logger = logging.getLogger("hbase.queue")
7473
self.table_name = to_bytes(table_name)
7574

@@ -141,14 +140,9 @@ def get_interval(score, resolution):
141140
for request, score in batch:
142141
domain = request.meta[b'domain']
143142
fingerprint = request.meta[b'fingerprint']
144-
if type(domain) == dict:
145-
partition_id = self.partitioner.partition(domain[b'name'], self.partitions)
146-
host_crc32 = get_crc32(domain[b'name'])
147-
elif type(domain) == int:
148-
partition_id = self.partitioner.partition_by_hash(domain, self.partitions)
149-
host_crc32 = domain
150-
else:
151-
raise TypeError("domain of unknown type.")
143+
key = self.partitioner.get_key(request)
144+
partition_id = self.partitioner.partition(key)
145+
host_crc32 = domain if type(domain) == int else get_crc32(key)
152146
item = (unhexlify(fingerprint), host_crc32, self.encoder.encode_request(request), score)
153147
score = 1 - score # because of lexicographical sort in HBase
154148
rk = "%d_%s_%d" % (partition_id, "%0.2f_%0.2f" % get_interval(score, 0.01), random_str)
@@ -404,7 +398,9 @@ def __init__(self, manager):
404398
self._min_hosts = settings.get('BC_MIN_HOSTS')
405399
self._max_requests_per_host = settings.get('BC_MAX_REQUESTS_PER_HOST')
406400

407-
self.queue_partitions = settings.get('SPIDER_FEED_PARTITIONS')
401+
partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
402+
partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
403+
self.partitioner = partitioner_cls(partitions)
408404
host = choice(hosts) if type(hosts) in [list, tuple] else hosts
409405
kwargs = {
410406
'host': host,
@@ -435,7 +431,7 @@ def db_worker(cls, manager):
435431
o = cls(manager)
436432
settings = manager.settings
437433
drop_all_tables = settings.get('HBASE_DROP_ALL_TABLES')
438-
o._queue = HBaseQueue(o.connection, o.queue_partitions,
434+
o._queue = HBaseQueue(o.connection, o.partitioner,
439435
settings.get('HBASE_QUEUE_TABLE'), drop=drop_all_tables)
440436
o._metadata = HBaseMetadata(o.connection, settings.get('HBASE_METADATA_TABLE'), drop_all_tables,
441437
settings.get('HBASE_USE_SNAPPY'), settings.get('HBASE_BATCH_SIZE'),
@@ -484,7 +480,7 @@ def get_next_requests(self, max_next_requests, **kwargs):
484480
next_pages = []
485481
self.logger.debug("Querying queue table.")
486482
partitions = set(kwargs.pop('partitions', []))
487-
for partition_id in range(0, self.queue_partitions):
483+
for partition_id in self.partitioner.partitions:
488484
if partition_id not in partitions:
489485
continue
490486
results = self.queue.get_next_requests(max_next_requests, partition_id,

frontera/contrib/backends/memory/__init__.py

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from frontera.core.components import Metadata, Queue, States
88
from frontera.core import OverusedBuffer
99
from frontera.utils.heap import Heap
10-
from frontera.contrib.backends.partitioners import Crc32NamePartitioner
1110
from frontera.utils.url import parse_domain_from_url_fast
11+
from frontera.utils.misc import load_object
1212
import six
1313
from six.moves import map
1414
from six.moves import range
@@ -52,12 +52,11 @@ def update_score(self, batch):
5252

5353

5454
class MemoryQueue(Queue):
55-
def __init__(self, partitions):
56-
self.partitions = [i for i in range(0, partitions)]
57-
self.partitioner = Crc32NamePartitioner(self.partitions)
55+
def __init__(self, partitioner):
56+
self.partitioner = partitioner
5857
self.logger = logging.getLogger("memory.queue")
5958
self.heap = {}
60-
for partition in self.partitions:
59+
for partition in self.partitioner.partitions:
6160
self.heap[partition] = Heap(self._compare_pages)
6261

6362
def count(self):
@@ -70,31 +69,26 @@ def schedule(self, batch):
7069
for fprint, score, request, schedule in batch:
7170
if schedule:
7271
request.meta[b'_scr'] = score
73-
_, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
74-
if not hostname:
75-
self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint)
76-
partition_id = self.partitions[0]
77-
else:
78-
partition_id = self.partitioner.partition(hostname, self.partitions)
72+
key = self.partitioner.get_key(request)
73+
partition_id = self.partitioner.partition(key)
7974
self.heap[partition_id].push(request)
8075

8176
def _compare_pages(self, first, second):
8277
return cmp(first.meta[b'_scr'], second.meta[b'_scr'])
8378

8479

8580
class MemoryDequeQueue(Queue):
86-
def __init__(self, partitions, is_fifo=True):
81+
def __init__(self, partitioner, is_fifo=True):
8782
"""
8883
Deque-based queue (see collections module). Efficient queue for LIFO and FIFO strategies.
89-
:param partitions: int count of partitions
84+
:param partitioner: Partitioner
9085
:param type: bool, True for FIFO, False for LIFO
9186
"""
92-
self.partitions = [i for i in range(0, partitions)]
93-
self.partitioner = Crc32NamePartitioner(self.partitions)
87+
self.partitioner = partitioner
9488
self.logger = logging.getLogger("memory.dequequeue")
9589
self.queues = {}
9690
self.is_fifo = is_fifo
97-
for partition in self.partitions:
91+
for partition in self.partitioner.partitions:
9892
self.queues[partition] = deque()
9993

10094
def count(self):
@@ -112,12 +106,8 @@ def schedule(self, batch):
112106
for fprint, score, request, schedule in batch:
113107
if schedule:
114108
request.meta[b'_scr'] = score
115-
_, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
116-
if not hostname:
117-
self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint)
118-
partition_id = self.partitions[0]
119-
else:
120-
partition_id = self.partitioner.partition(hostname, self.partitions)
109+
key = self.partitioner.get_key(request)
110+
partition_id = self.partitioner.partition(key)
121111
self.queues[partition_id].append(request)
122112

123113

@@ -165,6 +155,9 @@ def __init__(self, manager):
165155
settings = manager.settings
166156
self._metadata = MemoryMetadata()
167157
self._states = MemoryStates(settings.get("STATE_CACHE_SIZE"))
158+
partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
159+
partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
160+
self._partitioner = partitioner_cls(partitions)
168161
self._queue = self._create_queue(settings)
169162
self._id = 0
170163

@@ -222,27 +215,27 @@ def _compare_pages(self, first, second):
222215

223216
class MemoryFIFOBackend(MemoryBaseBackend):
224217
def _create_queue(self, settings):
225-
return MemoryDequeQueue(settings.get('SPIDER_FEED_PARTITIONS'))
218+
return MemoryDequeQueue(self._partitioner)
226219

227220

228221
class MemoryLIFOBackend(MemoryBaseBackend):
229222
def _create_queue(self, settings):
230-
return MemoryDequeQueue(settings.get('SPIDER_FEED_PARTITIONS'), is_fifo=False)
223+
return MemoryDequeQueue(self._partitioner, is_fifo=False)
231224

232225

233226
class MemoryDFSBackend(MemoryBaseBackend):
234227
def _create_queue(self, settings):
235-
return MemoryDFSQueue(settings.get('SPIDER_FEED_PARTITIONS'))
228+
return MemoryDFSQueue(self._partitioner)
236229

237230

238231
class MemoryBFSBackend(MemoryBaseBackend):
239232
def _create_queue(self, settings):
240-
return MemoryBFSQueue(settings.get('SPIDER_FEED_PARTITIONS'))
233+
return MemoryBFSQueue(self._partitioner)
241234

242235

243236
class MemoryRandomBackend(MemoryBaseBackend):
244237
def _create_queue(self, settings):
245-
return MemoryRandomQueue(settings.get('SPIDER_FEED_PARTITIONS'))
238+
return MemoryRandomQueue(self._partitioner)
246239

247240

248241
class MemoryDFSOverusedBackend(MemoryDFSBackend):

frontera/contrib/backends/partitioners.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,43 @@
55

66
from frontera.core.components import Partitioner
77
from frontera.utils.misc import get_crc32
8+
from frontera.utils.url import parse_domain_from_url_fast
89

910

1011
class Crc32NamePartitioner(Partitioner):
1112
def partition(self, key, partitions=None):
13+
if not partitions:
14+
partitions = self.partitions
1215
if key is None:
13-
return self.partitions[0]
14-
value = get_crc32(key)
15-
return self.partition_by_hash(value, partitions if partitions else self.partitions)
16+
return partitions[0]
17+
elif type(key) == int:
18+
value = key
19+
else:
20+
value = get_crc32(key)
21+
return self.partition_by_hash(value, partitions)
1622

1723
def partition_by_hash(self, value, partitions):
1824
size = len(partitions)
1925
idx = value % size
2026
return partitions[idx]
2127

22-
def __call__(self, key, all_partitions, available):
23-
return self.partition(key, all_partitions)
28+
@staticmethod
29+
def get_key(request):
30+
domain = request.meta.get(b'domain')
31+
if domain is not None:
32+
if type(domain) == dict:
33+
return domain[b'name']
34+
elif type(domain) == int:
35+
return domain
36+
else:
37+
raise TypeError("domain of unknown type.")
38+
39+
try:
40+
_, name, _, _, _, _ = parse_domain_from_url_fast(request.url)
41+
except Exception:
42+
return None
43+
else:
44+
return name.encode('utf-8', 'ignore')
2445

2546

2647
class FingerprintPartitioner(Partitioner):
@@ -32,5 +53,6 @@ def partition(self, key, partitions=None):
3253
idx = value[0] % len(partitions)
3354
return partitions[idx]
3455

35-
def __call__(self, key, all_partitions, available):
36-
return self.partition(key, all_partitions)
56+
@staticmethod
57+
def get_key(request):
58+
return request.meta[b'fingerprint']

frontera/contrib/backends/sqlalchemy/__init__.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ def __init__(self, manager):
3737
session.execute(table.delete())
3838
session.commit()
3939
session.close()
40+
41+
partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
42+
partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
43+
self.partitioner = partitioner_cls(partitions)
44+
4045
self._metadata = Metadata(self.session_cls, self.models['MetadataModel'],
4146
settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
4247
self._states = States(self.session_cls, self.models['StateModel'],
@@ -48,7 +53,7 @@ def frontier_stop(self):
4853
self.engine.dispose()
4954

5055
def _create_queue(self, settings):
51-
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
56+
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)
5257

5358
@property
5459
def queue(self):
@@ -67,23 +72,23 @@ class FIFOBackend(SQLAlchemyBackend):
6772
component_name = 'SQLAlchemy FIFO Backend'
6873

6974
def _create_queue(self, settings):
70-
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
75+
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner,
7176
ordering='created')
7277

7378

7479
class LIFOBackend(SQLAlchemyBackend):
7580
component_name = 'SQLAlchemy LIFO Backend'
7681

7782
def _create_queue(self, settings):
78-
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
83+
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner,
7984
ordering='created_desc')
8085

8186

8287
class DFSBackend(SQLAlchemyBackend):
8388
component_name = 'SQLAlchemy DFS Backend'
8489

8590
def _create_queue(self, settings):
86-
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
91+
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)
8792

8893
def _get_score(self, obj):
8994
return -obj.meta[b'depth']
@@ -93,7 +98,7 @@ class BFSBackend(SQLAlchemyBackend):
9398
component_name = 'SQLAlchemy BFS Backend'
9499

95100
def _create_queue(self, settings):
96-
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
101+
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)
97102

98103
def _get_score(self, obj):
99104
return obj.meta[b'depth']
@@ -170,7 +175,7 @@ def db_worker(cls, manager):
170175

171176
b._metadata = Metadata(b.session_cls, metadata_m,
172177
settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
173-
b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
178+
b._queue = Queue(b.session_cls, queue_m, b.partitioner)
174179
return b
175180

176181
@property

0 commit comments

Comments
 (0)