Commit 235ceb6

bslatkin committed
Incorporates various scalability tweaks; removes periodic EventToDeliver cleanup cronjob
git-svn-id: http://pubsubhubbub.googlecode.com/svn/trunk@408 ad9c9841-ba53-0410-b846-bb8d69ff4585
1 parent 92dc7e9 commit 235ceb6

4 files changed: +47 additions, -106 deletions

File tree:
  hub/cron.yaml
  hub/index.yaml
  hub/main.py
  hub/main_test.py

4 files changed

+47
-106
lines changed

hub/cron.yaml

Lines changed: 0 additions & 4 deletions
@@ -3,10 +3,6 @@ cron:
   url: /work/poll_bootstrap
   schedule: every 5 minutes
 
-- description: EventToDeliver cleanup
-  url: /work/event_cleanup
-  schedule: every 1 minutes
-
 - description: Subscription cleanup
   url: /work/subscription_cleanup
   schedule: every 1 minutes

hub/index.yaml

Lines changed: 0 additions & 7 deletions
@@ -7,13 +7,6 @@ indexes:
   - name: topic_hash
   - name: callback_hash
 
-# For displaying failed callbacks for a specific subscription.
-- kind: EventToDeliver
-  properties:
-  - name: failed_callbacks
-  - name: last_modified
-    direction: desc
-
 
 # AUTOGENERATED
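This composite index existed only to serve the hub's "failed callbacks for a subscription" query ordered by recency. The final hunk of hub/main.py drops the matching .order('-last_modified') clause, leaving an equality-only filter that the datastore serves from its built-in single-property indexes, so the custom index can go. The before/after query shapes, for reference (EventToDeliver and subscription are hub/main.py names):

# Before: needed the composite index (failed_callbacks asc, last_modified desc).
(EventToDeliver.all()
 .filter('failed_callbacks =', subscription.key())
 .order('-last_modified')
 .fetch(25))

# After: equality-only, no custom index required.
(EventToDeliver.all()
 .filter('failed_callbacks =', subscription.key())
 .fetch(25))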

hub/main.py

Lines changed: 45 additions & 51 deletions
@@ -152,7 +152,7 @@
 DELIVERY_RETRY_PERIOD = 30  # seconds
 
 # Period at which feed IDs should be refreshed.
-FEED_IDENTITY_UPDATE_PERIOD = (10 * 24 * 60 * 60)  # 10 days
+FEED_IDENTITY_UPDATE_PERIOD = (20 * 24 * 60 * 60)  # 20 days
 
 # Number of polling feeds to fetch from the Datastore at a time.
 BOOSTRAP_FEED_CHUNK_SIZE = 50
@@ -209,14 +209,14 @@
 FETCH_SCORER = dos.UrlScorer(
     period=300,  # Seconds
     min_requests=5,  # per second
-    max_failure_percentage=1,  # TODO: Drop this to something more reasonable!
+    max_failure_percentage=0.8,
     prefix='pull_feed')
 
 # Pushing events
 DELIVERY_SCORER = dos.UrlScorer(
     period=300,  # Seconds
     min_requests=0.5,  # per second
-    max_failure_percentage=1,  # TODO: Drop this to something more reasonable!
+    max_failure_percentage=0.8,
     prefix='deliver_events')
 
 
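Dropping max_failure_percentage from 1 to 0.8 resolves the old TODO: a scored URL is now penalized once more than 80% of its sampled requests fail, instead of effectively never. dos.UrlScorer's internals are not part of this diff, so the sketch below only illustrates the assumed threshold semantics; is_failing is a hypothetical helper, not hub code:

def is_failing(failures, total, max_failure_percentage=0.8):
  # Flag a URL whose failure fraction over the sampling period
  # exceeds the configured threshold.
  if total == 0:
    return False
  return float(failures) / total > max_failure_percentage

assert not is_failing(4, 5)  # 0.8 is not strictly greater than 0.8
assert is_failing(5, 5)      # 1.0 > 0.8: penalize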

@@ -689,7 +689,6 @@ def get_random_challenge():
   """Returns a string containing a random challenge token."""
   return ''.join(random.choice(_VALID_CHARS) for i in xrange(128))
 
-
 ################################################################################
 # Models
 
@@ -929,7 +928,7 @@ def has_subscribers(cls, topic):
     Returns:
       True if it has verified subscribers, False otherwise.
     """
-    if (cls.all().filter('topic_hash =', sha1_hash(topic))
+    if (cls.all(keys_only=True).filter('topic_hash =', sha1_hash(topic))
         .filter('subscription_state =', cls.STATE_VERIFIED).get() is not None):
       return True
     else:
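The keys_only=True change turns has_subscribers into a pure existence check: the datastore returns bare db.Key objects instead of full entities, skipping the entity payload read and deserialization. A minimal sketch of the pattern against the old google.appengine.ext.db API (Subscription, sha1_hash, and STATE_VERIFIED are the hub's own names; the wrapper function is illustrative):

def topic_has_verified_subscriber(topic):
  # .get() fetches at most one result; with keys_only=True that result
  # is a db.Key, which is enough to answer "does one exist?".
  key = (Subscription.all(keys_only=True)
         .filter('topic_hash =', sha1_hash(topic))
         .filter('subscription_state =', Subscription.STATE_VERIFIED)
         .get())
  return key is not None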
@@ -1047,7 +1046,7 @@ def txn():
   return db.run_in_transaction(txn)
 
 
-class FeedToFetch(db.Model):
+class FeedToFetch(db.Expando):
   """A feed that has new data that needs to be pulled.
 
   The key name of this entity is a get_hash_key_name() hash of the topic URL, so
@@ -1057,7 +1056,7 @@ class FeedToFetch(db.Model):
   topic = db.TextProperty(required=True)
   eta = db.DateTimeProperty(auto_now_add=True, indexed=False)
   fetching_failures = db.IntegerProperty(default=0, indexed=False)
-  totally_failed = db.BooleanProperty(default=False)
+  totally_failed = db.BooleanProperty(default=False, indexed=False)
   source_keys = db.StringListProperty(indexed=False)
   source_values = db.StringListProperty(indexed=False)
   work_index = db.IntegerProperty()
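Two scalability tweaks recur throughout this file. Moving FeedToFetch from db.Model to db.Expando lets later revisions add or drop attributes without migrating stored entities, and indexed=False stops the datastore from maintaining index rows for properties that are never filtered or ordered on, cutting the index-write cost of every put(). A hedged sketch of the idea (FeedToFetchSketch is illustrative, not the hub's class):

from google.appengine.ext import db

class FeedToFetchSketch(db.Expando):
  # Never queried: skip the per-put() index writes for these.
  eta = db.DateTimeProperty(auto_now_add=True, indexed=False)
  totally_failed = db.BooleanProperty(default=False, indexed=False)
  # Still used in queries, so it keeps its index.
  work_index = db.IntegerProperty()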
@@ -1145,19 +1144,24 @@ def fetch_failed(self,
       retry_period: Initial period for doing exponential (base-2) backoff.
       now: Returns the current time as a UTC datetime.
     """
+    orig_failures = self.fetching_failures
     def txn():
       if self.fetching_failures >= max_failures:
         logging.debug('Max fetching failures exceeded, giving up.')
         self.totally_failed = True
       else:
-        retry_delay = retry_period * (2 ** self.fetching_failures)
+        retry_delay = retry_period * (2 ** orig_failures)
         logging.debug('Fetching failed. Will retry in %s seconds',
                       retry_delay)
         self.eta = now() + datetime.timedelta(seconds=retry_delay)
-        self.fetching_failures += 1
+        self.fetching_failures = orig_failures + 1
         self._enqueue_retry_task()
       self.put()
-    db.run_in_transaction(txn)
+    try:
+      db.run_in_transaction_custom_retries(2, txn)
+    except:
+      logging.exception('Could not mark feed fetching as a failure: topic=%r',
+                        self.topic)
 
   def done(self):
     """The feed fetch has completed successfully.
@@ -1230,11 +1234,11 @@ class FeedRecord(db.Model):
   """
 
   topic = db.TextProperty(required=True)
-  header_footer = db.TextProperty()  # Save this for debugging.
-  last_updated = db.DateTimeProperty(auto_now=True)  # The last polling time.
+  header_footer = db.TextProperty()
+  last_updated = db.DateTimeProperty(auto_now=True, indexed=False)
   format = db.TextProperty()  # 'atom', 'rss', or 'arbitrary'
 
-  # Content-related headers served by the feed' host.
+  # Content-related headers served by the feed's host.
   content_type = db.TextProperty()
   last_modified = db.TextProperty()
   etag = db.TextProperty()
@@ -1331,6 +1335,7 @@ def get_request_headers(self, subscriber_count):
     headers = {
       'Cache-Control': 'no-cache no-store max-age=1',
       'Connection': 'cache-control',
+      'Accept': '*/*',
     }
     if self.last_modified:
       headers['If-Modified-Since'] = self.last_modified
@@ -1437,10 +1442,11 @@ class EventToDeliver(db.Expando):
   topic_hash = db.StringProperty(required=True)
   last_callback = db.TextProperty(default='')  # For paging Subscriptions
   failed_callbacks = db.ListProperty(db.Key)  # Refs to Subscription entities
-  delivery_mode = db.StringProperty(default=NORMAL, choices=DELIVERY_MODES)
+  delivery_mode = db.StringProperty(default=NORMAL, choices=DELIVERY_MODES,
+                                    indexed=False)
   retry_attempts = db.IntegerProperty(default=0, indexed=False)
-  last_modified = db.DateTimeProperty(required=True)
-  totally_failed = db.BooleanProperty(default=False)
+  last_modified = db.DateTimeProperty(required=True, indexed=False)
+  totally_failed = db.BooleanProperty(default=False, indexed=False)
   content_type = db.TextProperty(default='')
   max_failures = db.IntegerProperty(indexed=False)
 
@@ -1619,13 +1625,17 @@ def update(self,
       return
     elif not more_callbacks:
       self.last_callback = ''
-      retry_delay = retry_period * (2 ** self.retry_attempts)
-      self.last_modified += datetime.timedelta(seconds=retry_delay)
       self.retry_attempts += 1
       if self.max_failures is not None:
         max_failures = self.max_failures
       if self.retry_attempts > max_failures:
         self.totally_failed = True
+      else:
+        retry_delay = retry_period * (2 ** (self.retry_attempts-1))
+        try:
+          self.last_modified += datetime.timedelta(seconds=retry_delay)
+        except OverflowError:
+          pass
 
     if self.delivery_mode == EventToDeliver.NORMAL:
       logging.debug('Normal delivery done; %d broken callbacks remain',
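The try/except exists because Python datetime arithmetic raises OverflowError when the result would leave the representable range, and after enough doublings retry_delay can push last_modified past datetime.datetime.max. Keeping the old timestamp in that case is harmless, since it is already far in the future. The failure mode is easy to reproduce:

import datetime

stamp = datetime.datetime.max - datetime.timedelta(seconds=1)
try:
  stamp += datetime.timedelta(seconds=2 ** 40)  # an over-grown backoff delay
except OverflowError:
  pass  # keep the existing timestamp instead of crashing the worker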
@@ -1969,7 +1979,12 @@ def derive_additional_topics(cls, topics):
       if topic_set is None:
         topic_set = set([known_topic])
         output_dict[known_topic] = topic_set
-      topic_set.update(identified.topics)
+      # TODO(bslatkin): Test this.
+      if len(identified.topics) > 25:
+        logging.debug('Too many expansion feeds for topic %s: %s',
+                      known_topic, identified.topics)
+      else:
+        topic_set.update(identified.topics)
 
     return output_dict
 
@@ -2235,6 +2250,7 @@ def post(self):
             done_callback_queue=POLLING_QUEUE))
 
 
+# TODO(bslatkin): Move this to an offline job.
 class SubscriptionCleanupHandler(webapp.RequestHandler):
   """Background worker for cleaning up deleted Subscription instances."""
 
@@ -2367,7 +2383,8 @@ def post(self):
       self.response.out.write('MUST supply at least one hub.url parameter')
       return
 
-    logging.debug('Publish event for %d URLs: %s', len(urls), urls)
+    logging.debug('Publish event for %d URLs (showing first 25): %s',
+                  len(urls), list(urls)[:25])
     error = self.receive_publish(urls, 204, 'hub.url')
     if error:
       self.response.out.write(error)
@@ -2558,8 +2575,9 @@ def parse_feed(feed_record,
     except (xml.sax.SAXException, feed_diff.Error), e:
       error_traceback = traceback.format_exc()
       logging.debug(
-          'Could not get entries for content of %d bytes in format "%s":\n%s',
-          len(content), format, error_traceback)
+          'Could not get entries for content of %d bytes in format "%s" '
+          'for topic %r:\n%s',
+          len(content), format, feed_record.topic, error_traceback)
       parse_failures += 1
     except LookupError, e:
       error_traceback = traceback.format_exc()
@@ -2570,7 +2588,8 @@ def parse_feed(feed_record,
     return true_on_bad_feed
 
   if parse_failures == len(order):
-    logging.error('Could not parse feed; giving up:\n%s', error_traceback)
+    logging.error('Could not parse feed %r; giving up:\n%s',
+                  feed_record.topic, error_traceback)
     # That's right, we return True. This will cause the fetch to be
     # abandoned on parse failures because the feed is beyond hope!
     return true_on_bad_feed
@@ -2580,7 +2599,7 @@ def parse_feed(feed_record,
   # separate EventToDeliver entities to be inserted for the feed pulls, each
   # containing a separate subset of the data.
   if len(entities_to_save) > MAX_NEW_FEED_ENTRY_RECORDS:
-    logging.warning('Found more entities than we can process for topic %s; '
+    logging.warning('Found more entities than we can process for topic %r; '
                     'splitting', feed_record.topic)
     entities_to_save = entities_to_save[:MAX_NEW_FEED_ENTRY_RECORDS]
     entry_payloads = entry_payloads[:MAX_NEW_FEED_ENTRY_RECORDS]
@@ -2625,8 +2644,8 @@ def txn():
     try:
       db.put(group)
     except (db.BadRequestError, apiproxy_errors.RequestTooLargeError):
-      logging.exception('Could not insert %d entities; splitting in half',
-                        len(group))
+      logging.exception('Could not insert %d entities for topic %r; '
+                        'splitting in half', len(group), feed_record.topic)
       # Insert the first half at the beginning since we need to make sure that
       # the EventToDeliver gets inserted first.
       all_entities.insert(0, group[len(group)/2:])
@@ -2904,30 +2923,6 @@ def create_callback(sub):
 
     work.update(more_subscribers, failed_callbacks)
 
-
-class EventCleanupHandler(webapp.RequestHandler):
-  """Background worker for cleaning up expired EventToDeliver instances."""
-
-  def __init__(self, now=datetime.datetime.utcnow):
-    """Initializer."""
-    webapp.RequestHandler.__init__(self)
-    self.now = now
-
-  @work_queue_only
-  def get(self):
-    threshold = (self.now() -
-                 datetime.timedelta(seconds=EVENT_CLEANUP_MAX_AGE_SECONDS))
-    events = (EventToDeliver.all()
-              .filter('last_modified <=', threshold)
-              .order('last_modified').fetch(EVENT_CLEANUP_CHUNK_SIZE))
-    if events:
-      logging.info('Cleaning up %d events older than %s',
-                   len(events), threshold)
-      try:
-        db.delete(events)
-      except (db.Error, apiproxy_errors.Error, runtime.DeadlineExceededError):
-        logging.exception('Could not clean-up EventToDeliver instances')
-
 ################################################################################
 
 def take_polling_action(topic_list, poll_type):
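Note how the removals in this commit hang together: with last_modified now indexed=False on EventToDeliver, the handler's `.filter('last_modified <=', threshold)` query above can no longer be served, so the worker goes away along with its cron entry (hub/cron.yaml), its composite index (hub/index.yaml), its tests (hub/main_test.py), and the `.order('-last_modified')` clause in the next hunk.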
@@ -3217,7 +3212,6 @@ def get(self):
     else:
       failed_events = (EventToDeliver.all()
                        .filter('failed_callbacks =', subscription.key())
-                       .order('-last_modified')
                        .fetch(25))
     delivery_score = DELIVERY_SCORER.filter([callback_url])[0]
 
hub/main_test.py

Lines changed: 2 additions & 44 deletions
@@ -2224,7 +2224,8 @@ def testPullWithUnicodeEtag(self):
     self.assertEquals(data.replace('\n', ''), event.payload.replace('\n', ''))
     self.assertEquals('application/atom+xml', event.content_type)
     self.assertEquals(
-        {'Connection': 'cache-control',
+        {'Accept': '*/*',
+         'Connection': 'cache-control',
          'Cache-Control': 'no-cache no-store max-age=1'},
         FeedRecord.all().get().get_request_headers(0))
 
@@ -2787,49 +2788,6 @@ def testNotAllowed(self):
     finally:
       dos.DISABLE_FOR_TESTING = True
 
-
-class EventCleanupHandlerTest(testutil.HandlerTestBase):
-  """Tests for the EventCleanupHandler worker."""
-
-  def setUp(self):
-    """Sets up the test harness."""
-    self.now = datetime.datetime.utcnow()
-    self.expire_time = self.now - datetime.timedelta(
-        seconds=main.EVENT_CLEANUP_MAX_AGE_SECONDS)
-    def create_handler():
-      return main.EventCleanupHandler(now=lambda: self.now)
-    self.handler_class = create_handler
-    testutil.HandlerTestBase.setUp(self)
-    self.topic = 'http://example.com/mytopic'
-    self.header_footer = '<feed></feed>'
-
-  def testEventCleanupTooYoung(self):
-    """Tests when there are events present, but they're too young to remove."""
-    event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, 'application/atom+xml',
-        self.header_footer, [])
-    event.last_modified = self.expire_time + datetime.timedelta(seconds=1)
-    event.put()
-    self.handle('get')
-    self.assertTrue(db.get(event.key()) is not None)
-
-  def testEventCleanupOldEnough(self):
-    """Tests when there are events old enough to clean up."""
-    event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, 'application/atom+xml',
-        self.header_footer, [])
-    event.last_modified = self.expire_time
-    event.put()
-
-    too_young_event = EventToDeliver.create_event_for_topic(
-        self.topic + 'blah', main.ATOM, 'application/atom+xml',
-        self.header_footer, [])
-    too_young_event.put()
-
-    self.handle('get')
-    self.assertTrue(db.get(event.key()) is None)
-    self.assertTrue(db.get(too_young_event.key()) is not None)
-
 ################################################################################
 
 class SubscribeHandlerTest(testutil.HandlerTestBase):
