Skip to content

Commit 586bb04

Browse files
authored
feat: v25.37.0 (#143)
* feat: Tidy GPG logic and move gpg home to a temp dir (#140) * Tidy GPG logic and move gpg home to a temp dir * Forgot to keep exceptions * Feat: Email renaming (#141) * Add email filename renaming * Add email rename tests * Added more verbose logging (#142) * Update CHANGELOG * bump version v25.35.2 -> v25.37.0 * Prevent reloading filters for every template validation * Cache file list at runtime * Add proper logging for schema errors * Add schema validation * Update changelog * Add config loader tests * Fix tests and add file cache clearing to help * Fix email test
1 parent 7cfb1dc commit 586bb04

File tree

14 files changed

+410
-86
lines changed

14 files changed

+410
-86
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
# v25.37.0
4+
5+
- Add additional logging to `sftp` & `local` protocol.
6+
- Add `rename` option to `email` destination protocol.
7+
- Tidy GPG logic and move gpghome creation into a temporary directory to avoid issues with multiple processes (or retries) finding broken keychains. Hopefully fixes an issue encountered when uusing EFS as a staging location and the contents of the .gnupg directory being locked.
8+
- Performance improvements to config loading
9+
310
# v25.35.2
411

512
- Fix race condition when importing addons too. See previous release notes for more details.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "opentaskpy"
7-
version = "v25.35.2"
7+
version = "v25.37.0"
88
authors = [{ name = "Adam McDonagh", email = "[email protected]" }]
99
license-files = [ "LICENSE" ]
1010

@@ -71,7 +71,7 @@ otf-batch-validator = "opentaskpy.cli.batch_validator:main"
7171
profile = 'black'
7272

7373
[tool.bumpver]
74-
current_version = "v25.35.2"
74+
current_version = "v25.37.0"
7575
version_pattern = "vYY.WW.PATCH[-TAG]"
7676
commit_message = "bump version {old_version} -> {new_version}"
7777
commit = true

src/opentaskpy/cli/batch_validator.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,14 @@
55
import logging
66
import os
77
import sys
8+
import time
89

910
from opentaskpy.config.loader import ConfigLoader
11+
from opentaskpy.config.schemas import (
12+
validate_batch_json,
13+
validate_execution_json,
14+
validate_transfer_json,
15+
)
1016
from opentaskpy.otflogging import OTF_LOG_FORMAT
1117

1218
CONFIG_PATH = f"{os.getcwd()}/cfg"
@@ -106,7 +112,32 @@ def main(
106112
return False
107113

108114
# Loop through the tasks and ensure that the dependencies are valid
115+
start = time.time() * 1000
109116
for task in batch_task_definition["tasks"]:
117+
order_id = task["order_id"]
118+
full_task = tasks[order_id]
119+
# Validate that the task definition is valid
120+
# Determine the task type and use the appropriate validation function
121+
if full_task["type"] == "transfer":
122+
# Validate the schema
123+
if not validate_transfer_json(full_task):
124+
logger.error("JSON format does not match schema")
125+
return False
126+
127+
elif full_task["type"] == "execution":
128+
129+
# Validate the schema
130+
if not validate_execution_json(full_task):
131+
logger.error("JSON format does not match schema")
132+
return False
133+
134+
elif full_task["type"] == "batch":
135+
136+
# Validate the schema
137+
if not validate_batch_json(full_task):
138+
logger.error("JSON format does not match schema")
139+
return False
140+
110141
logger.debug(f"Checking dependencies for task {task['order_id']}")
111142
if "dependencies" not in task:
112143
continue
@@ -117,6 +148,9 @@ def main(
117148
)
118149
return False
119150

151+
end = time.time() * 1000
152+
logger.info(f"Batch definition is valid in {end - start} ms")
153+
120154
logger.info("Batch definition is valid")
121155
return True
122156

src/opentaskpy/config/loader.py

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ def __init__(self, config_dir: str) -> None:
3333
self.logger = opentaskpy.otflogging.init_logging(__name__)
3434
self.config_dir = config_dir
3535
self.global_variables: dict = {}
36+
self.loaded_filters: dict = {}
37+
self.file_cache: dict[str, str] = {} # Cache for preloaded files
3638

3739
self.logger.log(12, f"Looking in {self.config_dir}")
3840

@@ -41,6 +43,7 @@ def __init__(self, config_dir: str) -> None:
4143
self.template_env = jinja2.Environment(undefined=jinja2.StrictUndefined)
4244

4345
self._load_filters(self.template_env.filters)
46+
self.loaded_filters = self.template_env.filters
4447

4548
self._load_global_variables()
4649

@@ -50,12 +53,33 @@ def __init__(self, config_dir: str) -> None:
5053

5154
self._resolve_templated_variables(lazy_load=self.lazy_load)
5255

56+
# Preload all files in the config directory
57+
self._preload_files()
58+
59+
def _preload_files(self) -> None:
60+
"""Preload all JSON and Jinja2 files in the config directory into memory."""
61+
self.file_cache = {}
62+
file_patterns = [
63+
f"{self.config_dir}/**/*.json",
64+
f"{self.config_dir}/**/*.json.j2",
65+
]
66+
for pattern in file_patterns:
67+
for file_path in glob(pattern, recursive=True):
68+
with open(file_path, encoding="utf-8") as file:
69+
self.file_cache[file_path] = file.read()
70+
self.logger.log(12, f"Preloaded {len(self.file_cache)} files into memory.")
71+
5372
def _load_filters(self, destination: dict) -> None:
5473
"""Load default filters from opentaskpy.filters.default_filters.
5574
5675
Args:
5776
destination (dict): The destination dictionary to load the filters into
5877
"""
78+
# Prevent multiple loads
79+
if self.loaded_filters:
80+
destination.update(self.loaded_filters)
81+
return
82+
5983
# Check what functions exist in the module
6084
for name, func in inspect.getmembers(default_filters, inspect.isfunction):
6185
destination[name] = func
@@ -79,31 +103,46 @@ def _load_filters(self, destination: dict) -> None:
79103
else:
80104
self.logger.log(12, f"Couldn't import custom filter: {filter_file}")
81105

106+
destination = self.loaded_filters
107+
82108
def _override_variables_from_env(self, variables: dict, variable_type: str) -> None:
83109
"""Overrides variables with environment variables."""
84-
for env_var_name, env_var_value in os.environ.items():
110+
for ( # pylint: disable=too-many-nested-blocks
111+
env_var_name,
112+
env_var_value,
113+
) in os.environ.items():
85114
if "." in env_var_name:
86115
key_path = env_var_name.split(".")
87116
current_dict = variables
117+
self.logger.log(12, f"Searching for {env_var_name}")
88118
for i, key in enumerate(key_path):
89119
if isinstance(current_dict, dict) and key in current_dict:
90120
if i == len(key_path) - 1:
91121
# It's the final key, override the value
92122
self.logger.info(
93123
f"Overriding nested {variable_type} variable '{env_var_name}' with environment variable."
94124
)
95-
current_dict[key] = env_var_value
125+
# Check the original type of the variable, if it was an int, then cast it to an int
126+
if isinstance(current_dict[key], int):
127+
current_dict[key] = int(env_var_value)
128+
else:
129+
current_dict[key] = env_var_value
96130
else:
97131
# Traverse deeper
98132
current_dict = current_dict[key]
99133
else:
100134
# The key path does not exist in the dictionary
101135
break
136+
102137
elif env_var_name in variables:
103138
self.logger.info(
104139
f"Overriding {variable_type} variable ({env_var_name}: {variables[env_var_name]}) with environment variable ({env_var_value})"
105140
)
106-
variables[env_var_name] = env_var_value
141+
# Check the original type of the variable, if it was an int, then cast it to an int
142+
if isinstance(variables[env_var_name], int):
143+
variables[env_var_name] = int(env_var_value)
144+
else:
145+
variables[env_var_name] = env_var_value
107146

108147
def get_global_variables(self) -> dict:
109148
"""Return the set of global variables that have been assigned via config files.
@@ -179,11 +218,12 @@ def template_lookup(self, plugin: str, **kwargs) -> str: # type: ignore[no-unty
179218
)
180219

181220
# TASK DEFINITION FIND FILE
182-
def load_task_definition(self, task_id: str) -> dict:
221+
def load_task_definition(self, task_id: str, cache: bool = True) -> dict:
183222
"""Load the task definition from the config directory.
184223
185224
Args:
186225
task_id (str): The id of the task to load
226+
cache (bool, optional): Whether to use the cache or load from disk. Defaults to True.
187227
188228
Raises:
189229
DuplicateConfigFileError: Raised if more than one config file is found
@@ -193,20 +233,25 @@ def load_task_definition(self, task_id: str) -> dict:
193233
Returns:
194234
dict: A dictionary representing the task definition
195235
"""
196-
json_config = glob(f"{self.config_dir}/**/{task_id}.json", recursive=True)
197-
json_config.extend(
198-
glob(f"{self.config_dir}/**/{task_id}.json.j2", recursive=True)
199-
)
200-
201-
if not json_config or len(json_config) != 1:
202-
if len(json_config) > 1:
236+
if not cache:
237+
self._preload_files()
238+
239+
# Search for files matching the task_id in the preloaded cache
240+
matching_files = [
241+
path
242+
for path in self.file_cache
243+
if path.endswith(f"{task_id}.json") or path.endswith(f"{task_id}.json.j2")
244+
]
245+
246+
if not matching_files or len(matching_files) != 1:
247+
if len(matching_files) > 1:
203248
raise DuplicateConfigFileError(
204249
f"Found more than one task with name: {task_id}"
205250
)
206251

207252
raise FileNotFoundError(f"Couldn't find task with name: {task_id}")
208253

209-
found_file = json_config[0]
254+
found_file = matching_files[0]
210255
self.logger.log(12, f"Found: {found_file}")
211256

212257
task_definition = self._enrich_variables(found_file)

src/opentaskpy/config/schemas.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,11 @@ def validate_transfer_json(json_data: dict) -> bool:
215215
module_path = schema_dir
216216

217217
schema_def = {
218-
"$ref": Path(
219-
f"{module_path}/transfer/{destination_protocol}_destination.json"
220-
).as_uri()
218+
"$ref": (
219+
Path(
220+
f"{module_path}/transfer/{destination_protocol}_destination.json"
221+
).as_uri()
222+
)
221223
}
222224

223225
# If schema_refs does not already contain the schema_def, then append it
@@ -249,6 +251,7 @@ def validate_transfer_json(json_data: dict) -> bool:
249251

250252
except ValidationError as err:
251253
print(err.message) # noqa: T201
254+
logger.error(err.message)
252255
return False
253256
return True
254257

@@ -312,6 +315,7 @@ def validate_execution_json(json_data: dict) -> bool:
312315

313316
except ValidationError as err:
314317
print(err.message) # noqa: T201
318+
logger.error(err.message)
315319
return False
316320
return True
317321

@@ -337,5 +341,6 @@ def validate_batch_json(json_data: dict) -> bool:
337341

338342
except ValidationError as err:
339343
print(err.message) # noqa: T201
344+
logger.error(err.message)
340345
return False
341346
return True
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"$schema": "https://json-schema.org/draft/2020-12/schema",
3+
"$id": "http://localhost/transfer/email/rename.json",
4+
"type": "object",
5+
"properties": {
6+
"pattern": {
7+
"type": "string"
8+
},
9+
"sub": {
10+
"type": "string"
11+
}
12+
},
13+
"required": ["pattern", "sub"],
14+
"additionalProperties": false
15+
}

src/opentaskpy/config/schemas/transfer/email_destination.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
"messageContentFilename": {
2525
"type": "string"
2626
},
27+
"rename": {
28+
"$ref": "http://localhost/transfer/email/rename.json"
29+
},
2730
"deleteContentFileAfterTransfer": {
2831
"type": "boolean",
2932
"default": true

src/opentaskpy/remotehandlers/email.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import glob
44
import os
5+
import re
56
import smtplib
67
from email.mime.application import MIMEApplication
78
from email.mime.multipart import MIMEMultipart
@@ -81,8 +82,24 @@ def push_files_from_worker(
8182
else:
8283
files = glob.glob(f"{local_staging_directory}/*")
8384

85+
# Rename files if required
86+
87+
# Don't rename the files themselves, because it's unnecessary, we just need
88+
# to set the new file names in the email attachment
89+
file_names = []
90+
if "rename" in self.spec:
91+
# Handle any rename that might be specified in the spec
92+
93+
rename_regex = self.spec["rename"]["pattern"]
94+
rename_sub = self.spec["rename"]["sub"]
95+
96+
for file in files:
97+
file_names.append(re.sub(rename_regex, rename_sub, file))
98+
else:
99+
file_names = files
100+
84101
# Get comma separated list of files
85-
attachment_file_list = ", ".join([file.split("/")[-1] for file in files])
102+
attachment_file_list = ", ".join([file.split("/")[-1] for file in file_names])
86103

87104
# Add a body to the email
88105
content_type = (
@@ -124,9 +141,9 @@ def push_files_from_worker(
124141
msg = MIMEMultipart()
125142

126143
# Attach the files to the message
127-
for file in files:
128-
# Strip the directory from the file
129-
file_name = file.split("/")[-1]
144+
for file, file_name in zip(files, file_names):
145+
# Strip the directory from the file name
146+
file_name = file_name.split("/")[-1]
130147
self.logger.debug(f"Emailing file: {files} to {email_address}")
131148
try:
132149
with open(file, "rb") as file_handle:

src/opentaskpy/remotehandlers/local.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ def push_files_from_worker(
193193
shutil.copy(file, final_destination)
194194
if mode:
195195
os.chmod(final_destination, int(mode, base=8))
196+
197+
self.logger.info(
198+
f"[LOCALHOST] Copied file {file} to {final_destination}"
199+
)
196200
except Exception as ex: # pylint: disable=broad-exception-caught
197201
self.logger.error(f"[LOCALHOST] Failed to move file: {ex}")
198202
result = 1
@@ -227,6 +231,7 @@ def handle_post_copy_action(self, files: list[str]) -> int:
227231
try:
228232
self.logger.info(f"[LOCALHOST] Deleting file {file}")
229233
os.remove(file)
234+
self.logger.info(f"[LOCALHOST] Deleted file {file}")
230235
except OSError:
231236
self.logger.error(
232237
f"[LOCALHOST] Could not delete file {file} on source host"
@@ -272,6 +277,9 @@ def handle_post_copy_action(self, files: list[str]) -> int:
272277
file,
273278
f"{self.spec['postCopyAction']['destination']}/{file_name}",
274279
)
280+
self.logger.info(
281+
f"[LOCALHOST] Renamed file {file} to {self.spec['postCopyAction']['destination']}/{file_name}"
282+
)
275283
# If this is a rename, then we need to rename the file
276284
if self.spec["postCopyAction"]["action"] == "rename":
277285
# Determine the new file name
@@ -292,6 +300,9 @@ def handle_post_copy_action(self, files: list[str]) -> int:
292300
f" {new_file_dir}/{new_file_name}"
293301
)
294302
os.rename(file, f"{new_file_dir}/{new_file_name}")
303+
self.logger.info(
304+
f"[LOCALHOST] Renamed file {file} to {new_file_dir}/{new_file_name}"
305+
)
295306
except OSError as e:
296307
self.logger.error(f"[LOCALHOST] Error: {e}")
297308
self.logger.error(
@@ -320,6 +331,8 @@ def create_flag_files(self) -> int:
320331
if "permissions" in self.spec:
321332
os.chmod(filename, int(self.spec["permissions"], base=8))
322333

334+
self.logger.info(f"[LOCALHOST] Created flag file: {filename}")
335+
323336
except OSError as e:
324337
self.logger.error(f"[LOCALHOST] Error: {e}")
325338
self.logger.error(f"[LOCALHOST] Error creating flag file: {filename}")

0 commit comments

Comments
 (0)