Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions cwl-conformance-test.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
echo off

rem Version of the standard to test against
rem Current options: v1.0, v1.1, v1.2, and v1.3
if "%VERSION%"=="" set "VERSION=v1.2"

rem Which commit of the standard's repo to use
rem Defaults to the last commit of the main branch
if "%COMMIT%"=="" set "COMMIT=main"

rem Comma-separated list of test names that should be excluded from execution
rem Defaults to "docker_entrypoint, inplace_update_on_file_content"
if "%EXCLUDE%"=="" set "EXCLUDE=docker_entrypoint,modify_file_content"

rem Name of the CWLDockerTranslator plugin to use for test execution
rem This parameter allows to test automatic CWL requirements translators
if "%DOCKER%"=="" set "DOCKER=docker"

rem Additional arguments for the pytest command
rem Defaults to none
rem set "PYTEST_EXTRA="

rem The directory where this script resides
set "SCRIPT_DIRECTORY=%~dp0"
set "SCRIPT_DIRECTORY=%SCRIPT_DIRECTORY:~0,-1%"

rem Download archive from GitHub
if "%VERSION%"=="v1.0" (
set "REPO=common-workflow-language"
) else (
set "REPO=cwl-%VERSION%"
)

if not exist "%REPO%-%COMMIT%" (
if not exist "%COMMIT%.tar.gz" (
echo Downloading %REPO% @ %COMMIT%...
pwsh -Command "Invoke-WebRequest -Uri https://github.com/common-workflow-language/%REPO%/archive/%COMMIT%.tar.gz -OutFile %COMMIT%.tar.gz"
)
tar -xzf "%COMMIT%.tar.gz"
)

rem Setup environment
call :venv cwl-conformance-venv
python -m pip install -U setuptools wheel pip
python -m pip install -r "%SCRIPT_DIRECTORY%\requirements.txt"
python -m pip install -r "%SCRIPT_DIRECTORY%\test-requirements.txt"
if "%VERSION%"=="v1.3" (
python -m pip uninstall -y cwl-utils
python -m pip install git+https://github.com/common-workflow-language/cwl-utils.git@refs/pull/370/head
)

rem Set conformance test filename
if "%VERSION%"=="v1.0" (
set "CONFORMANCE_TEST=%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%\%VERSION%\conformance_test_v1.0.yaml"
) else (
set "CONFORMANCE_TEST=%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%\conformance_tests.yaml"
)
move "%CONFORMANCE_TEST%" "%CONFORMANCE_TEST:.yaml=.cwltest.yaml%"
set "CONFORMANCE_TEST=%CONFORMANCE_TEST:.yaml=.cwltest.yaml%"

rem Build command
set "TEST_COMMAND=python -m pytest "%CONFORMANCE_TEST%" -n auto -rs"
if not "%EXCLUDE%"=="" (
set "TEST_COMMAND=%TEST_COMMAND% --cwl-exclude %EXCLUDE%"
)
set "TEST_COMMAND=%TEST_COMMAND% --cov --junitxml=junit.xml -o junit_family=legacy --cov-report= %PYTEST_EXTRA%"

rem Cleanup coverage
if exist "%SCRIPT_DIRECTORY%\.coverage" del "%SCRIPT_DIRECTORY%\.coverage"
if exist "%SCRIPT_DIRECTORY%\coverage.xml" del "%SCRIPT_DIRECTORY%\coverage.xml"
if exist "%SCRIPT_DIRECTORY%\junit.xml" del "%SCRIPT_DIRECTORY%\junit.xml"

rem Run test
copy "%SCRIPT_DIRECTORY%\tests\cwl-conformance\conftest.py" "%~dpn1"
copy "%SCRIPT_DIRECTORY%\tests\cwl-conformance\streamflow-%DOCKER%.yml" "%~dpn1\streamflow.yml"
cmd /c "%TEST_COMMAND%"
set "RETURN_CODE=%ERRORLEVEL%"

rem Coverage report
if "%RETURN_CODE%"=="0" (
coverage report
coverage xml
)

rem Cleanup
rd /s /q "%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%"
rd /s /q "%SCRIPT_DIRECTORY%\cwl-conformance-venv"
del "%COMMIT%.tar.gz" 2>nul

rem Exit
exit /b %RETURN_CODE%
goto :eof

:venv
if not exist "%~1" (
where virtualenv >nul 2>&1 && (
virtualenv -p python "%~1"
) || (
python -m venv "%~1"
)
)
call "%~1\Scripts\activate.bat"
goto :eof
4 changes: 2 additions & 2 deletions streamflow/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ def dict_product(**kwargs) -> MutableMapping[Any, Any]:
yield dict(zip(keys, list(instance), strict=True))


def encode_command(command: str, shell: str = "sh") -> str:
return f"echo {base64.b64encode(command.encode('utf-8')).decode('utf-8')} | base64 -d | {shell}"
def encode_command(command: str) -> str:
return f"echo {base64.b64encode(command.encode('utf-8')).decode('utf-8')} | base64 -d | sh"


async def eval_processors(unfinished: Iterable[asyncio.Task], name: str) -> Token:
Expand Down
63 changes: 37 additions & 26 deletions streamflow/data/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,35 +147,31 @@ def invalidate_location(self, location: ExecutionLocation, path: str) -> None:
self.invalidate_location(data_loc.location, data_loc.path)

def put(
self, path: str, data_location: DataLocation, recursive: bool = False
self, path: StreamFlowPath, data_location: DataLocation, recursive: bool = False
) -> DataLocation:
path = PurePosixPath(Path(path).as_posix())
path_processor = get_path_processor(
self.context.deployment_manager.get_connector(data_location.deployment)
)
node = self._filesystem
nodes = {}
# Create or navigate hierarchy
for i, token in enumerate(path.parts):
node = node.children.setdefault(token, _RemotePathNode())
if recursive:
nodes[path_processor.join(*path.parts[: i + 1])] = node
nodes[path.parents[len(path.parts) - (i + 2)]] = node
if not recursive:
nodes[str(path)] = node
nodes[path] = node
# Process hierarchy bottom-up to add parent locations
relpath = data_location.relpath
for node_path in reversed(nodes):
node = nodes[node_path]
if node_path == str(path):
if node_path == path:
location = data_location
else:
location = DataLocation(
location=data_location.location,
path=node_path,
path=str(node_path),
relpath=(
relpath
if relpath and node_path.endswith(relpath)
else path_processor.basename(node_path)
if relpath and str(node_path).endswith(relpath)
else node_path.stem
),
data_type=DataType.PRIMARY,
available=True,
Expand All @@ -190,7 +186,11 @@ def put(
break
else:
node.locations[location.deployment][location.name].append(location)
relpath = path_processor.dirname(relpath)
relpath = str(
StreamFlowPath(
relpath, context=self.context, location=data_location.location
).parent
)
# Return location
return data_location

Expand Down Expand Up @@ -277,25 +277,24 @@ def register_path(
available=False,
)
]
self.path_mapper.put(path=path, data_location=data_locations[0], recursive=True)
sf_path = StreamFlowPath(path, context=self.context, location=location)
self.path_mapper.put(
path=sf_path, data_location=data_locations[0], recursive=True
)
self.context.checkpoint_manager.register(data_locations[0])
# Process wrapped locations if any
while (
path := get_inner_path(
path=StreamFlowPath(path, context=self.context, location=location)
)
) is not None:
while (sf_path := get_inner_path(path=sf_path)) is not None:
data_locations.append(
DataLocation(
location=location.wraps,
path=str(path),
relpath=relpath or str(path),
path=str(sf_path),
relpath=relpath or str(sf_path),
data_type=data_type,
available=False,
)
)
self.path_mapper.put(
path=str(path), data_location=data_locations[-1], recursive=True
path=sf_path, data_location=data_locations[-1], recursive=True
)
self.register_relation(
src_location=data_locations[0], dst_location=data_locations[-1]
Expand All @@ -309,8 +308,22 @@ def register_relation(
self, src_location: DataLocation, dst_location: DataLocation
) -> None:
for data_location in self.path_mapper.get(path=src_location.path):
self.path_mapper.put(data_location.path, dst_location)
self.path_mapper.put(dst_location.path, data_location)
self.path_mapper.put(
StreamFlowPath(
path=data_location.path,
context=self.context,
location=data_location.location,
),
dst_location,
)
self.path_mapper.put(
StreamFlowPath(
path=dst_location.path,
context=self.context,
location=dst_location.location,
),
data_location,
)

async def transfer_data(
self,
Expand Down Expand Up @@ -402,9 +415,7 @@ async def transfer_data(
relpath=src_data_location.relpath,
data_type=DataType.PRIMARY,
)
self.path_mapper.put(
path=str(loc_dst_path), data_location=dst_data_location
)
self.path_mapper.put(path=loc_dst_path, data_location=dst_data_location)
data_locations.append(dst_data_location)
# If the destination is not writable , map the new `DataLocation` object to the source locations
if not writable:
Expand Down
11 changes: 9 additions & 2 deletions streamflow/deployment/aiotarstream.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from __future__ import annotations

import copy
import grp
import os
import pwd
import re
import shutil
import stat
Expand All @@ -20,6 +18,15 @@
from streamflow.core.data import StreamWrapper
from streamflow.deployment.stream import BaseStreamWrapper

try:
import grp
except ImportError:
grp = None
try:
import pwd
except ImportError:
pwd = None


async def copyfileobj(src, dst, length=None, bufsize=None):
bufsize = bufsize or 16 * 1024
Expand Down
2 changes: 1 addition & 1 deletion streamflow/deployment/connector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ async def run(
)
return await utils.run_in_subprocess(
location=location,
command=[utils.encode_command(command_str, "sh")],
command=[utils.encode_command(command_str)],
capture_output=capture_output,
timeout=timeout,
)
Expand Down
11 changes: 8 additions & 3 deletions streamflow/deployment/connector/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import posixpath
import sys
from abc import ABC, abstractmethod
from collections.abc import MutableMapping, MutableSequence
from importlib.resources import files
Expand Down Expand Up @@ -568,7 +569,7 @@ async def get_stream_reader(
) -> AsyncContextManager[StreamWrapper]:
return await self.connector.get_stream_reader(
command=self._get_run_command(
command=utils.encode_command(" ".join(command), "sh"),
command=utils.encode_command(" ".join(command)),
location=location,
interactive=False,
),
Expand Down Expand Up @@ -623,7 +624,7 @@ async def run(
return await self.connector.run(
location=get_inner_location(location),
command=self._get_run_command(
command=utils.encode_command(command, "sh"),
command=utils.encode_command(command),
location=location,
),
capture_output=capture_output,
Expand Down Expand Up @@ -718,7 +719,11 @@ async def _populate_instance(self, name: str):
)
# Check if the container user is the current host user
if self._wraps_local():
host_user = os.getuid()
if sys.platform == "win32":
# Windows does not support the `getuid` function
host_user = -1
else:
host_user = os.getuid()
else:
stdout, returncode = await self.connector.run(
location=self._get_inner_location(),
Expand Down
Loading
Loading