Skip to content

Commit 962642e

Browse files
committed
make sure extractor stops if rabbitmq dies
This will make sure that the extractor restarts if rabbitmq dies. Also removed the code to run parrallel threads, this was not used and could lead to race conditions. If multiple extractors are needed it is safer to start the extractor multiple times.
1 parent 4489c7a commit 962642e

File tree

3 files changed

+70
-58
lines changed

3 files changed

+70
-58
lines changed

CHANGELOG.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,23 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](http://keepachangelog.com/)
55
and this project adheres to [Semantic Versioning](http://semver.org/).
66

7+
## 2.3.2 - 2020-09-24
8+
9+
### Fixed
10+
- When rabbitmq restarts the extractor would not stop and restart, resulting
11+
in the extractor no longer receiving any messages. #17
12+
13+
### Removed
14+
- Removed ability to run multiple connectors in the same python process. If
15+
parallelism is needed, use multiple processes (or containers).
16+
717
## 2.3.1 - 2020-09-18
818

919
With this version we no longer gurantee support for versions of python below 3.
1020

1121
### Fixed
12-
- There was an issue where status messages could cause an exception. This would prevent most extractors from running correctly.
22+
- There was an issue where status messages could cause an exception. This would
23+
prevent most extractors from running correctly.
1324

1425
## 2.3.0 - 2020-09-15
1526

pyclowder/connectors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,9 @@ def listen(self):
728728
self.connection.close()
729729
except Exception:
730730
logging.getLogger(__name__).exception("Error while closing connection.")
731+
if self.announcer:
732+
self.announcer.stop_thread()
733+
731734
self.connection = None
732735

733736
def stop(self):
@@ -808,6 +811,9 @@ def start_thread(self):
808811
self.thread.setDaemon(True)
809812
self.thread.start()
810813

814+
def stop_thread(self):
815+
self.thread = None
816+
811817
def send_heartbeat(self):
812818
# create the message we will send
813819
message = {
@@ -868,6 +874,7 @@ def start_thread(self, json_body):
868874
"""
869875
self.thread = threading.Thread(target=self._process_message, args=(json_body,))
870876
self.thread.start()
877+
self.thread.setDaemon(True)
871878

872879
def is_finished(self):
873880
with self.lock:

pyclowder/extractors.py

Lines changed: 51 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ def __init__(self):
8080
help='connector to use (default=RabbitMQ)')
8181
self.parser.add_argument('--logging', '-l', nargs='?', default=logging_config,
8282
help='file or url or logging coonfiguration (default=None)')
83-
self.parser.add_argument('--num', '-n', type=int, nargs='?', default=1,
84-
help='number of parallel instances (default=1)')
8583
self.parser.add_argument('--pickle', nargs='*', dest="hpc_picklefile",
8684
default=None, action='append',
8785
help='pickle file that needs to be processed (only needed for HPC)')
@@ -126,32 +124,32 @@ def setup(self):
126124
def start(self):
127125
"""Create the connector and start listening.
128126
129-
Based on the num command line argument this will start multiple instances of a connector and run each of them
130-
in their own thread. Once the connector(s) are created this function will go into a endless loop until either
127+
Start a single instance of a connector and run it in their own thread.
128+
Once the connector(s) are created this function will go into a endless loop until either
131129
all connectors have stopped or the user kills the program.
132130
"""
133131
logger = logging.getLogger(__name__)
134-
connectors = list()
135-
for connum in range(self.args.num):
136-
if self.args.connector == "RabbitMQ":
137-
if 'rabbitmq_uri' not in self.args:
138-
logger.error("Missing URI for RabbitMQ")
139-
else:
140-
rabbitmq_key = []
141-
if not self.args.nobind:
142-
for key, value in self.extractor_info['process'].items():
143-
for mt in value:
144-
# Replace trailing '*' with '#'
145-
mt = re.sub('(\*$)', '#', mt)
146-
if mt.find('*') > -1:
147-
logger.error("Invalid '*' found in rabbitmq_key: %s" % mt)
132+
connector = None
133+
134+
if self.args.connector == "RabbitMQ":
135+
if 'rabbitmq_uri' not in self.args:
136+
logger.error("Missing URI for RabbitMQ")
137+
else:
138+
rabbitmq_key = []
139+
if not self.args.nobind:
140+
for key, value in self.extractor_info['process'].items():
141+
for mt in value:
142+
# Replace trailing '*' with '#'
143+
mt = re.sub('(\*$)', '#', mt)
144+
if mt.find('*') > -1:
145+
logger.error("Invalid '*' found in rabbitmq_key: %s" % mt)
146+
else:
147+
if mt == "":
148+
rabbitmq_key.append("*.%s.#" % key)
148149
else:
149-
if mt == "":
150-
rabbitmq_key.append("*.%s.#" % key)
151-
else:
152-
rabbitmq_key.append("*.%s.%s" % (key, mt.replace("/", ".")))
150+
rabbitmq_key.append("*.%s.%s" % (key, mt.replace("/", ".")))
153151

154-
rconn = RabbitMQConnector(self.args.rabbitmq_queuename,
152+
connector = RabbitMQConnector(self.args.rabbitmq_queuename,
155153
self.extractor_info,
156154
check_message=self.check_message,
157155
process_message=self.process_message,
@@ -160,54 +158,50 @@ def start(self):
160158
rabbitmq_key=rabbitmq_key,
161159
rabbitmq_queue=self.args.rabbitmq_queuename,
162160
mounted_paths=json.loads(self.args.mounted_paths))
163-
rconn.connect()
164-
rconn.register_extractor(self.args.registration_endpoints)
165-
connectors.append(rconn)
166-
threading.Thread(target=rconn.listen, name="Connector-" + str(connum)).start()
167-
elif self.args.connector == "HPC":
168-
if 'hpc_picklefile' not in self.args:
169-
logger.error("Missing hpc_picklefile for HPCExtractor")
170-
else:
171-
hconn = HPCConnector(self.extractor_info['name'],
161+
connector.connect()
162+
connector.register_extractor(self.args.registration_endpoints)
163+
threading.Thread(target=connector.listen, name="RabbitMQConnector").start()
164+
165+
elif self.args.connector == "HPC":
166+
if 'hpc_picklefile' not in self.args:
167+
logger.error("Missing hpc_picklefile for HPCExtractor")
168+
else:
169+
connector = HPCConnector(self.extractor_info['name'],
172170
self.extractor_info,
173171
check_message=self.check_message,
174172
process_message=self.process_message,
175173
picklefile=self.args.hpc_picklefile,
176174
mounted_paths=json.loads(self.args.mounted_paths))
177-
hconn.register_extractor(self.args.registration_endpoints)
178-
connectors.append(hconn)
179-
threading.Thread(target=hconn.listen, name="Connector-" + str(connum)).start()
180-
elif self.args.connector == "Local":
181-
182-
if self.args.input_file_path is None:
183-
logger.error("Environment variable INPUT_FILE_PATH or parameter --input-file-path is not set. "
184-
"Please try again after setting one of these")
185-
elif not os.path.isfile(self.args.input_file_path):
186-
logger.error("Local input file is not a regular file. Please check the path.")
187-
else:
188-
local_connector = LocalConnector(self.extractor_info['name'],
189-
self.extractor_info,
190-
self.args.input_file_path,
191-
process_message=self.process_message,
192-
output_file_path=self.args.output_file_path)
193-
connectors.append(local_connector)
194-
threading.Thread(target=local_connector.listen, name="Connector-" + str(connum)).start()
175+
connector.register_extractor(self.args.registration_endpoints)
176+
threading.Thread(target=connector.listen, name="HPCConnector").start()
177+
178+
elif self.args.connector == "Local":
179+
if self.args.input_file_path is None:
180+
logger.error("Environment variable INPUT_FILE_PATH or parameter "
181+
"--input-file-path is not set. Please try again after "
182+
"setting one of these")
183+
elif not os.path.isfile(self.args.input_file_path):
184+
logger.error("Local input file is not a regular file. Please check the path.")
195185
else:
196-
logger.error("Could not create instance of %s connector.", self.args.connector)
197-
sys.exit(-1)
186+
connector = LocalConnector(self.extractor_info['name'],
187+
self.extractor_info,
188+
self.args.input_file_path,
189+
process_message=self.process_message,
190+
output_file_path=self.args.output_file_path)
191+
threading.Thread(target=connector.listen, name="LocalConnector").start()
192+
else:
193+
logger.error("Could not create instance of %s connector.", self.args.connector)
194+
sys.exit(-1)
198195

199196
logger.info("Waiting for messages. To exit press CTRL+C")
200197
try:
201-
while connectors:
198+
while connector.alive():
202199
time.sleep(1)
203-
connectors = filter(lambda x: x.alive(), connectors)
204200
except KeyboardInterrupt:
205201
pass
206202
except BaseException:
207203
logger.exception("Error while consuming messages.")
208-
209-
for c in connectors:
210-
c.stop()
204+
connector.stop()
211205

212206
def get_metadata(self, content, resource_type, resource_id, server=None):
213207
"""Generate a metadata field.

0 commit comments

Comments
 (0)