11from __future__ import absolute_import , unicode_literals
22
33import logging
4+ import signal
45from datetime import timedelta
56from time import sleep
67
@@ -27,10 +28,18 @@ class WorkerService(object):
2728 _PREFIX_STR = 'prefix:'
2829 _RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount'
2930
31+ def __init__ (self ):
32+ # type: () -> None
33+ self ._exit_gracefully = False
34+ self ._last_healthcheck_time = None
35+
3036 def process_queues (self , queue_names ):
3137 # type: (list) -> None
38+ signal .signal (signal .SIGTERM , self ._exit_called )
39+ signal .signal (signal .SIGKILL , self ._exit_called )
40+
3241 self .write_healthcheck_file ()
33- self .last_healthcheck_time = timezone .now ()
42+ self ._last_healthcheck_time = timezone .now ()
3443
3544 logger .debug ('[django-eb-sqs] Connecting to SQS: {}' .format (', ' .join (queue_names )))
3645
@@ -61,6 +70,9 @@ def process_queues(self, queue_names):
6170 logger .info ('[django-eb-sqs] REFRESH_PREFIX_QUEUES_S = {}' .format (settings .REFRESH_PREFIX_QUEUES_S ))
6271
6372 while True :
73+ if self ._exit_gracefully :
74+ break
75+
6476 if len (queue_prefixes ) > 0 and \
6577 timezone .now () - timedelta (seconds = settings .REFRESH_PREFIX_QUEUES_S ) > last_update_time :
6678 queues = static_queues + self .get_queues_by_prefixes (sqs , queue_prefixes )
@@ -79,6 +91,9 @@ def process_messages(self, queues, worker, static_queues):
7991 # type: (list, Worker, list) -> None
8092
8193 for queue in queues :
94+ if self ._exit_gracefully :
95+ return
96+
8297 try :
8398 messages = self .poll_messages (queue )
8499 logger .debug ('[django-eb-sqs] Polled {} messages' .format (len (messages )))
@@ -98,10 +113,6 @@ def process_messages(self, queues, worker, static_queues):
98113 self .delete_messages (queue , msg_entries )
99114
100115 self ._send_signal (MESSAGES_DELETED , messages = messages )
101-
102- if timezone .now () - timedelta (seconds = settings .MIN_HEALTHCHECK_WRITE_PERIOD_S ) > self .last_healthcheck_time :
103- self .write_healthcheck_file ()
104- self .last_healthcheck_time = timezone .now ()
105116 except ClientError as exc :
106117 error_code = exc .response .get ('Error' , {}).get ('Code' , None )
107118 if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues :
@@ -111,6 +122,10 @@ def process_messages(self, queues, worker, static_queues):
111122 except Exception as exc :
112123 logger .warning ('[django-eb-sqs] Error polling queue {}: {}' .format (queue .url , exc ), exc_info = True )
113124
125+ if timezone .now () - timedelta (seconds = settings .MIN_HEALTHCHECK_WRITE_PERIOD_S ) > self ._last_healthcheck_time :
126+ self .write_healthcheck_file ()
127+ self ._last_healthcheck_time = timezone .now ()
128+
114129 def delete_messages (self , queue , msg_entries ):
115130 # type: (Queue, list) -> None
116131 if len (msg_entries ) > 0 :
@@ -130,10 +145,10 @@ def poll_messages(self, queue):
130145 AttributeNames = [self ._RECEIVE_COUNT_ATTRIBUTE ]
131146 )
132147
133- def _send_signal (self , signal , messages ):
148+ def _send_signal (self , dispatch_signal , messages ):
134149 # type: (django.dispatch.Signal, list) -> None
135- if signal .has_listeners (sender = self .__class__ ):
136- self ._execute_user_code (lambda : signal .send (sender = self .__class__ , messages = messages ))
150+ if dispatch_signal .has_listeners (sender = self .__class__ ):
151+ self ._execute_user_code (lambda : dispatch_signal .send (sender = self .__class__ , messages = messages ))
137152
138153 def _process_message (self , msg , worker ):
139154 # type: (Message, Worker) -> None
@@ -178,3 +193,6 @@ def write_healthcheck_file(self):
178193 # type: () -> None
179194 with open (settings .HEALTHCHECK_FILE_NAME , 'w' ) as file :
180195 file .write (timezone .now ().isoformat ())
196+
197+ def _exit_called (self ):
198+ self ._exit_gracefully = True
0 commit comments