11from __future__ import absolute_import , unicode_literals
22
33import logging
4+ import signal
45from datetime import timedelta
56from time import sleep
67
@@ -27,10 +28,18 @@ class WorkerService(object):
2728 _PREFIX_STR = 'prefix:'
2829 _RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount'
2930
31+ def __init__ (self ):
32+ # type: () -> None
33+ self ._exit_gracefully = False
34+ self ._last_healthcheck_time = None
35+
3036 def process_queues (self , queue_names ):
3137 # type: (list) -> None
38+ signal .signal (signal .SIGTERM , self ._exit_called )
39+ signal .signal (signal .SIGKILL , self ._exit_called )
40+
3241 self .write_healthcheck_file ()
33- self .last_healthcheck_time = timezone .now ()
42+ self ._last_healthcheck_time = timezone .now ()
3443
3544 logger .debug ('[django-eb-sqs] Connecting to SQS: {}' .format (', ' .join (queue_names )))
3645
@@ -60,7 +69,7 @@ def process_queues(self, queue_names):
6069 logger .info ('[django-eb-sqs] DEFAULT_MAX_RETRIES = {}' .format (settings .DEFAULT_MAX_RETRIES ))
6170 logger .info ('[django-eb-sqs] REFRESH_PREFIX_QUEUES_S = {}' .format (settings .REFRESH_PREFIX_QUEUES_S ))
6271
63- while True :
72+ while not self . _exit_gracefully :
6473 if len (queue_prefixes ) > 0 and \
6574 timezone .now () - timedelta (seconds = settings .REFRESH_PREFIX_QUEUES_S ) > last_update_time :
6675 queues = static_queues + self .get_queues_by_prefixes (sqs , queue_prefixes )
@@ -79,6 +88,9 @@ def process_messages(self, queues, worker, static_queues):
7988 # type: (list, Worker, list) -> None
8089
8190 for queue in queues :
91+ if self ._exit_gracefully :
92+ return
93+
8294 try :
8395 messages = self .poll_messages (queue )
8496 logger .debug ('[django-eb-sqs] Polled {} messages' .format (len (messages )))
@@ -98,10 +110,6 @@ def process_messages(self, queues, worker, static_queues):
98110 self .delete_messages (queue , msg_entries )
99111
100112 self ._send_signal (MESSAGES_DELETED , messages = messages )
101-
102- if timezone .now () - timedelta (seconds = settings .MIN_HEALTHCHECK_WRITE_PERIOD_S ) > self .last_healthcheck_time :
103- self .write_healthcheck_file ()
104- self .last_healthcheck_time = timezone .now ()
105113 except ClientError as exc :
106114 error_code = exc .response .get ('Error' , {}).get ('Code' , None )
107115 if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues :
@@ -111,6 +119,10 @@ def process_messages(self, queues, worker, static_queues):
111119 except Exception as exc :
112120 logger .warning ('[django-eb-sqs] Error polling queue {}: {}' .format (queue .url , exc ), exc_info = True )
113121
122+ if timezone .now () - timedelta (seconds = settings .MIN_HEALTHCHECK_WRITE_PERIOD_S ) > self ._last_healthcheck_time :
123+ self .write_healthcheck_file ()
124+ self ._last_healthcheck_time = timezone .now ()
125+
114126 def delete_messages (self , queue , msg_entries ):
115127 # type: (Queue, list) -> None
116128 if len (msg_entries ) > 0 :
@@ -130,10 +142,10 @@ def poll_messages(self, queue):
130142 AttributeNames = [self ._RECEIVE_COUNT_ATTRIBUTE ]
131143 )
132144
133- def _send_signal (self , signal , messages ):
145+ def _send_signal (self , dispatch_signal , messages ):
134146 # type: (django.dispatch.Signal, list) -> None
135- if signal .has_listeners (sender = self .__class__ ):
136- self ._execute_user_code (lambda : signal .send (sender = self .__class__ , messages = messages ))
147+ if dispatch_signal .has_listeners (sender = self .__class__ ):
148+ self ._execute_user_code (lambda : dispatch_signal .send (sender = self .__class__ , messages = messages ))
137149
138150 def _process_message (self , msg , worker ):
139151 # type: (Message, Worker) -> None
@@ -178,3 +190,6 @@ def write_healthcheck_file(self):
178190 # type: () -> None
179191 with open (settings .HEALTHCHECK_FILE_NAME , 'w' ) as file :
180192 file .write (timezone .now ().isoformat ())
193+
194+ def _exit_called (self ):
195+ self ._exit_gracefully = True
0 commit comments