Merged (105 commits)
a74ff91
feat: kind-version now 2025.2
Jun 23, 2025
d955a19
feat: Initial schema for replicate step declaration
Jun 23, 2025
ab94bd7
feat: Add get_generated_outputs_for_step_output() to API adapter
Jun 23, 2025
b4be311
feat: Rename new API method
Jun 23, 2025
27c5a83
feat: Add replica to step creation (and step-by-name query)
Jun 23, 2025
1d22716
feat: Removed 'as' from workflow mapping output declaration
Jun 23, 2025
0c60799
feat: Add get_running_workflow_step_output_values_for_output() to API
Jun 23, 2025
12b3602
fix: Removed rogue 'print' statement
Jun 23, 2025
da12869
test: Add mock of step outputs
Jun 26, 2025
cc5d8cc
fix: it's a mess
kaliif Jul 18, 2025
ad00205
docs: Add instance-directory comment
Aug 8, 2025
95eafcb
feat: Creating instances now adds instance-directory property to step…
Aug 8, 2025
f37ea74
fix: get_instance() response type
Aug 8, 2025
13e4470
test: Add tests for instance_directory
Aug 8, 2025
37d5ad2
Merge pull request #33 from InformaticsMatters/instance-directory
alanbchristie Aug 8, 2025
294c765
Merge branch '2.0' into fanout-jobs
kaliif Aug 8, 2025
84866d0
build: Add experimental json copy of the schema
Aug 11, 2025
20a0e3e
style: Removed unnecessary json schema file
Aug 11, 2025
82f14bf
fix: Better rfc1035-label-name pattern
Aug 11, 2025
de34339
fix: Better rfc1035-label-name regex
Aug 11, 2025
e7adc1b
test: Fix decoder tests
Aug 11, 2025
d946546
Merge branch '2.0' into fanout-jobs
kaliif Aug 15, 2025
00e49bb
test: Fix test module name (for consistency)
Aug 15, 2025
39bb840
Remove unnecessary logic
alanbchristie Aug 15, 2025
30ba59b
Merge pull request #34 from InformaticsMatters/alanbchristie-patch-1
alanbchristie Aug 15, 2025
2a6b708
fix: stashing
kaliif Aug 18, 2025
340670e
fix: stashing
kaliif Aug 19, 2025
e3ece79
refactor: Major refactor (new variable-mapping schema)
Aug 19, 2025
f9d4aca
refactor: from-workflow-variable becomes from-workflow
Aug 19, 2025
2c4c867
refactor: from-workflow-variable becomes from-workflow
Aug 19, 2025
c2eb21c
refactor: Replicate now uses variable not input
Aug 19, 2025
8d05f15
fix: No longer need realise-outputs
Aug 19, 2025
0da1a59
build: Add devcontainer
Aug 21, 2025
6ddca27
build: No need for DinD
Aug 21, 2025
bca4f9c
docs: Docs on devcontainer
Aug 21, 2025
47a5ffc
Merge pull request #35 from InformaticsMatters/2.0
alanbchristie Aug 22, 2025
b9e3f00
docs: Doc tweak
Aug 22, 2025
5a9903b
feat: Some work on the refactored engine
Aug 22, 2025
f3bbc6d
fix: More fixes for engine
Aug 27, 2025
8809440
fix: More work on the decoder
Aug 27, 2025
77e3cff
fix: Variable mapping now exposed as a Translation dataclass
Aug 27, 2025
8f1c098
fix: Major refactoring of logic (for new launch/workflow API)
Aug 28, 2025
7770d7f
feat: First successful replicating workflow test
Aug 28, 2025
c53b245
feat: Use of decoder 2.4.0 (traits)
Aug 29, 2025
3883412
refactor: Switch away from workflow replicate property
Sep 1, 2025
6397955
refactor: Refactored using decoder 2.5.0
Sep 2, 2025
7f4b0c6
docs: Doc tweak
Sep 2, 2025
4b1e868
Merge pull request #36 from InformaticsMatters/fanout-jobs
alanbchristie Sep 2, 2025
5e2c8bc
refactor: variable-map is now 'plumbing' and Translation is a 'Conne…
Sep 2, 2025
c39ddb7
refactor: Better function and variable naming (plumbing)
Sep 2, 2025
cdd936e
feat: new _prepare_step_variables function
Sep 2, 2025
c08ed5c
feat: Refactoring
Sep 2, 2025
19e58ae
refactor: More combiner logic
Sep 2, 2025
f6707c8
feat: refactor definition of an output
Sep 2, 2025
5834c8c
fix: Add get_status_of_all_step_instances_by_name implementation (and…
Sep 3, 2025
cfaeaec
docs: Doc tweak
Sep 3, 2025
7d0363e
fix: Typo in YAML
Sep 3, 2025
f336119
feat: Minor work on combiner logic
Sep 3, 2025
bea2cdb
fix: First very basic combiner run
Sep 3, 2025
94fd202
docs: Doc tweak
Sep 3, 2025
8dd9308
fix: replica always starts at 0
Sep 3, 2025
179135a
docs: Doc tweak
Sep 3, 2025
cdddc35
feat: Add from-link-prefix variables
Sep 4, 2025
17da698
Merge pull request #38 from InformaticsMatters/concat
alanbchristie Sep 4, 2025
c6a34e8
feat: Switch to pre-defined variables (rather then link)
Sep 4, 2025
04f57d2
fix: Fix input output hanes in decoder
Sep 4, 2025
8891ee5
fix: Add from-project support
Sep 4, 2025
3a8e569
refactor: lin-prefix becomes link-glob
Sep 16, 2025
dec6390
fix: Return kwargs to launch()
Sep 16, 2025
02ad4ea
feat: Add dependent_instances to LaunchParameters
Sep 18, 2025
646128f
feat: Populate dependent_instances
Sep 18, 2025
4fd2619
refactor: link-glob is now instance-link-glob
Sep 19, 2025
a1d1fea
refactor: Remove to/from project schema
Sep 19, 2025
10c3259
feat: Add outputs to step prepration response
Sep 23, 2025
8ffb3af
test: Attempt to fix tests
Sep 23, 2025
a38c626
test: Fix instance creation
Sep 23, 2025
53b858b
fix: Fix test execution
Sep 23, 2025
2d2d30e
docs: Significant level of module documentation
Sep 23, 2025
776020e
docs: More docs
Sep 23, 2025
af44290
docs: Doc tweak
Sep 23, 2025
a3f76ed
docs: More doc and typo corrections
Sep 24, 2025
18d9a82
docs: Code typos corrected
Sep 24, 2025
d37caf0
dev: Logs end of step and end of running workflow
Sep 24, 2025
594c0e8
dev: Engine now collects inputs and sets LP inputs & outputs
Sep 24, 2025
17ddd62
style: Log inputs
Sep 24, 2025
1d41048
dev: Fix handling of launch() result
Sep 24, 2025
3fe5969
fix: Better handling of get_running_workflow_step_by_name() response
Sep 25, 2025
ac23732
feat: Engine now attempts to prefix inputs (from prior steps)
Sep 25, 2025
b788cd0
fix: Fix prior-step prefix
Sep 25, 2025
cde958a
fix: Launch parameter sets are now lists
Sep 25, 2025
a5774b0
dev: fix build
Sep 25, 2025
43c516f
docs: Doc tweak
Sep 25, 2025
079736c
dev: Fix instance prefix
Sep 26, 2025
cf6cfbe
fix: Engine now handles lack of job better
Sep 30, 2025
b61baa7
fix: Validator now ensures jobs exist (run-level test)
Sep 30, 2025
e90a313
fix: Better testing of validation (job must be present)
Sep 30, 2025
967b3b3
feat: Better validation error message
Oct 2, 2025
aca2891
test: Fix tests
Oct 2, 2025
edc70ee
fix: Better handling of lack of outputs
Oct 14, 2025
d67ce50
docs: Doc tweak
Oct 14, 2025
a5113f3
fix: Fix iteration input path
Oct 14, 2025
515b7f5
test: Disable unit test
Oct 14, 2025
564e6a5
fix: Support for dirsGlob
Oct 15, 2025
350f303
fix: Better step preparation initial log (combiners)
Oct 15, 2025
5743f97
fix: Combiners no longer get built-in variables automatically (#39)
alanbchristie Dec 1, 2025
4 changes: 2 additions & 2 deletions .cz.yaml
@@ -9,8 +9,8 @@
commitizen:
name: cz_customize
customize:
schema_pattern: "^(?P<change_type>feat|fix|perf|refactor|remove|style|test|build|docs|chore|ci|BREAKING CHANGE)(?:\\((?P<scope>[^()\\r\\n]*)\\)|\\()?(?P<breaking>!)?:\\s(?P<message>.*)?"
commit_parser: "^(?P<change_type>feat|fix|perf|refactor|remove|style|test|build|docs|chore|ci|BREAKING CHANGE)(?:\\((?P<scope>[^()\\r\\n]*)\\)|\\()?(?P<breaking>!)?:\\s(?P<message>.*)?"
schema_pattern: "^(?P<change_type>feat|fix|perf|refactor|remove|style|test|build|docs|chore|ci|dev|BREAKING CHANGE)(?:\\((?P<scope>[^()\\r\\n]*)\\)|\\()?(?P<breaking>!)?:\\s(?P<message>.*)?"
commit_parser: "^(?P<change_type>feat|fix|perf|refactor|remove|style|test|build|docs|chore|ci|dev|BREAKING CHANGE)(?:\\((?P<scope>[^()\\r\\n]*)\\)|\\()?(?P<breaking>!)?:\\s(?P<message>.*)?"
# The changelog_pattern identifies the commit types
# that will be included.
# Build the changelog with 'cz ch' on the staging or production branches.
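The `.cz.yaml` change above simply adds `dev` to the set of accepted conventional-commit types (several commits in this PR use it, e.g. `dev: fix build`). A quick way to confirm the pattern behaves as intended is to exercise it with Python's `re`; the `PATTERN` below is the YAML `schema_pattern` with its doubled backslashes un-escaped, and `parse_commit` is just an illustrative helper, not part of the PR:

```python
import re

# The commitizen schema pattern from .cz.yaml, including the newly
# added "dev" change type (YAML's "\\(" becomes the regex "\(").
PATTERN = (
    r"^(?P<change_type>feat|fix|perf|refactor|remove|style|test|build|docs"
    r"|chore|ci|dev|BREAKING CHANGE)"
    r"(?:\((?P<scope>[^()\r\n]*)\)|\()?(?P<breaking>!)?"
    r":\s(?P<message>.*)?"
)

def parse_commit(subject: str):
    """Return (change_type, scope, message), or None if non-conforming."""
    m = re.match(PATTERN, subject)
    if not m:
        return None
    return m.group("change_type"), m.group("scope"), m.group("message")

print(parse_commit("dev: fix build"))             # accepted with the new type
print(parse_commit("feat(engine): add replicas")) # scoped commits still parse
print(parse_commit("wip: something"))             # unknown types still rejected
```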
41 changes: 41 additions & 0 deletions .devcontainer/devcontainer.json
@@ -0,0 +1,41 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "WorkflowEngine Python 3.13",
"image": "mcr.microsoft.com/devcontainers/python:1-3.13-bullseye",
"features": {
"ghcr.io/devcontainers/features/git:1": {
"ppa": true,
"version": "os-provided"
}
},
// We mount bash history in an attempt to preserve history
// between container restarts
// (see https://code.visualstudio.com/remote/advancedcontainers/persist-bash-history)
"mounts": [
"source=projectname-bashhistory,target=/commandhistory,type=volume"
],
"customizations": {
"vscode": {
"extensions": [
"codezombiech.gitignore",
"donjayamanne.githistory",
"donjayamanne.git-extension-pack",
"eamodio.gitlens",
"github.vscode-github-actions",
"ms-kubernetes-tools.vscode-kubernetes-tools",
"ms-python.vscode-pylance",
"sourcery.sourcery",
"streetsidesoftware.code-spell-checker",
"trond-snekvik.simple-rst",
"vivaxy.vscode-conventional-commits",
"yzhang.markdown-all-in-one"
]
}
},
"postCreateCommand": {
"Install Python requirements": "pip3 install --user -r requirements.txt",
"Fix Volume Permissions": "sudo chown -R $(whoami): /commandhistory"
},
"forwardPorts": []
}
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@ dist/
**/__pycache__/
**/*.pickle
tests/project-root/project-*/
**/.DS_Store

# temp files
*~
10 changes: 5 additions & 5 deletions README.rst
@@ -38,10 +38,9 @@ The project's written in Python and uses `Poetry`_ for dependency and package
management. We also use `pre-commit`_ to manage our pre-commit hooks, which
rely on `black`_, `mypy`_, `pylint`_, amongst others.

Create your environment::
From within a VS Code `devcontainer`_ environment (recommended)::

poetry shell
poetry install --with dev
poetry install --with dev --sync
pre-commit install -t commit-msg -t pre-commit

And then start by running the pre-commit hooks to ensure you're starting with a
@@ -51,9 +50,10 @@ _clean_ project::

And then run the tests::

coverage run -m pytest
coverage report
poetry run coverage run -m pytest
poetry run coverage report

.. _devcontainer: https://code.visualstudio.com/docs/devcontainers/containers
.. _Poetry: https://python-poetry.org
.. _pre-commit: https://pre-commit.com
.. _black: https://github.com/psf/black
543 changes: 306 additions & 237 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -14,7 +14,7 @@ packages = [
[tool.poetry.dependencies]
python = "^3.12"
im-protobuf = "^8.2.0"
im-data-manager-job-decoder = "^2.1.0"
im-data-manager-job-decoder = "^2.5.0"
jsonschema = "^4.21.1"
pyyaml = ">= 5.3.1, < 7.0"

2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
poetry == 1.8.5
pre-commit == 4.2.0
54 changes: 40 additions & 14 deletions tests/instance_launcher.py
@@ -68,39 +68,64 @@ def __init__(
elif os.path.isdir(file_path):
shutil.rmtree(file_path)

def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
def launch(self, *, launch_parameters: LaunchParameters) -> LaunchResult:
assert launch_parameters
assert launch_parameters.project_id == TEST_PROJECT_ID
assert launch_parameters.specification
assert isinstance(launch_parameters.specification, dict)

os.makedirs(EXECUTION_DIRECTORY, exist_ok=True)

# We're passed a RunningWorkflowStep ID, but a record is expected to have been
# created by the caller; we simply create instance records.
response, _ = self._api_adapter.get_running_workflow_step(
running_workflow_step_id=launch_parameters.running_workflow_step_id
)
# Now simulate the creation of a Task and Instance record
response = self._api_adapter.create_instance(
running_workflow_step_id=launch_parameters.running_workflow_step_id
)
if launch_parameters.step_replication_number:
assert (
launch_parameters.step_replication_number
<= launch_parameters.total_number_of_replicas
)

# Create an Instance record (and dummy Task ID)
response = self._api_adapter.create_instance()
instance_id = response["id"]
task_id = "task-00000000-0000-0000-0000-000000000001"

# Apply variables to the step's Job command.
# Create a running workflow step
assert launch_parameters.running_workflow_id
assert launch_parameters.step_name
response, _ = self._api_adapter.create_running_workflow_step(
running_workflow_id=launch_parameters.running_workflow_id,
step=launch_parameters.step_name,
instance_id=instance_id,
replica=launch_parameters.step_replication_number,
replicas=launch_parameters.total_number_of_replicas,
)
assert "id" in response
rwfs_id: str = response["id"]
# And add the variables we've been provided with
if launch_parameters.variables:
_ = self._api_adapter.set_running_workflow_step_variables(
running_workflow_step_id=rwfs_id, variables=launch_parameters.variables
)

# Now add the running workflow step ID to the instance record.
self._api_adapter.set_instance_running_workflow_step_id(
instance_id=instance_id,
running_workflow_step_id=rwfs_id,
)

# Get the job definition.
# This is expected to exist in the tests/job-definitions directory.
job, _ = self._api_adapter.get_job(
collection=launch_parameters.specification["collection"],
job=launch_parameters.specification["job"],
version="do-not-care",
)
assert job

# Now apply the variables to the command
# Now apply the provided variables to the command.
# The command may not need any, but we do the decoding anyway.
decoded_command, status = job_decoder.decode(
job["command"],
launch_parameters.specification_variables,
launch_parameters.running_workflow_step_id,
launch_parameters.variables,
rwfs_id,
TextEncoding.JINJA2_3_0,
)
print(f"Decoded command: {decoded_command}")
@@ -132,6 +157,7 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
self._msg_dispatcher.send(pod_message)

return LaunchResult(
running_workflow_step_id=rwfs_id,
instance_id=instance_id,
task_id=task_id,
command=" ".join(subprocess_cmd),
34 changes: 33 additions & 1 deletion tests/job-definitions/job-definitions.yaml
@@ -131,4 +131,36 @@ jobs:

concatenate:
command: >-
concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
concatenate.py --inputFile {{ inputFile }} --outputFile {{ outputFile }}
# Simulate a Job with multiple input files (a combiner)...
variables:
inputs:
properties:
inputFile:
type: files
options:
type: object
properties:
inputDirPrefix:
title: Optional input directory prefix
type: string
outputs:
properties:
outputBase:
creates: '{{ outputFile }}'
type: file

splitsmiles:
command: >-
copyf.py {{ inputFile }}
# Simulate a Job with multiple output files (a splitter)...
variables:
inputs:
properties:
inputFile:
type: file
outputs:
properties:
outputBase:
creates: '{{ outputBase }}_*.smi'
type: files
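The job definitions above pair a Jinja command template with declared input/output variables; the test launcher renders the command via `job_decoder.decode(..., TextEncoding.JINJA2_3_0)`. As a minimal stand-in (not the decoder's real behaviour, which supports loops, filters and more), plain `{{ name }}` substitution is enough to see how a definition and its variables combine:

```python
import re

def render_command(template: str, variables: dict) -> str:
    """A tiny stand-in for the job decoder's Jinja rendering:
    replaces each '{{ name }}' with the matching variable value.
    (The real engine uses im-data-manager-job-decoder with
    TextEncoding.JINJA2_3_0, which does far more than this.)"""
    def substitute(match: re.Match) -> str:
        return str(variables[match.group(1)])
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template).strip()

# The 'concatenate' command template from the definitions above.
command = render_command(
    "concatenate.py --inputFile {{ inputFile }} --outputFile {{ outputFile }}",
    {"inputFile": "chunk_1.smi", "outputFile": "all.smi"},
)
print(command)
# concatenate.py --inputFile chunk_1.smi --outputFile all.smi
```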
11 changes: 7 additions & 4 deletions tests/jobs/concatenate.py
@@ -2,13 +2,16 @@

parser = argparse.ArgumentParser(
prog="addcol",
description="Takes a list of files and writes them into single outputfile",
description="Takes an optional directory prefix and a file,"
" and combines all the input files that are found"
" into single outputfile",
)
parser.add_argument("inputFile", nargs="+", type=argparse.FileType("r"))
parser.add_argument("--inputDirPrefix")
parser.add_argument("--inputFile", required=True)
parser.add_argument("-o", "--outputFile", required=True)
args = parser.parse_args()


with open(args.outputFile, "wt", encoding="utf8") as ofile:
for f in args.inputFile:
ofile.write(f.read())
with open(args.inputFile, "rt", encoding="utf8") as ifile:
ofile.write(ifile.read())
30 changes: 30 additions & 0 deletions tests/jobs/copyf.py
@@ -0,0 +1,30 @@
import shutil
import sys
from pathlib import Path


def main():
print("copyf job runnint")
if len(sys.argv) != 2:
print("Usage: python copy_file.py <filename>")
sys.exit(1)

original_path = Path(sys.argv[1])

if not original_path.exists() or not original_path.is_file():
print(f"Error: '{original_path}' does not exist or is not a file.")
sys.exit(1)

# Create a sibling copy named 'chunk_1.smi'
new_name = original_path.absolute().parent.joinpath("chunk_1.smi")
new_path = original_path.with_name(new_name.name)
shutil.copyfile(original_path, new_path)

new_name = original_path.absolute().parent.joinpath("chunk_2.smi")
new_path = original_path.with_name(new_name.name)

shutil.copyfile(original_path, new_path)


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions tests/jobs/copyf.sh
@@ -0,0 +1,4 @@
#! /bin/bash

cp "$1" chunk_1.smi
cp "$1" chunk_2.smi
72 changes: 72 additions & 0 deletions tests/jobs/split-smi.sh
@@ -0,0 +1,72 @@
#!/bin/bash
set -euo pipefail

if [[ $# -lt 3 || $# -gt 4 ]]; then
echo "Usage: $0 <input_file(.smi or .smi.gz)> <lines_per_file> <output_basename> [has_header: yes]"
exit 1
fi

input_file="$1"
lines_per_file="$2"
base_name="$3"
has_header="${4:-no}"

# Determine how to read the file (plain text or gzipped)
if [[ "$input_file" == *.gz ]]; then
reader="zcat"
else
reader="cat"
fi

if ! [[ -f "$input_file" ]]; then
echo "Error: File '$input_file' not found"
exit 1
fi

# Extract header if present
if [[ "$has_header" == "yes" ]]; then
header="$($reader "$input_file" | head -n1)"
data_start=2
else
header=""
data_start=1
fi

# Count number of data lines (excluding header if present)
data_lines="$($reader "$input_file" | tail -n +"$data_start" | wc -l)"
if [[ "$data_lines" -eq 0 ]]; then
echo "No data lines to process."
exit 0
fi

# Calculate number of output files and required zero padding
num_files=$(( (data_lines + lines_per_file - 1) / lines_per_file ))
pad_width=0
if [[ "$num_files" -gt 1 ]]; then
pad_width=${#num_files}
fi

# Split logic
$reader "$input_file" | tail -n +"$data_start" | awk -v header="$header" -v lines="$lines_per_file" -v base="$base_name" -v pad="$pad_width" '
function new_file() {
suffix = (pad > 0) ? sprintf("%0*d", pad, file_index) : file_index
file = base "_" suffix ".smi"
if (header != "") {
print header > file
}
file_index++
line_count = 0
}
{
if (line_count == 0) {
new_file()
}
print >> file
line_count++
if (line_count == lines) {
close(file)
print file " created"
line_count = 0
}
}
' file_index=1
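The shell/awk splitter above chunks data lines into `<base>_<n>.smi` files, zero-padding the numeric suffix to the width of the final file number when more than one file is produced. A compact Python re-implementation (an illustrative sketch, not part of the PR, and omitting the gzip and header-detection handling) captures the same naming logic:

```python
from pathlib import Path

def split_smi(lines, lines_per_file, base_name, header=None, out_dir="."):
    """Split data lines into '<base>_<n>.smi' chunks, zero-padding the
    numeric suffix when more than one file is produced (mirroring the
    awk logic above). Returns the names of the files written."""
    lines = list(lines)
    num_files = max(1, -(-len(lines) // lines_per_file))  # ceiling division
    pad = len(str(num_files)) if num_files > 1 else 0
    written = []
    for i in range(num_files):
        chunk = lines[i * lines_per_file:(i + 1) * lines_per_file]
        suffix = str(i + 1).zfill(pad) if pad else str(i + 1)
        path = Path(out_dir) / f"{base_name}_{suffix}.smi"
        # Repeat the header (if any) at the top of every chunk,
        # as the awk script does.
        content = ([header] if header else []) + chunk
        path.write_text("\n".join(content) + "\n", encoding="utf8")
        written.append(path.name)
    return written
```

With 5 data lines and 2 lines per file this yields `chunk_1.smi`, `chunk_2.smi` and `chunk_3.smi`, the last holding the single remaining line.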
6 changes: 3 additions & 3 deletions tests/message_dispatcher.py
@@ -1,17 +1,17 @@
"""The UnitTest Message Dispatcher.

A very simple object that relies on an underlying message queue.
A very simple object that relies on an underlying message queue and is designed
to emulate the behaviour of the message queue used in the Data Manager.
Here we offer a minimal implementation that simply sends a (protocol buffer) message
to the queue.
"""

from google.protobuf.message import Message

from tests.message_queue import UnitTestMessageQueue
from workflow.workflow_abc import MessageDispatcher


class UnitTestMessageDispatcher(MessageDispatcher):
class UnitTestMessageDispatcher:
"""A minimal Message dispatcher to support testing."""

def __init__(self, msg_queue: UnitTestMessageQueue):