7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
# Changelog

# v25.37.0

- Add additional logging to the `sftp` & `local` protocols.
- Add `rename` option to the `email` destination protocol.
- Tidy GPG logic and move gpghome creation into a temporary directory, to avoid issues with multiple processes (or retries) finding broken keychains. Hopefully fixes an issue encountered when using EFS as a staging location, where the contents of the .gnupg directory were locked.
- Performance improvements to config loading.

# v25.35.2

- Fix race condition when importing addons too. See previous release notes for more details.
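The GPG changelog entry above describes the fix only at a high level, and the GPG module itself is not part of this diff. As a rough sketch of the technique it describes, assuming the python-gnupg library:

```python
import tempfile

import gnupg  # python-gnupg; an assumption, the PR's GPG code is not shown in this diff

# Creating the GPG home inside a fresh temporary directory gives each process
# (or retry) its own keychain, so broken .gnupg contents left on shared
# storage such as EFS can never be picked up.
with tempfile.TemporaryDirectory() as gpg_home:
    gpg = gnupg.GPG(gnupghome=gpg_home)
    print(gpg.list_keys())  # a fresh, empty keyring in the temporary home
```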
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "opentaskpy"
version = "v25.35.2"
version = "v25.37.0"
authors = [{ name = "Adam McDonagh", email = "adam@elitemonkey.net" }]
license-files = [ "LICENSE" ]

@@ -71,7 +71,7 @@ otf-batch-validator = "opentaskpy.cli.batch_validator:main"
profile = 'black'

[tool.bumpver]
-current_version = "v25.35.2"
+current_version = "v25.37.0"
version_pattern = "vYY.WW.PATCH[-TAG]"
commit_message = "bump version {old_version} -> {new_version}"
commit = true
34 changes: 34 additions & 0 deletions src/opentaskpy/cli/batch_validator.py
@@ -5,8 +5,14 @@
import logging
import os
import sys
import time

from opentaskpy.config.loader import ConfigLoader
from opentaskpy.config.schemas import (
validate_batch_json,
validate_execution_json,
validate_transfer_json,
)
from opentaskpy.otflogging import OTF_LOG_FORMAT

CONFIG_PATH = f"{os.getcwd()}/cfg"
@@ -106,7 +112,32 @@ def main(
return False

# Loop through the tasks and ensure that the dependencies are valid
start = time.time() * 1000
for task in batch_task_definition["tasks"]:
order_id = task["order_id"]
full_task = tasks[order_id]
# Validate that the task definition is valid
# Determine the task type and use the appropriate validation function
if full_task["type"] == "transfer":
# Validate the schema
if not validate_transfer_json(full_task):
logger.error("JSON format does not match schema")
return False

elif full_task["type"] == "execution":

# Validate the schema
if not validate_execution_json(full_task):
logger.error("JSON format does not match schema")
return False

elif full_task["type"] == "batch":

# Validate the schema
if not validate_batch_json(full_task):
logger.error("JSON format does not match schema")
return False

logger.debug(f"Checking dependencies for task {task['order_id']}")
if "dependencies" not in task:
continue
@@ -117,6 +148,9 @@
)
return False

end = time.time() * 1000
logger.info(f"Batch definition is valid in {end - start} ms")

logger.info("Batch definition is valid")
return True

69 changes: 57 additions & 12 deletions src/opentaskpy/config/loader.py
@@ -33,6 +33,8 @@ def __init__(self, config_dir: str) -> None:
self.logger = opentaskpy.otflogging.init_logging(__name__)
self.config_dir = config_dir
self.global_variables: dict = {}
self.loaded_filters: dict = {}
self.file_cache: dict[str, str] = {} # Cache for preloaded files

self.logger.log(12, f"Looking in {self.config_dir}")

@@ -41,6 +43,7 @@ def __init__(self, config_dir: str) -> None:
self.template_env = jinja2.Environment(undefined=jinja2.StrictUndefined)

self._load_filters(self.template_env.filters)
self.loaded_filters = self.template_env.filters

self._load_global_variables()

@@ -50,12 +53,33 @@

self._resolve_templated_variables(lazy_load=self.lazy_load)

# Preload all files in the config directory
self._preload_files()

def _preload_files(self) -> None:
"""Preload all JSON and Jinja2 files in the config directory into memory."""
self.file_cache = {}
file_patterns = [
f"{self.config_dir}/**/*.json",
f"{self.config_dir}/**/*.json.j2",
]
for pattern in file_patterns:
for file_path in glob(pattern, recursive=True):
with open(file_path, encoding="utf-8") as file:
self.file_cache[file_path] = file.read()
self.logger.log(12, f"Preloaded {len(self.file_cache)} files into memory.")

def _load_filters(self, destination: dict) -> None:
"""Load default filters from opentaskpy.filters.default_filters.

Args:
destination (dict): The destination dictionary to load the filters into
"""
# Prevent multiple loads
if self.loaded_filters:
destination.update(self.loaded_filters)
return

# Check what functions exist in the module
for name, func in inspect.getmembers(default_filters, inspect.isfunction):
destination[name] = func
@@ -79,31 +103,46 @@ def _load_filters(self, destination: dict) -> None:
else:
self.logger.log(12, f"Couldn't import custom filter: {filter_file}")

# Cache the loaded filters so later calls can reuse them
self.loaded_filters = destination

def _override_variables_from_env(self, variables: dict, variable_type: str) -> None:
"""Overrides variables with environment variables."""
-for env_var_name, env_var_value in os.environ.items():
+for (  # pylint: disable=too-many-nested-blocks
+    env_var_name,
+    env_var_value,
+) in os.environ.items():
if "." in env_var_name:
key_path = env_var_name.split(".")
current_dict = variables
self.logger.log(12, f"Searching for {env_var_name}")
for i, key in enumerate(key_path):
if isinstance(current_dict, dict) and key in current_dict:
if i == len(key_path) - 1:
# It's the final key, override the value
self.logger.info(
f"Overriding nested {variable_type} variable '{env_var_name}' with environment variable."
)
-current_dict[key] = env_var_value
+# Check the original type of the variable; if it was an int, cast the new value to int
+if isinstance(current_dict[key], int):
+    current_dict[key] = int(env_var_value)
+else:
+    current_dict[key] = env_var_value
else:
# Traverse deeper
current_dict = current_dict[key]
else:
# The key path does not exist in the dictionary
break

elif env_var_name in variables:
self.logger.info(
f"Overriding {variable_type} variable ({env_var_name}: {variables[env_var_name]}) with environment variable ({env_var_value})"
)
-variables[env_var_name] = env_var_value
+# Check the original type of the variable; if it was an int, cast the new value to int
+if isinstance(variables[env_var_name], int):
+    variables[env_var_name] = int(env_var_value)
+else:
+    variables[env_var_name] = env_var_value

def get_global_variables(self) -> dict:
"""Return the set of global variables that have been assigned via config files.
@@ -179,11 +218,12 @@ def template_lookup(self, plugin: str, **kwargs) -> str:  # type: ignore[no-unty
)

# TASK DEFINITION FIND FILE
-def load_task_definition(self, task_id: str) -> dict:
+def load_task_definition(self, task_id: str, cache: bool = True) -> dict:
"""Load the task definition from the config directory.

Args:
task_id (str): The id of the task to load
cache (bool, optional): Whether to use the cache or load from disk. Defaults to True.

Raises:
DuplicateConfigFileError: Raised if more than one config file is found
@@ -193,20 +233,25 @@ def load_task_definition(self, task_id: str) -> dict:
Returns:
dict: A dictionary representing the task definition
"""
-json_config = glob(f"{self.config_dir}/**/{task_id}.json", recursive=True)
-json_config.extend(
-    glob(f"{self.config_dir}/**/{task_id}.json.j2", recursive=True)
-)
-
-if not json_config or len(json_config) != 1:
-    if len(json_config) > 1:
+if not cache:
+    self._preload_files()
+
+# Search for files matching the task_id in the preloaded cache
+matching_files = [
+    path
+    for path in self.file_cache
+    if path.endswith(f"{task_id}.json") or path.endswith(f"{task_id}.json.j2")
+]
+
+if not matching_files or len(matching_files) != 1:
+    if len(matching_files) > 1:
raise DuplicateConfigFileError(
f"Found more than one task with name: {task_id}"
)

raise FileNotFoundError(f"Couldn't find task with name: {task_id}")

-found_file = json_config[0]
+found_file = matching_files[0]
self.logger.log(12, f"Found: {found_file}")

task_definition = self._enrich_variables(found_file)
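For context on the environment-variable override logic above: dot-separated variable names walk into nested dictionaries, and the final value keeps the original's type (strings from the environment are cast back to int where the original was an int). A small illustration with hypothetical variable names:

```python
import os

# Hypothetical global variables before overrides are applied.
variables = {"retries": 3, "smtp": {"port": 25, "host": "mail.example.com"}}

os.environ["smtp.port"] = "587"  # a dot-path targets the nested "port" key
os.environ["retries"] = "5"

# After ConfigLoader applies its overrides, values whose originals were ints
# are cast back to int:
#   variables == {"retries": 5, "smtp": {"port": 587, "host": "mail.example.com"}}
```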
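Taken together, the caching changes mean every `*.json` / `*.json.j2` file under the config directory is read once at construction, and task lookups resolve against the in-memory cache instead of globbing the disk. A usage sketch (the path and task name are hypothetical):

```python
from opentaskpy.config.loader import ConfigLoader

loader = ConfigLoader("/path/to/cfg")  # preloads *.json and *.json.j2 under cfg/

# Resolved from the preloaded cache rather than a fresh glob of the disk.
task = loader.load_task_definition("my-task")

# cache=False re-runs the preload first, picking up files created after the
# loader was constructed.
fresh = loader.load_task_definition("my-task", cache=False)
```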
11 changes: 8 additions & 3 deletions src/opentaskpy/config/schemas.py
@@ -215,9 +215,11 @@ def validate_transfer_json(json_data: dict) -> bool:
module_path = schema_dir

schema_def = {
"$ref": Path(
f"{module_path}/transfer/{destination_protocol}_destination.json"
).as_uri()
"$ref": (
Path(
f"{module_path}/transfer/{destination_protocol}_destination.json"
).as_uri()
)
}

# If schema_refs does not already contain the schema_def, then append it
@@ -249,6 +251,7 @@ def validate_transfer_json(json_data: dict) -> bool:

except ValidationError as err:
print(err.message) # noqa: T201
logger.error(err.message)
return False
return True

@@ -312,6 +315,7 @@ def validate_execution_json(json_data: dict) -> bool:

except ValidationError as err:
print(err.message) # noqa: T201
logger.error(err.message)
return False
return True

@@ -337,5 +341,6 @@ def validate_batch_json(json_data: dict) -> bool:

except ValidationError as err:
print(err.message) # noqa: T201
logger.error(err.message)
return False
return True
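Each validator returns a boolean and, with this change, logs the schema violation as well as printing it. A minimal sketch, assuming the batch schema rejects a definition with no `tasks` array:

```python
from opentaskpy.config.schemas import validate_batch_json

# An (assumed) invalid batch definition: the "tasks" array is missing.
bad_batch = {"type": "batch"}

if not validate_batch_json(bad_batch):
    # The ValidationError message is printed and now also sent to logger.error.
    print("batch definition rejected")
```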
15 changes: 15 additions & 0 deletions src/opentaskpy/config/schemas/transfer/email/rename.json
@@ -0,0 +1,15 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "http://localhost/transfer/email/rename.json",
"type": "object",
"properties": {
"pattern": {
"type": "string"
},
"sub": {
"type": "string"
}
},
"required": ["pattern", "sub"],
"additionalProperties": false
}
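The `pattern`/`sub` pair maps directly onto `re.sub`, as used by the email handler below. A sketch of how a `rename` block might behave (the block and file names here are hypothetical):

```python
import re

# Hypothetical "rename" block matching the schema above.
rename = {"pattern": r"(.*)\.csv$", "sub": r"\1_archived.csv"}

files = ["/tmp/staging/report.csv"]
# Mirrors email.py: only the attachment names are rewritten; the files on
# disk are left untouched.
file_names = [re.sub(rename["pattern"], rename["sub"], f) for f in files]
print(file_names)  # ['/tmp/staging/report_archived.csv']
```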
src/opentaskpy/config/schemas/transfer/email_destination.json
@@ -24,6 +24,9 @@
"messageContentFilename": {
"type": "string"
},
"rename": {
"$ref": "http://localhost/transfer/email/rename.json"
},
"deleteContentFileAfterTransfer": {
"type": "boolean",
"default": true
25 changes: 21 additions & 4 deletions src/opentaskpy/remotehandlers/email.py
@@ -2,6 +2,7 @@

import glob
import os
import re
import smtplib
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
@@ -81,8 +82,24 @@ def push_files_from_worker(
else:
files = glob.glob(f"{local_staging_directory}/*")

# Rename files if required.

# Don't rename the files on disk; it's unnecessary. We only need to set
# the new file names on the email attachments.
file_names = []
if "rename" in self.spec:
# Handle any rename that might be specified in the spec

rename_regex = self.spec["rename"]["pattern"]
rename_sub = self.spec["rename"]["sub"]

for file in files:
file_names.append(re.sub(rename_regex, rename_sub, file))
else:
file_names = files

# Get comma separated list of files
-attachment_file_list = ", ".join([file.split("/")[-1] for file in files])
+attachment_file_list = ", ".join([file.split("/")[-1] for file in file_names])

# Add a body to the email
content_type = (
@@ -124,9 +141,9 @@ def push_files_from_worker(
msg = MIMEMultipart()

# Attach the files to the message
-for file in files:
-    # Strip the directory from the file
-    file_name = file.split("/")[-1]
+for file, file_name in zip(files, file_names):
+    # Strip the directory from the file name
+    file_name = file_name.split("/")[-1]
self.logger.debug(f"Emailing file: {files} to {email_address}")
try:
with open(file, "rb") as file_handle:
13 changes: 13 additions & 0 deletions src/opentaskpy/remotehandlers/local.py
@@ -193,6 +193,10 @@ def push_files_from_worker(
shutil.copy(file, final_destination)
if mode:
os.chmod(final_destination, int(mode, base=8))

self.logger.info(
f"[LOCALHOST] Copied file {file} to {final_destination}"
)
except Exception as ex: # pylint: disable=broad-exception-caught
self.logger.error(f"[LOCALHOST] Failed to move file: {ex}")
result = 1
@@ -227,6 +231,7 @@ def handle_post_copy_action(self, files: list[str]) -> int:
try:
self.logger.info(f"[LOCALHOST] Deleting file {file}")
os.remove(file)
self.logger.info(f"[LOCALHOST] Deleted file {file}")
except OSError:
self.logger.error(
f"[LOCALHOST] Could not delete file {file} on source host"
@@ -272,6 +277,9 @@ def handle_post_copy_action(self, files: list[str]) -> int:
file,
f"{self.spec['postCopyAction']['destination']}/{file_name}",
)
self.logger.info(
    f"[LOCALHOST] Moved file {file} to {self.spec['postCopyAction']['destination']}/{file_name}"
)
# If this is a rename, then we need to rename the file
if self.spec["postCopyAction"]["action"] == "rename":
# Determine the new file name
@@ -292,6 +300,9 @@ def handle_post_copy_action(self, files: list[str]) -> int:
f" {new_file_dir}/{new_file_name}"
)
os.rename(file, f"{new_file_dir}/{new_file_name}")
self.logger.info(
f"[LOCALHOST] Renamed file {file} to {new_file_dir}/{new_file_name}"
)
except OSError as e:
self.logger.error(f"[LOCALHOST] Error: {e}")
self.logger.error(
@@ -320,6 +331,8 @@ def create_flag_files(self) -> int:
if "permissions" in self.spec:
os.chmod(filename, int(self.spec["permissions"], base=8))

self.logger.info(f"[LOCALHOST] Created flag file: {filename}")

except OSError as e:
self.logger.error(f"[LOCALHOST] Error: {e}")
self.logger.error(f"[LOCALHOST] Error creating flag file: {filename}")