Closed
63 commits
a74ff91
feat: kind-version now 2025.2
Jun 23, 2025
d955a19
feat: Initial schema for replicate step declaration
Jun 23, 2025
ab94bd7
feat: Add get_generated_outputs_for_step_output() to API adapter
Jun 23, 2025
b4be311
feat: Rename new API method
Jun 23, 2025
27c5a83
feat: Add replica to step creation (and step-by-name query)
Jun 23, 2025
1d22716
feat: Removed 'as' from workflow mapping output declaration
Jun 23, 2025
0c60799
feat: Add get_running_workflow_step_output_values_for_output() to API
Jun 23, 2025
12b3602
fix: Removed rogue 'print' statement
Jun 23, 2025
da12869
test: Add mock of step outputs
Jun 26, 2025
cc5d8cc
fix: it's a mess
kaliif Jul 18, 2025
ad00205
docs: Add instance-directory comment
Aug 8, 2025
95eafcb
feat: Creating instances now adds instance-directory property to step…
Aug 8, 2025
f37ea74
fix: get_instance() response type
Aug 8, 2025
13e4470
test: Add tests for instance_directory
Aug 8, 2025
37d5ad2
Merge pull request #33 from InformaticsMatters/instance-directory
alanbchristie Aug 8, 2025
294c765
Merge branch '2.0' into fanout-jobs
kaliif Aug 8, 2025
84866d0
build: Add experimental json copy of the schema
Aug 11, 2025
20a0e3e
style: Removed unnecessary json schema file
Aug 11, 2025
82f14bf
fix: Better rfc1035-label-name pattern
Aug 11, 2025
de34339
fix: Better rfc1035-label-name regex
Aug 11, 2025
e7adc1b
test: Fix decoder tests
Aug 11, 2025
d946546
Merge branch '2.0' into fanout-jobs
kaliif Aug 15, 2025
00e49bb
test: Fix test module name (for consistency)
Aug 15, 2025
39bb840
Remove unnecessary logic
alanbchristie Aug 15, 2025
30ba59b
Merge pull request #34 from InformaticsMatters/alanbchristie-patch-1
alanbchristie Aug 15, 2025
2a6b708
fix: stashing
kaliif Aug 18, 2025
340670e
fix: stashing
kaliif Aug 19, 2025
e3ece79
refactor: Major refactor (new variable-mapping schema)
Aug 19, 2025
f9d4aca
refactor: from-workflow-variable becomes from-workflow
Aug 19, 2025
2c4c867
refactor: from-workflow-variable becomes from-workflow
Aug 19, 2025
c2eb21c
refactor: Replicate now uses variable not input
Aug 19, 2025
8d05f15
fix: No longer need realise-outputs
Aug 19, 2025
0da1a59
build: Add devcontainer
Aug 21, 2025
6ddca27
build: No need for DinD
Aug 21, 2025
bca4f9c
docs: Docs on devcontainer
Aug 21, 2025
47a5ffc
Merge pull request #35 from InformaticsMatters/2.0
alanbchristie Aug 22, 2025
b9e3f00
docs: Doc tweak
Aug 22, 2025
5a9903b
feat: Some work on the refactored engine
Aug 22, 2025
f3bbc6d
fix: More fixes for engine
Aug 27, 2025
8809440
fix: More work on the decoder
Aug 27, 2025
77e3cff
fix: Variable mapping now exposed as a Translation dataclass
Aug 27, 2025
8f1c098
fix: Major refactoring of logic (for new launch/workflow API)
Aug 28, 2025
7770d7f
feat: First successful replicating workflow test
Aug 28, 2025
c53b245
feat: Use of decoder 2.4.0 (traits)
Aug 29, 2025
3883412
refactor: Switch away from workflow replicate property
Sep 1, 2025
6397955
refactor: Refactored using decoder 2.5.0
Sep 2, 2025
7f4b0c6
docs: Doc tweak
Sep 2, 2025
4b1e868
Merge pull request #36 from InformaticsMatters/fanout-jobs
alanbchristie Sep 2, 2025
5e2c8bc
refactor: variable-map is now 'plumbing' and Translation is a 'Conne…
Sep 2, 2025
c39ddb7
refactor: Better function and variable naming (plumbing)
Sep 2, 2025
cdd936e
feat: new _prepare_step_variables function
Sep 2, 2025
c08ed5c
feat: Refactoring
Sep 2, 2025
19e58ae
refactor: More combiner logic
Sep 2, 2025
f6707c8
feat: refactor definition of an output
Sep 2, 2025
5834c8c
fix: Add get_status_of_all_step_instances_by_name implementation (and…
Sep 3, 2025
cfaeaec
docs: Doc tweak
Sep 3, 2025
7d0363e
fix: Typo in YAML
Sep 3, 2025
f336119
feat: Minor work on combiner logic
Sep 3, 2025
bea2cdb
fix: First very basic combiner run
Sep 3, 2025
94fd202
docs: Doc tweak
Sep 3, 2025
8dd9308
fix: replica always starts at 0
Sep 3, 2025
179135a
docs: Doc tweak
Sep 3, 2025
cdddc35
feat: Add from-link-prefix variables
Sep 4, 2025
41 changes: 41 additions & 0 deletions .devcontainer/devcontainer.json
@@ -0,0 +1,41 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "WorkflowEngine Python 3.13",
"image": "mcr.microsoft.com/devcontainers/python:1-3.13-bullseye",
"features": {
"ghcr.io/devcontainers/features/git:1": {
"ppa": true,
"version": "os-provided"
}
},
// We mount bash history in an attempt to preserve history
// between container restarts
// (see https://code.visualstudio.com/remote/advancedcontainers/persist-bash-history)
"mounts": [
"source=projectname-bashhistory,target=/commandhistory,type=volume"
],
"customizations": {
"vscode": {
"extensions": [
"codezombiech.gitignore",
"donjayamanne.githistory",
"donjayamanne.git-extension-pack",
"eamodio.gitlens",
"github.vscode-github-actions",
"ms-kubernetes-tools.vscode-kubernetes-tools",
"ms-python.vscode-pylance",
"sourcery.sourcery",
"streetsidesoftware.code-spell-checker",
"trond-snekvik.simple-rst",
"vivaxy.vscode-conventional-commits",
"yzhang.markdown-all-in-one"
]
}
},
"postCreateCommand": {
"Install Python requirements": "pip3 install --user -r requirements.txt",
"Fix Volume Permissions": "sudo chown -R $(whoami): /commandhistory"
},
"forwardPorts": []
}
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@ dist/
**/__pycache__/
**/*.pickle
tests/project-root/project-*/
**/.DS_Store

# temp files
*~
10 changes: 5 additions & 5 deletions README.rst
@@ -38,10 +38,9 @@ The project's written in Python and uses `Poetry`_ for dependency and package
management. We also use `pre-commit`_ to manage our pre-commit hooks, which
rely on `black`_, `mypy`_, `pylint`_, amongst others.

Create your environment::
From within a VS Code `devcontainer`_ environment (recommended)::

poetry shell
poetry install --with dev
poetry install --with dev --sync
pre-commit install -t commit-msg -t pre-commit

And then start by running the pre-commit hooks to ensure you're starting with a
@@ -51,9 +50,10 @@ _clean_ project::

And then run the tests::

coverage run -m pytest
coverage report
poetry run coverage run -m pytest
poetry run coverage report

.. _devcontainer: https://code.visualstudio.com/docs/devcontainers/containers
.. _Poetry: https://python-poetry.org
.. _pre-commit: https://pre-commit.com
.. _black: https://github.com/psf/black
543 changes: 306 additions & 237 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -14,7 +14,7 @@ packages = [
[tool.poetry.dependencies]
python = "^3.12"
im-protobuf = "^8.2.0"
im-data-manager-job-decoder = "^2.1.0"
im-data-manager-job-decoder = "^2.5.0"
jsonschema = "^4.21.1"
pyyaml = ">= 5.3.1, < 7.0"

2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
poetry == 1.8.5
pre-commit == 4.2.0
45 changes: 32 additions & 13 deletions tests/instance_launcher.py
@@ -68,39 +68,57 @@ def __init__(
elif os.path.isdir(file_path):
shutil.rmtree(file_path)

def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
def launch(self, *, launch_parameters: LaunchParameters) -> LaunchResult:
assert launch_parameters
assert launch_parameters.project_id == TEST_PROJECT_ID
assert launch_parameters.specification
assert isinstance(launch_parameters.specification, dict)

os.makedirs(EXECUTION_DIRECTORY, exist_ok=True)

# We're passed a RunningWorkflowStep ID, but a record is expected to have been
# created by the caller; we simply create instance records.
response, _ = self._api_adapter.get_running_workflow_step(
running_workflow_step_id=launch_parameters.running_workflow_step_id
)
# Now simulate the creation of a Task and Instance record
response = self._api_adapter.create_instance(
running_workflow_step_id=launch_parameters.running_workflow_step_id
if launch_parameters.step_replication_number:
assert (
launch_parameters.step_replication_number
<= launch_parameters.total_number_of_replicas
)

# Create a running workflow step
assert launch_parameters.running_workflow_id
assert launch_parameters.step_name
response, _ = self._api_adapter.create_running_workflow_step(
running_workflow_id=launch_parameters.running_workflow_id,
step=launch_parameters.step_name,
replica=launch_parameters.step_replication_number,
replicas=launch_parameters.total_number_of_replicas,
)
assert "id" in response
rwfs_id: str = response["id"]
# And add the variables we've been provided with
if launch_parameters.variables:
_ = self._api_adapter.set_running_workflow_step_variables(
running_workflow_step_id=rwfs_id, variables=launch_parameters.variables
)

# Create an Instance record (and dummy Task ID)
response = self._api_adapter.create_instance(running_workflow_step_id=rwfs_id)
instance_id = response["id"]
task_id = "task-00000000-0000-0000-0000-000000000001"

# Apply variables to the step's Job command.
# Get the job definition.
# This is expected to exist in the tests/job-definitions directory.
job, _ = self._api_adapter.get_job(
collection=launch_parameters.specification["collection"],
job=launch_parameters.specification["job"],
version="do-not-care",
)
assert job

# Now apply the variables to the command
# Now apply the provided variables to the command.
# The command may not need any, but we do the decoding anyway.
decoded_command, status = job_decoder.decode(
job["command"],
launch_parameters.specification_variables,
launch_parameters.running_workflow_step_id,
launch_parameters.variables,
rwfs_id,
TextEncoding.JINJA2_3_0,
)
print(f"Decoded command: {decoded_command}")
@@ -132,6 +150,7 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
self._msg_dispatcher.send(pod_message)

return LaunchResult(
running_workflow_step_id=rwfs_id,
instance_id=instance_id,
task_id=task_id,
command=" ".join(subprocess_cmd),
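The launcher diff above threads a replica number and a replica count through step creation. A minimal sketch of that bookkeeping, using a simplified stand-in for the real `LaunchParameters` (the field names here are assumptions read from the diff, not the actual class definition):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class LaunchParameters:
    # Simplified subset of the launcher's parameters (names assumed from the diff).
    running_workflow_id: str
    step_name: str
    specification: dict[str, Any]
    variables: dict[str, Any] = field(default_factory=dict)
    step_replication_number: int = 0  # replicas are numbered from 0
    total_number_of_replicas: int = 1


def validate(p: LaunchParameters) -> None:
    # Mirror the launcher's assertions: a replica number, when set,
    # must not exceed the declared total number of replicas.
    assert p.running_workflow_id and p.step_name
    if p.step_replication_number:
        assert p.step_replication_number <= p.total_number_of_replicas


params = LaunchParameters(
    running_workflow_id="r-workflow-0000",
    step_name="step-1",
    specification={"collection": "test", "job": "concatenate"},
    step_replication_number=2,
    total_number_of_replicas=3,
)
validate(params)  # passes: replica 2 of 3
```

A replica number larger than the total (say 4 of 3) would trip the same assertion the test launcher uses.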
34 changes: 33 additions & 1 deletion tests/job-definitions/job-definitions.yaml
@@ -131,4 +131,36 @@ jobs:

concatenate:
command: >-
concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
concatenate.py --inputFile {{ inputFile }} --outputFile {{ outputFile }}
# Simulate a Job with multiple input files (a combiner)...
variables:
inputs:
properties:
inputFile:
type: files
options:
type: object
properties:
inputDirPrefix:
title: Optional input directory prefix
type: string
outputs:
properties:
outputBase:
creates: '{{ outputFile }}'
type: file

splitsmiles:
command: >-
copyf.py {{ inputFile }}
# Simulate a Job with multiple output files (a splitter)...
variables:
inputs:
properties:
inputFile:
type: file
outputs:
properties:
outputBase:
creates: '{{ outputBase }}_*.smi'
type: files
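The `command` fields in these job definitions are Jinja2 templates that the engine renders with each step's variables (via the `im-data-manager-job-decoder` package elsewhere in the diff). A toy renderer illustrating just the substitution step, without the decoder dependency:

```python
import re


def render(command: str, variables: dict[str, str]) -> str:
    # Replace each '{{ name }}' placeholder with its variable value.
    # (A stdlib stand-in for what the Jinja2-based job decoder does.)
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}", lambda m: variables[m.group(1)], command
    )


# The 'splitsmiles' command from the definitions above, with a
# hypothetical variable value.
command = "copyf.py {{ inputFile }}"
print(render(command, {"inputFile": "ligands.smi"}))  # copyf.py ligands.smi
```

The real decoder also validates variables and supports Jinja2 filters; this sketch only shows why the `variables` blocks above name the same identifiers that appear in the templates.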
11 changes: 7 additions & 4 deletions tests/jobs/concatenate.py
@@ -2,13 +2,16 @@

parser = argparse.ArgumentParser(
prog="concatenate",
description="Takes a list of files and writes them into single outputfile",
description="Takes an optional directory prefix and a file,"
" and combines all the input files that are found"
" into a single output file",
)
parser.add_argument("inputFile", nargs="+", type=argparse.FileType("r"))
parser.add_argument("--inputDirPrefix")
parser.add_argument("--inputFile", required=True)
parser.add_argument("-o", "--outputFile", required=True)
args = parser.parse_args()


with open(args.outputFile, "wt", encoding="utf8") as ofile:
for f in args.inputFile:
ofile.write(f.read())
with open(args.inputFile, "rt", encoding="utf8") as ifile:
ofile.write(ifile.read())
30 changes: 30 additions & 0 deletions tests/jobs/copyf.py
@@ -0,0 +1,30 @@
import shutil
import sys
from pathlib import Path


def main():
print("copyf job running")
if len(sys.argv) != 2:
print("Usage: python copyf.py <filename>")
sys.exit(1)

original_path = Path(sys.argv[1])

if not original_path.exists() or not original_path.is_file():
print(f"Error: '{original_path}' does not exist or is not a file.")
sys.exit(1)

# Create a copy named 'chunk_1.smi' alongside the input file
new_name = original_path.absolute().parent.joinpath("chunk_1.smi")
new_path = original_path.with_name(new_name.name)
shutil.copyfile(original_path, new_path)

new_name = original_path.absolute().parent.joinpath("chunk_2.smi")
new_path = original_path.with_name(new_name.name)

shutil.copyfile(original_path, new_path)


if __name__ == "__main__":
main()
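`copyf.py` simulates a splitter by producing two identical `chunk_*.smi` copies of its input. The observable behaviour can be checked in a few lines (a self-contained sketch using a temporary directory, not part of the test suite):

```python
import shutil
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # One input file in an empty directory...
    src = Path(tmp) / "input.smi"
    src.write_text("CCO ethanol\n")

    # ...copied twice, as copyf.py does.
    for i in (1, 2):
        shutil.copyfile(src, src.with_name(f"chunk_{i}.smi"))

    names = sorted(p.name for p in Path(tmp).iterdir())
    print(names)  # ['chunk_1.smi', 'chunk_2.smi', 'input.smi']
```

This is the shape a downstream combiner step would see: the original input plus one file per simulated output replica.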
4 changes: 4 additions & 0 deletions tests/jobs/copyf.sh
@@ -0,0 +1,4 @@
#! /bin/bash

cp "$1" chunk_1.smi
cp "$1" chunk_2.smi
72 changes: 72 additions & 0 deletions tests/jobs/split-smi.sh
@@ -0,0 +1,72 @@
#!/bin/bash
set -euo pipefail

if [[ $# -lt 3 || $# -gt 4 ]]; then
echo "Usage: $0 <input_file(.smi or .smi.gz)> <lines_per_file> <output_basename> [has_header: yes]"
exit 1
fi

input_file="$1"
lines_per_file="$2"
base_name="$3"
has_header="${4:-no}"

# Determine how to read the file (plain text or gzipped)
if [[ "$input_file" == *.gz ]]; then
reader="zcat"
else
reader="cat"
fi

if ! [[ -f "$input_file" ]]; then
echo "Error: File '$input_file' not found"
exit 1
fi

# Extract header if present
if [[ "$has_header" == "yes" ]]; then
header="$($reader "$input_file" | head -n1)"
data_start=2
else
header=""
data_start=1
fi

# Count number of data lines (excluding header if present)
data_lines="$($reader "$input_file" | tail -n +"$data_start" | wc -l)"
if [[ "$data_lines" -eq 0 ]]; then
echo "No data lines to process."
exit 0
fi

# Calculate number of output files and required zero padding
num_files=$(( (data_lines + lines_per_file - 1) / lines_per_file ))
pad_width=0
if [[ "$num_files" -gt 1 ]]; then
pad_width=${#num_files}
fi

# Split logic
$reader "$input_file" | tail -n +"$data_start" | awk -v header="$header" -v lines="$lines_per_file" -v base="$base_name" -v pad="$pad_width" '
function new_file() {
suffix = (pad > 0) ? sprintf("%0*d", pad, file_index) : file_index
file = base "_" suffix ".smi"
if (header != "") {
print header > file
}
file_index++
line_count = 0
}
{
if (line_count == 0) {
new_file()
}
print >> file
line_count++
if (line_count == lines) {
close(file)
print file " created"
line_count = 0
}
}
' file_index=1
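The naming rule in `split-smi.sh` (ceiling-divide the data lines into files; zero-pad the suffix to the width of the file count, but only when there is more than one file) is easy to get wrong, so here is the same arithmetic as a small Python sketch (an illustration of the script's logic, not code from the PR):

```python
def chunk_names(data_lines: int, lines_per_file: int, base: str) -> list[str]:
    # Ceiling division: how many output files the split produces.
    num_files = (data_lines + lines_per_file - 1) // lines_per_file
    # Zero-pad suffixes to the width of the count, as the awk program does,
    # but only when the split yields more than one file.
    pad = len(str(num_files)) if num_files > 1 else 0
    return [f"{base}_{i:0{pad}d}.smi" for i in range(1, num_files + 1)]


print(chunk_names(25, 10, "out"))  # ['out_1.smi', 'out_2.smi', 'out_3.smi']
print(chunk_names(120, 10, "f")[0])  # f_01.smi
```

With 120 lines at 10 per file the count is 12, so suffixes become two digits (`f_01` … `f_12`), matching the `pad_width=${#num_files}` logic in the shell script.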