
Commit 44f2947

Support running StreamFlow on Windows
This commit enables support for Windows systems, allowing StreamFlow to be installed and executed on a Windows machine. Note that Linux remains the only supported OS for target execution locations.
1 parent 1f26965 commit 44f2947

File tree: 10 files changed, +278 -105 lines


cwl-conformance-test.bat

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
echo off

rem Version of the standard to test against
rem Current options: v1.0, v1.1, v1.2, and v1.3
if "%VERSION%"=="" set "VERSION=v1.2"

rem Which commit of the standard's repo to use
rem Defaults to the last commit of the main branch
if "%COMMIT%"=="" set "COMMIT=main"

rem Comma-separated list of test names that should be excluded from execution
rem Defaults to "docker_entrypoint,modify_file_content"
if "%EXCLUDE%"=="" set "EXCLUDE=docker_entrypoint,modify_file_content"

rem Name of the CWLDockerTranslator plugin to use for test execution
rem This parameter allows testing automatic CWL requirements translators
if "%DOCKER%"=="" set "DOCKER=docker"

rem Additional arguments for the pytest command
rem Defaults to none
rem set "PYTEST_EXTRA="

rem The directory where this script resides
set "SCRIPT_DIRECTORY=%~dp0"
set "SCRIPT_DIRECTORY=%SCRIPT_DIRECTORY:~0,-1%"

rem Download archive from GitHub
if "%VERSION%"=="v1.0" (
    set "REPO=common-workflow-language"
) else (
    set "REPO=cwl-%VERSION%"
)

if not exist "%REPO%-%COMMIT%" (
    if not exist "%COMMIT%.tar.gz" (
        echo Downloading %REPO% @ %COMMIT%...
        pwsh -Command "Invoke-WebRequest -Uri https://github.com/common-workflow-language/%REPO%/archive/%COMMIT%.tar.gz -OutFile %COMMIT%.tar.gz"
    )
    tar -xzf "%COMMIT%.tar.gz"
)

rem Setup environment
call :venv cwl-conformance-venv
python -m pip install -U setuptools wheel pip
python -m pip install -r "%SCRIPT_DIRECTORY%\requirements.txt"
python -m pip install -r "%SCRIPT_DIRECTORY%\test-requirements.txt"
if "%VERSION%"=="v1.3" (
    python -m pip uninstall -y cwl-utils
    python -m pip install git+https://github.com/common-workflow-language/cwl-utils.git@refs/pull/370/head
)

rem Set conformance test filename
if "%VERSION%"=="v1.0" (
    set "CONFORMANCE_TEST=%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%\%VERSION%\conformance_test_v1.0.yaml"
) else (
    set "CONFORMANCE_TEST=%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%\conformance_tests.yaml"
)
move "%CONFORMANCE_TEST%" "%CONFORMANCE_TEST:.yaml=.cwltest.yaml%"
set "CONFORMANCE_TEST=%CONFORMANCE_TEST:.yaml=.cwltest.yaml%"

rem Build command
set "TEST_COMMAND=python -m pytest "%CONFORMANCE_TEST%" -n auto -rs"
if not "%EXCLUDE%"=="" (
    set "TEST_COMMAND=%TEST_COMMAND% --cwl-exclude %EXCLUDE%"
)
set "TEST_COMMAND=%TEST_COMMAND% --cov --junitxml=junit.xml -o junit_family=legacy --cov-report= %PYTEST_EXTRA%"

rem Cleanup coverage
if exist "%SCRIPT_DIRECTORY%\.coverage" del "%SCRIPT_DIRECTORY%\.coverage"
if exist "%SCRIPT_DIRECTORY%\coverage.xml" del "%SCRIPT_DIRECTORY%\coverage.xml"
if exist "%SCRIPT_DIRECTORY%\junit.xml" del "%SCRIPT_DIRECTORY%\junit.xml"

rem Run test
copy "%SCRIPT_DIRECTORY%\tests\cwl-conformance\conftest.py" "%~dpn1"
copy "%SCRIPT_DIRECTORY%\tests\cwl-conformance\streamflow-%DOCKER%.yml" "%~dpn1\streamflow.yml"
cmd /c "%TEST_COMMAND%"
set "RETURN_CODE=%ERRORLEVEL%"

rem Coverage report
if "%RETURN_CODE%"=="0" (
    coverage report
    coverage xml
)

rem Cleanup
rd /s /q "%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%"
rd /s /q "%SCRIPT_DIRECTORY%\cwl-conformance-venv"
del "%COMMIT%.tar.gz" 2>nul

rem Exit
exit /b %RETURN_CODE%
goto :eof

:venv
if not exist "%~1" (
    where virtualenv >nul 2>&1 && (
        virtualenv -p python "%~1"
    ) || (
        python -m venv "%~1"
    )
)
call "%~1\Scripts\activate.bat"
goto :eof
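The script is driven entirely by environment variables (VERSION, COMMIT, EXCLUDE, DOCKER, and optionally PYTEST_EXTRA), each falling back to the defaults above when unset. A typical run therefore only sets the values that differ from the defaults before invoking cwl-conformance-test.bat from the repository root; this is a hypothetical invocation pattern, and the valid DOCKER values depend on which streamflow-*.yml configurations exist under tests\cwl-conformance.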

streamflow/core/utils.py

Lines changed: 2 additions & 2 deletions
@@ -143,8 +143,8 @@ def dict_product(**kwargs) -> MutableMapping[Any, Any]:
         yield dict(zip(keys, list(instance), strict=True))


-def encode_command(command: str, shell: str = "sh") -> str:
-    return f"echo {base64.b64encode(command.encode('utf-8')).decode('utf-8')} | base64 -d | {shell}"
+def encode_command(command: str) -> str:
+    return f"echo {base64.b64encode(command.encode('utf-8')).decode('utf-8')} | base64 -d | sh"


 async def eval_processors(unfinished: Iterable[asyncio.Task], name: str) -> Token:
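For reference, encode_command wraps the whole command line in base64 so that quoting and special characters survive the trip to the remote shell, where the string is decoded and piped to sh. A minimal standalone sketch of the same behaviour, with a hypothetical command string:

import base64


def encode_command(command: str) -> str:
    # Same logic as the function above: base64-encode the command so it passes
    # through nested quoting untouched, then decode and pipe it to `sh` on the
    # (still Linux-only) execution location.
    return f"echo {base64.b64encode(command.encode('utf-8')).decode('utf-8')} | base64 -d | sh"


print(encode_command("ls -la /tmp"))
# echo bHMgLWxhIC90bXA= | base64 -d | sh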

streamflow/data/manager.py

Lines changed: 37 additions & 26 deletions
@@ -147,35 +147,31 @@ def invalidate_location(self, location: ExecutionLocation, path: str) -> None:
             self.invalidate_location(data_loc.location, data_loc.path)

     def put(
-        self, path: str, data_location: DataLocation, recursive: bool = False
+        self, path: StreamFlowPath, data_location: DataLocation, recursive: bool = False
     ) -> DataLocation:
-        path = PurePosixPath(Path(path).as_posix())
-        path_processor = get_path_processor(
-            self.context.deployment_manager.get_connector(data_location.deployment)
-        )
         node = self._filesystem
         nodes = {}
         # Create or navigate hierarchy
         for i, token in enumerate(path.parts):
             node = node.children.setdefault(token, _RemotePathNode())
             if recursive:
-                nodes[path_processor.join(*path.parts[: i + 1])] = node
+                nodes[path.parents[len(path.parts) - i + 2]] = node
         if not recursive:
-            nodes[str(path)] = node
+            nodes[path] = node
         # Process hierarchy bottom-up to add parent locations
         relpath = data_location.relpath
         for node_path in reversed(nodes):
             node = nodes[node_path]
-            if node_path == str(path):
+            if node_path == path:
                 location = data_location
             else:
                 location = DataLocation(
                     location=data_location.location,
-                    path=node_path,
+                    path=str(node_path),
                     relpath=(
                         relpath
-                        if relpath and node_path.endswith(relpath)
-                        else path_processor.basename(node_path)
+                        if relpath and str(node_path).endswith(relpath)
+                        else node_path.stem
                     ),
                     data_type=DataType.PRIMARY,
                     available=True,
@@ -190,7 +186,11 @@ def put(
                     break
            else:
                node.locations[location.deployment][location.name].append(location)
-            relpath = path_processor.dirname(relpath)
+            relpath = str(
+                StreamFlowPath(
+                    relpath, context=self.context, location=data_location.location
+                ).parent
+            )
         # Return location
         return data_location

@@ -277,25 +277,24 @@ def register_path(
                 available=False,
             )
         ]
-        self.path_mapper.put(path=path, data_location=data_locations[0], recursive=True)
+        sf_path = StreamFlowPath(path, context=self.context, location=location)
+        self.path_mapper.put(
+            path=sf_path, data_location=data_locations[0], recursive=True
+        )
         self.context.checkpoint_manager.register(data_locations[0])
         # Process wrapped locations if any
-        while (
-            path := get_inner_path(
-                path=StreamFlowPath(path, context=self.context, location=location)
-            )
-        ) is not None:
+        while (sf_path := get_inner_path(path=sf_path)) is not None:
             data_locations.append(
                 DataLocation(
                     location=location.wraps,
-                    path=str(path),
-                    relpath=relpath or str(path),
+                    path=str(sf_path),
+                    relpath=relpath or str(sf_path),
                     data_type=data_type,
                     available=False,
                 )
             )
             self.path_mapper.put(
-                path=str(path), data_location=data_locations[-1], recursive=True
+                path=sf_path, data_location=data_locations[-1], recursive=True
             )
             self.register_relation(
                 src_location=data_locations[0], dst_location=data_locations[-1]
@@ -309,8 +308,22 @@ def register_relation(
         self, src_location: DataLocation, dst_location: DataLocation
     ) -> None:
         for data_location in self.path_mapper.get(path=src_location.path):
-            self.path_mapper.put(data_location.path, dst_location)
-            self.path_mapper.put(dst_location.path, data_location)
+            self.path_mapper.put(
+                StreamFlowPath(
+                    path=data_location.path,
+                    context=self.context,
+                    location=data_location.location,
+                ),
+                dst_location,
+            )
+            self.path_mapper.put(
+                StreamFlowPath(
+                    path=dst_location.path,
+                    context=self.context,
+                    location=dst_location.location,
+                ),
+                data_location,
+            )

     async def transfer_data(
         self,
@@ -402,9 +415,7 @@ async def transfer_data(
                             relpath=src_data_location.relpath,
                             data_type=DataType.PRIMARY,
                         )
-                        self.path_mapper.put(
-                            path=str(loc_dst_path), data_location=dst_data_location
-                        )
+                        self.path_mapper.put(path=loc_dst_path, data_location=dst_data_location)
                         data_locations.append(dst_data_location)
                         # If the destination is not writable , map the new `DataLocation` object to the source locations
                         if not writable:
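The rewritten put() relies on pathlib-style attributes of StreamFlowPath (parts, parents, parent, stem) instead of the connector-specific path_processor helpers. A small illustration of those attributes using PurePosixPath as a stand-in (an assumption made only for illustration; StreamFlowPath is the project's own class and additionally carries a context and a location):

from pathlib import PurePosixPath

p = PurePosixPath("/data/run/output.txt")
print(p.parts)           # ('/', 'data', 'run', 'output.txt')
print(list(p.parents))   # [PurePosixPath('/data/run'), PurePosixPath('/data'), PurePosixPath('/')]
print(p.parent)          # PurePosixPath('/data/run'), used where path_processor.dirname was called before
print(p.stem)            # 'output', used where path_processor.basename was called before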

streamflow/deployment/aiotarstream.py

Lines changed: 9 additions & 2 deletions
@@ -1,9 +1,7 @@
 from __future__ import annotations

 import copy
-import grp
 import os
-import pwd
 import re
 import shutil
 import stat
@@ -20,6 +18,15 @@
 from streamflow.core.data import StreamWrapper
 from streamflow.deployment.stream import BaseStreamWrapper

+try:
+    import grp
+except ImportError:
+    grp = None
+try:
+    import pwd
+except ImportError:
+    pwd = None
+

 async def copyfileobj(src, dst, length=None, bufsize=None):
     bufsize = bufsize or 16 * 1024
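grp and pwd are POSIX-only modules, so the guarded imports above let the module load on Windows and simply leave both names set to None there. A minimal sketch of how code can fall back when the modules are missing (owner_name is a hypothetical helper, not part of this diff):

try:
    import pwd  # POSIX-only user database
except ImportError:  # e.g. on Windows
    pwd = None


def owner_name(uid: int) -> str | None:
    # Hypothetical helper: resolve a numeric uid to a user name when pwd is
    # available, and degrade gracefully to None on platforms without it.
    if pwd is None:
        return None
    try:
        return pwd.getpwuid(uid).pw_name
    except KeyError:
        return None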

streamflow/deployment/connector/base.py

Lines changed: 1 addition & 1 deletion
@@ -388,7 +388,7 @@ async def run(
         )
         return await utils.run_in_subprocess(
             location=location,
-            command=[utils.encode_command(command_str, "sh")],
+            command=[utils.encode_command(command_str)],
             capture_output=capture_output,
             timeout=timeout,
         )

streamflow/deployment/connector/container.py

Lines changed: 8 additions & 3 deletions
@@ -6,6 +6,7 @@
 import logging
 import os
 import posixpath
+import sys
 from abc import ABC, abstractmethod
 from collections.abc import MutableMapping, MutableSequence
 from importlib.resources import files
@@ -568,7 +569,7 @@ async def get_stream_reader(
     ) -> AsyncContextManager[StreamWrapper]:
         return await self.connector.get_stream_reader(
             command=self._get_run_command(
-                command=utils.encode_command(" ".join(command), "sh"),
+                command=utils.encode_command(" ".join(command)),
                 location=location,
                 interactive=False,
             ),
@@ -623,7 +624,7 @@ async def run(
         return await self.connector.run(
             location=get_inner_location(location),
             command=self._get_run_command(
-                command=utils.encode_command(command, "sh"),
+                command=utils.encode_command(command),
                 location=location,
             ),
             capture_output=capture_output,
@@ -718,7 +719,11 @@ async def _populate_instance(self, name: str):
         )
         # Check if the container user is the current host user
         if self._wraps_local():
-            host_user = os.getuid()
+            if sys.platform == "win32":
+                # Windows does not support the `getuid` function
+                host_user = -1
+            else:
+                host_user = os.getuid()
         else:
             stdout, returncode = await self.connector.run(
                 location=self._get_inner_location(),
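The same platform guard can be expressed as a small helper; this is only an illustrative sketch of the pattern used above (current_host_user is a hypothetical name, and -1 mirrors the sentinel chosen in the diff):

import os
import sys


def current_host_user() -> int:
    # os.getuid() does not exist on Windows, so return the same sentinel
    # value (-1) used above when no host uid is available.
    return -1 if sys.platform == "win32" else os.getuid()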
