Skip to content

Commit 59cb5df

Browse files
authored
Merge pull request #34 from clowder-framework/add-max-retry-flag
Add --max-retry CLI flag
2 parents b6c2cc4 + 4f14f8f commit 59cb5df

File tree

6 files changed

+31
-19
lines changed

6 files changed

+31
-19
lines changed

.github/workflows/ci.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
name: Python package
22

3-
on: [push]
3+
on:
4+
push:
5+
pull_request:
46

57
jobs:
68
build:
79
runs-on: ubuntu-latest
810
strategy:
911
matrix:
10-
python-version: [3.5, 3.6, 3.7, 3.8, 3.9]
12+
python-version: [3.6, 3.7, 3.8, 3.9]
1113

1214
steps:
1315
- uses: actions/checkout@v2

.github/workflows/pypi.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ on:
55
branches:
66
- main
77
- master
8+
pull_request:
89

910
jobs:
1011
publish:
@@ -49,7 +50,7 @@ jobs:
4950
python setup.py sdist bdist_wheel
5051
5152
- name: Publish distribution to PyPI
52-
if: steps.release_info.outputs.version != 'unreleased'
53+
if: github.event_name == 'push' && env.BRANCH == 'master' && steps.release_info.outputs.version != 'unreleased'
5354
uses: pypa/gh-action-pypi-publish@master
5455
with:
5556
password: ${{ secrets.pypi_password }}

.github/workflows/release.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ name: Create Release
33
on:
44
push:
55
branches:
6-
- main
76
- master
87

98
jobs:
@@ -27,7 +26,7 @@ jobs:
2726
echo "::set-output name=changelog::$changelog"
2827
2928
- name: create release
30-
if: steps.release_info.outputs.version != 'unreleased'
29+
if: github.event_name == 'push' && env.BRANCH == 'master' && steps.release_info.outputs.version != 'unreleased'
3130
uses: actions/create-release@v1
3231
id: create_release
3332
env:

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](http://keepachangelog.com/)
55
and this project adheres to [Semantic Versioning](http://semver.org/).
66

7-
## unreleased
7+
## UNRELEASED
8+
9+
### Added
10+
- Add `--max_retry` CLI flag and `CLOWDER_MAX_RETRY` environment variable.
811

912
### Changed
1013
- updated all of the requirements to latest versions

pyclowder/connectors.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class Connector(object):
6666
registered_clowder = list()
6767

6868
def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
69-
mounted_paths=None, clowder_url=None):
69+
mounted_paths=None, clowder_url=None, max_retry=10):
7070
self.extractor_name = extractor_name
7171
self.extractor_info = extractor_info
7272
self.check_message = check_message
@@ -77,6 +77,7 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
7777
else:
7878
self.mounted_paths = mounted_paths
7979
self.clowder_url = clowder_url
80+
self.max_retry = max_retry
8081

8182
filename = 'notifications.json'
8283
self.smtp_server = None
@@ -490,7 +491,7 @@ def _process_message(self, body):
490491
except Exception as exc: # pylint: disable=broad-except
491492
message = str(exc)
492493
logger.exception("[%s] %s", resource['id'], message)
493-
if retry_count < 10:
494+
if retry_count < self.max_retry:
494495
message = "(#%s) %s" % (retry_count+1, message)
495496
self.message_resubmit(resource, retry_count+1, message)
496497
else:
@@ -629,9 +630,9 @@ class RabbitMQConnector(Connector):
629630
def __init__(self, extractor_name, extractor_info,
630631
rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None,
631632
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
632-
heartbeat=5*60, clowder_url=None):
633+
heartbeat=5*60, clowder_url=None, max_retry=10):
633634
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
634-
ssl_verify, mounted_paths, clowder_url)
635+
ssl_verify, mounted_paths, clowder_url, max_retry)
635636
self.rabbitmq_uri = rabbitmq_uri
636637
self.rabbitmq_exchange = rabbitmq_exchange
637638
self.rabbitmq_key = rabbitmq_key
@@ -852,10 +853,10 @@ class RabbitMQHandler(Connector):
852853
"""
853854

854855
def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True,
855-
mounted_paths=None, clowder_url=None, method=None, header=None, body=None):
856+
mounted_paths=None, clowder_url=None, method=None, header=None, body=None, max_retry=10):
856857

857858
super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message,
858-
ssl_verify, mounted_paths, clowder_url)
859+
ssl_verify, mounted_paths, clowder_url, max_retry)
859860
self.method = method
860861
self.header = header
861862
self.body = body
@@ -977,9 +978,9 @@ class HPCConnector(Connector):
977978

978979
# pylint: disable=too-many-arguments
979980
def __init__(self, extractor_name, extractor_info, picklefile, job_id=None,
980-
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None):
981+
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, max_retry=10):
981982
super(HPCConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
982-
ssl_verify, mounted_paths)
983+
ssl_verify, mounted_paths, max_retry=max_retry)
983984
self.job_id = job_id
984985
self.picklefile = picklefile
985986
self.logfile = None
@@ -1036,8 +1037,8 @@ class LocalConnector(Connector):
10361037
10371038
"""
10381039

1039-
def __init__(self, extractor_name, extractor_info, input_file_path, process_message=None, output_file_path=None):
1040-
super(LocalConnector, self).__init__(extractor_name, extractor_info, process_message=process_message)
1040+
def __init__(self, extractor_name, extractor_info, input_file_path, process_message=None, output_file_path=None, max_retry=10):
1041+
super(LocalConnector, self).__init__(extractor_name, extractor_info, process_message=process_message, max_retry=max_retry)
10411042
self.input_file_path = input_file_path
10421043
self.output_file_path = output_file_path
10431044
self.completed_processing = False

pyclowder/extractors.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def __init__(self):
7373
connector_default = "RabbitMQ"
7474
if os.getenv('LOCAL_PROCESSING', "False").lower() == "true":
7575
connector_default = "Local"
76+
max_retry = os.getenv('CLOWDER_MAX_RETRY', 10)
7677

7778
# create the actual extractor
7879
self.parser = argparse.ArgumentParser(description=self.extractor_info['description'])
@@ -108,6 +109,8 @@ def __init__(self):
108109
self.parser.add_argument('--version', action='version', version='%(prog)s 1.0')
109110
self.parser.add_argument('--no-bind', dest="nobind", action='store_true',
110111
help='instance will bind itself to RabbitMQ by name but NOT file type')
112+
self.parser.add_argument('--max-retry', dest='max_retry', default=max_retry,
113+
help='Maximum number of retries if an error happens in the extractor')
111114

112115
def setup(self):
113116
"""Parse command line arguments and so some setup
@@ -161,7 +164,8 @@ def start(self):
161164
rabbitmq_key=rabbitmq_key,
162165
rabbitmq_queue=self.args.rabbitmq_queuename,
163166
mounted_paths=json.loads(self.args.mounted_paths),
164-
clowder_url=self.args.clowder_url)
167+
clowder_url=self.args.clowder_url,
168+
max_retry=self.args.max_retry)
165169
connector.connect()
166170
connector.register_extractor(self.args.registration_endpoints)
167171
threading.Thread(target=connector.listen, name="RabbitMQConnector").start()
@@ -175,7 +179,8 @@ def start(self):
175179
check_message=self.check_message,
176180
process_message=self.process_message,
177181
picklefile=self.args.hpc_picklefile,
178-
mounted_paths=json.loads(self.args.mounted_paths))
182+
mounted_paths=json.loads(self.args.mounted_paths),
183+
max_retry=self.args.max_retry)
179184
connector.register_extractor(self.args.registration_endpoints)
180185
threading.Thread(target=connector.listen, name="HPCConnector").start()
181186

@@ -191,7 +196,8 @@ def start(self):
191196
self.extractor_info,
192197
self.args.input_file_path,
193198
process_message=self.process_message,
194-
output_file_path=self.args.output_file_path)
199+
output_file_path=self.args.output_file_path,
200+
max_retry=self.args.max_retry)
195201
threading.Thread(target=connector.listen, name="LocalConnector").start()
196202
else:
197203
logger.error("Could not create instance of %s connector.", self.args.connector)

0 commit comments

Comments
 (0)