Commit 4b1e868

Merge pull request #36 from InformaticsMatters/fanout-jobs
Support for initial (basic) fanout
2 parents bca4f9c + 7f4b0c6 commit 4b1e868

31 files changed: +1066 -1743 lines

README.rst

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ The project's written in Python and uses `Poetry`_ for dependency and package
 management. We also use `pre-commit`_ to manage our pre-commit hooks, which
 rely on `black`_, `mypy`_, `pylint`_, amongst others.

-From within a VS Code `devcontainer`_] environment (recommended)::
+From within a VS Code `devcontainer`_ environment (recommended)::

     poetry install --with dev --sync
     pre-commit install -t commit-msg -t pre-commit

poetry.lock

Lines changed: 306 additions & 237 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ packages = [
 [tool.poetry.dependencies]
 python = "^3.12"
 im-protobuf = "^8.2.0"
-im-data-manager-job-decoder = "^2.1.0"
+im-data-manager-job-decoder = "^2.5.0"
 jsonschema = "^4.21.1"
 pyyaml = ">= 5.3.1, < 7.0"

tests/instance_launcher.py

Lines changed: 21 additions & 6 deletions
@@ -68,18 +68,32 @@ def __init__(
             elif os.path.isdir(file_path):
                 shutil.rmtree(file_path)

-    def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
+    def launch(self, *, launch_parameters: LaunchParameters) -> LaunchResult:
         assert launch_parameters
         assert launch_parameters.project_id == TEST_PROJECT_ID
         assert launch_parameters.specification
         assert isinstance(launch_parameters.specification, dict)

         os.makedirs(EXECUTION_DIRECTORY, exist_ok=True)

-        # Create an Instance record (and dummy Task ID)
-        response = self._api_adapter.create_instance(
-            running_workflow_step_id=launch_parameters.running_workflow_step_id
+        # Create a running workflow step
+        assert launch_parameters.running_workflow_id
+        assert launch_parameters.step_name
+        response, _ = self._api_adapter.create_running_workflow_step(
+            running_workflow_id=launch_parameters.running_workflow_id,
+            step=launch_parameters.step_name,
+            replica=launch_parameters.step_replication_number,
         )
+        assert "id" in response
+        rwfs_id: str = response["id"]
+        # And add the variables we've been provided with
+        if launch_parameters.variables:
+            _ = self._api_adapter.set_running_workflow_step_variables(
+                running_workflow_step_id=rwfs_id, variables=launch_parameters.variables
+            )
+
+        # Create an Instance record (and dummy Task ID)
+        response = self._api_adapter.create_instance(running_workflow_step_id=rwfs_id)
         instance_id = response["id"]
         task_id = "task-00000000-0000-0000-0000-000000000001"

@@ -96,8 +110,8 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
         # The command may not need any, but we do the decoding anyway.
         decoded_command, status = job_decoder.decode(
             job["command"],
-            launch_parameters.specification_variables,
-            launch_parameters.running_workflow_step_id,
+            launch_parameters.variables,
+            rwfs_id,
             TextEncoding.JINJA2_3_0,
         )
         print(f"Decoded command: {decoded_command}")

@@ -129,6 +143,7 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
         self._msg_dispatcher.send(pod_message)

         return LaunchResult(
+            running_workflow_step_id=rwfs_id,
             instance_id=instance_id,
             task_id=task_id,
             command=" ".join(subprocess_cmd),

tests/job-definitions/job-definitions.yaml

Lines changed: 11 additions & 0 deletions
@@ -132,3 +132,14 @@ jobs:
   concatenate:
     command: >-
       concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
+
+  splitsmiles:
+    command: >-
+      copyf.py {{ inputFile }}
+    # Simulate multiple output files...
+    variables:
+      outputs:
+        properties:
+          outputBase:
+            creates: '{{ outputBase }}_*.smi'
+            type: files

tests/jobs/copyf.py

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+import shutil
+import sys
+from pathlib import Path
+
+
+def main():
+    print("copyf job running")
+    if len(sys.argv) != 2:
+        print("Usage: python copyf.py <filename>")
+        sys.exit(1)
+
+    original_path = Path(sys.argv[1])
+
+    if not original_path.exists() or not original_path.is_file():
+        print(f"Error: '{original_path}' does not exist or is not a file.")
+        sys.exit(1)
+
+    # Copy the input alongside itself as 'chunk_1.smi'...
+    new_name = original_path.absolute().parent.joinpath("chunk_1.smi")
+    new_path = original_path.with_name(new_name.name)
+    shutil.copyfile(original_path, new_path)
+
+    # ...and as 'chunk_2.smi', simulating a job with multiple output files
+    new_name = original_path.absolute().parent.joinpath("chunk_2.smi")
+    new_path = original_path.with_name(new_name.name)
+    shutil.copyfile(original_path, new_path)
+
+
+if __name__ == "__main__":
+    main()
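
The script is only a stand-in that fakes a multi-file result. A quick way to check its behaviour, assuming an input.smi file exists in the working directory (the file name is illustrative)::

    import subprocess
    from pathlib import Path

    # Run the simulation job and confirm it left the two expected copies
    subprocess.run(["python", "copyf.py", "input.smi"], check=True)
    assert Path("chunk_1.smi").is_file() and Path("chunk_2.smi").is_file()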

tests/jobs/copyf.sh

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+#! /bin/bash
+
+cp "$1" chunk_1.smi
+cp "$1" chunk_2.smi

tests/jobs/split-smi.sh

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+#!/bin/bash
+set -euo pipefail
+
+if [[ $# -lt 3 || $# -gt 4 ]]; then
+    echo "Usage: $0 <input_file(.smi or .smi.gz)> <lines_per_file> <output_basename> [has_header: yes]"
+    exit 1
+fi
+
+input_file="$1"
+lines_per_file="$2"
+base_name="$3"
+has_header="${4:-no}"
+
+# Determine how to read the file (plain text or gzipped)
+if [[ "$input_file" == *.gz ]]; then
+    reader="zcat"
+else
+    reader="cat"
+fi
+
+if ! [[ -f "$input_file" ]]; then
+    echo "Error: File '$input_file' not found"
+    exit 1
+fi
+
+# Extract header if present
+if [[ "$has_header" == "yes" ]]; then
+    header="$($reader "$input_file" | head -n1)"
+    data_start=2
+else
+    header=""
+    data_start=1
+fi
+
+# Count number of data lines (excluding header if present)
+data_lines="$($reader "$input_file" | tail -n +"$data_start" | wc -l)"
+if [[ "$data_lines" -eq 0 ]]; then
+    echo "No data lines to process."
+    exit 0
+fi
+
+# Calculate number of output files and required zero padding
+num_files=$(( (data_lines + lines_per_file - 1) / lines_per_file ))
+pad_width=0
+if [[ "$num_files" -gt 1 ]]; then
+    pad_width=${#num_files}
+fi
+
+# Split logic
+$reader "$input_file" | tail -n +"$data_start" | awk -v header="$header" -v lines="$lines_per_file" -v base="$base_name" -v pad="$pad_width" '
+function new_file() {
+    suffix = (pad > 0) ? sprintf("%0*d", pad, file_index) : file_index
+    file = base "_" suffix ".smi"
+    if (header != "") {
+        print header > file
+    }
+    file_index++
+    line_count = 0
+}
+{
+    if (line_count == 0) {
+        new_file()
+    }
+    print >> file
+    line_count++
+    if (line_count == lines) {
+        close(file)
+        print file " created"
+        line_count = 0
+    }
+}
+' file_index=1
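
The number of chunks and the zero padding follow directly from the line count: the chunk count is a ceiling division, and the pad width is the number of digits in that count. The same arithmetic in Python, using 1000 lines split 30 per file as an illustrative example (the figures are not from this commit)::

    # Example: the chunk-count and padding arithmetic used by split-smi.sh
    data_lines = 1000       # lines after any header has been stripped
    lines_per_file = 30
    num_files = (data_lines + lines_per_file - 1) // lines_per_file   # ceiling division -> 34
    pad_width = len(str(num_files)) if num_files > 1 else 0           # -> 2
    names = [f"chunk_{i:0{pad_width}d}.smi" for i in range(1, num_files + 1)]
    print(names[0], names[-1])   # chunk_01.smi chunk_34.smi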
