diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index 65a9022..c561b64 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -21,6 +21,8 @@ DEFAULT_INSERT = { "priority": 0, + "time": None, + "period": 0, "attempts": 0, "locked_by": None, "locked_at": None, @@ -70,27 +72,84 @@ def repair(self): "$inc": {"attempts": 1}} ) - def put(self, payload, priority=0): + def put(self, payload, priority=0, time=None, period=None): """Place a job into the queue """ job = dict(DEFAULT_INSERT) job['priority'] = priority + job['time'] = time + # Store period as an integer representing the number of seconds + # because BSON format doesn't support timedelta + if period and type(period) == timedelta: + job['period'] = period.total_seconds() job['payload'] = payload + if self.is_dupe(job): + return return self.collection.insert(job) + def is_dupe(self, job): + jobs = self.collection.find({ + 'payload': job['payload'], + 'time': job['time'], + 'period': job['period'], + 'attempts': 0}, + limit=1 + ) + for job in jobs: + if job: + return True + return False + def next(self): - return self._wrap_one(self.collection.find_and_modify( - query={"locked_by": None, - "locked_at": None, - "attempts": {"$lt": self.max_attempts}}, - update={"$set": {"attempts": 1, - "locked_by": self.consumer_id, - "locked_at": datetime.now()}}, - sort=[('priority', pymongo.DESCENDING)], - new=1, + scheduled_job = self.next_scheduled_job() + free_job = self.next_free_job() + next_job = None + + if scheduled_job and scheduled_job['time'] < datetime.utcnow(): + next_job = scheduled_job + else: + next_job = free_job + + return self._wrap_one(self.collection.find_and_modify({ + "_id": next_job["_id"] + }, + update={ + "$set": { + "locked_by": self.consumer_id, + "locked_at": datetime.now() + }}, + new=True, limit=1 )) + def next_scheduled_job(self): + jobs = self.collection.find({ + "locked_by": None, + "locked_at": None, + "time": {"$ne": None}, + "attempts": {"$lt": self.max_attempts} + }, + sort=[('time', pymongo.ASCENDING)], + limit=1 + ) + for job in jobs: + return job + return None + + def next_free_job(self): + jobs = self.collection.find({ + "locked_by": None, + "locked_at": None, + "time": None, + "attempts": {"$lt": self.max_attempts} + }, + sort=[('priority', pymongo.DESCENDING)], + limit=1 + ) + for job in jobs: + return job + return None + def _jobs(self): return self.collection.find( query={"locked_by": None, @@ -153,6 +212,17 @@ def job_id(self): def complete(self): """Job has been completed. """ + if self._data['period']: + updated_time = self._data['time'] + timedelta(seconds=self._data['period']) + + return self._queue.collection.find_and_modify( + {"_id": self.job_id, "locked_by": self._queue.consumer_id}, + update={"$set":{ + "locked_by": None, + "locked_at": None, + "time": updated_time + }}) + return self._queue.collection.find_and_modify( {"_id": self.job_id, "locked_by": self._queue.consumer_id}, remove=True) @@ -166,6 +236,15 @@ def error(self, message=None): "locked_by": None, "locked_at": None, "last_error": message}, "$inc": {"attempts": 1}}) + if self._data['attempts'] == self._queue.max_attempts - 1 and self._data['period']: + updated_time = self._data['time'] + timedelta(seconds=self._data['period']) + + self._queue.put(self._data['payload'], + priority=self._data['priority'], + time=updated_time, + period=timedelta(seconds=self._data['period'])) + + def progress(self, count=0): """Note progress on a long running task. """ @@ -181,6 +260,13 @@ def release(self): update={"$set": {"locked_by": None, "locked_at": None}, "$inc": {"attempts": 1}}) + def abort(self): + """Intentionally terminate execution of a job, and remove it from the queue + """ + return self._queue.collection.find_and_modify( + {"_id": self.job_id, "locked_by": self._queue.consumer_id}, + remove=True) + ## Context Manager support def __enter__(self): @@ -192,3 +278,4 @@ def __exit__(self, type, value, tb): else: error = traceback.format_exc() self.error(error) + return True diff --git a/mongoqueue/test.py b/mongoqueue/test.py index dfbdb3a..d6f4db4 100644 --- a/mongoqueue/test.py +++ b/mongoqueue/test.py @@ -111,6 +111,20 @@ def test_release(self): job = self.queue.next() self.assert_job_equal(job, data) + def test_max_attempts(self): + data = {"context_id": "alpha", + "ts": time.time()} + self.queue.put(dict(data)) + attempts = 0 + for i in xrange(0, self.queue.max_attempts): + job = self.queue.next() + if not job: + break + with job: + attempts += 1 + raise Exception() + self.assertEqual(attempts, self.queue.max_attempts) + def test_error(self): pass diff --git a/setup.py b/setup.py index a7e7ba4..468a596 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages setup(name='mongoqueue', - version="0.7.2", + version="0.7.3", classifiers=[ 'Intended Audience :: Developers', 'Programming Language :: Python',