2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -21,7 +21,7 @@ flake8

mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192
types-paramiko
types-pkg-resources
types-setuptools
types-PyYAML
types-pycurl
types-requests
57 changes: 38 additions & 19 deletions docker/coexecutor/Dockerfile
@@ -1,41 +1,60 @@
FROM conda/miniconda3
FROM python:3.12-bookworm
@jmchilton commented:

I don't think it is fair at all to say "Miniconda" is not required in the base image as the commit message suggests. I get that it isn't the modality that you wish to run it in, but it is documented in https://pulsar.readthedocs.io/en/latest/containers.html#co-execution as an option, and it is an option that makes a lot of sense to me. Is the slurm stuff required for your use case?

@kysrpex commented on Jul 16, 2025:

> I don't think it is fair at all to say "Miniconda" is not required in the base image as the commit message suggests.

I am assuming "Miniconda" is not required in the base image because I saw line 60 (the last line): RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda. It looks like that command installs Miniconda, so Miniconda should still make it into the Docker image without being part of the base image. But I am not familiar with Pulsar, so I am not sure whether line 60 actually has an effect equivalent to including it in the base image.

@kysrpex commented on Jul 16, 2025:

About the SLURM stuff: it is not needed for this use case, but it was already in the old Dockerfile (it is needed for other use cases).

    # Install packages
    && apt-get update \
    && apt-get install -y --no-install-recommends gcc \
        libcurl4-openssl-dev \
        cvmfs cvmfs-config-default \
        slurm-llnl slurm-drmaa-dev \
        bzip2 \

Because I am changing the base image to Debian Bookworm, the changes are required so that the build does not break with the new base image (that is why the whole commit is titled "Change base image in coexecutor Dockerfile"). slurm-drmaa1 is no longer available in the Debian repos for Bookworm (it has to be installed from the Galaxy Depot). The latest version of slurm-drmaa1 available in the Galaxy Depot (1.1.4) requires the library libslurm36 from Debian Bullseye. There is a newer version 1.1.5 of libslurm-drmaa1 on GitHub that may work on Bookworm without libraries from Bullseye, but @natefoo has not published a Debian package yet.


ENV PYTHONUNBUFFERED 1
ENV PIP_ROOT_USER_ACTION=ignore
ENV DEBIAN_FRONTEND noninteractive
ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local

# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases)
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl gnupg \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \
&& echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. /etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list

# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm)
RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \
cat <<EOF > /etc/apt/preferences.d/bullseye.pref
Package: *
Pin: release n=bullseye
Pin-Priority: -1

Package: libslurm36, slurm
Pin: release n=bullseye
Pin-Priority: 100
EOF

# set up CVMFS repository
RUN apt-get update \
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb

# wget, gcc, pip - to build and install Pulsar.
# bzip2 for Miniconda.
# TODO: pycurl stuff...
RUN apt-get update \
&& apt-get install -y --no-install-recommends apt-transport-https \
RUN apt-get update && apt-get install -y --no-install-recommends \
# Install CVMFS client
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb \
&& rm -f cvmfs-release-latest_all.deb \
cvmfs cvmfs-config-default \
# Install packages
&& apt-get update \
&& apt-get install -y --no-install-recommends gcc \
libcurl4-openssl-dev \
cvmfs cvmfs-config-default \
slurm-llnl slurm-drmaa-dev \
bzip2 \
gcc libcurl4-openssl-dev \
munge libmunge-dev slurm slurm-drmaa-dev \
bzip2 \
# Install Pulsar Python requirements
&& pip install --no-cache-dir -U pip \
&& pip install --no-cache-dir drmaa wheel kombu pykube pycurl \
webob psutil PasteDeploy pyyaml paramiko \
# Remove build deps and cleanup
&& apt-get -y remove gcc wget lsb-release \
&& apt-get -y autoremove \
&& apt-get autoclean \
&& rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \
&& /usr/sbin/create-munge-key
&& apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log

ADD pulsar_app-*-py2.py3-none-any.whl /

ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl
SHELL ["/bin/bash", "-c"]

RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade
RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography
RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --upgrade 'importlib-metadata<5.0'
RUN _pulsar-configure-galaxy-cvmfs
RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda
68 changes: 60 additions & 8 deletions pulsar/client/action_mapper.py
Expand Up @@ -21,6 +21,7 @@
Any,
Dict,
List,
Optional,
Type,
)
from urllib.parse import urlencode
@@ -190,6 +191,9 @@ def __init__(self, client=None, config=None):
self.ssh_port = config.get("ssh_port", None)
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)
self.actions = []
# Might want to make the working directory available here so that we know where to place the archive
# for the archive action

def action(self, source, type, mapper=None):
path = source.get("path", None)
@@ -200,10 +204,14 @@ def action(self, source, type, mapper=None):
if mapper:
file_lister = mapper.file_lister
action_kwds = mapper.action_kwds
action = action_class(source, file_lister=file_lister, **action_kwds)
action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds)
self.__process_action(action, type)
self.actions.append(action)
return action

def finalize(self):
return [_ for _ in (action.finalize() for action in self.actions) if _]

def unstructured_mappers(self):
""" Return mappers that will map 'unstructured' files (i.e. go beyond
mapping inputs, outputs, and config files).
@@ -265,6 +273,7 @@ def __process_action(self, action, file_type):
""" Extension point to populate extra action information after an
action has been created.
"""
action.file_type = file_type
if getattr(action, "inject_url", False):
self.__inject_url(action, file_type)
if getattr(action, "inject_ssh_properties", False):
@@ -300,10 +309,12 @@ class BaseAction:
whole_directory_transfer_supported = False
action_spec: Dict[str, Any] = {}
action_type: str
file_type: Optional[str] = None

def __init__(self, source, file_lister=None):
def __init__(self, source, file_lister=None, file_type=None):
self.source = source
self.file_lister = file_lister or DEFAULT_FILE_LISTER
self.file_type = file_type

@property
def path(self):
@@ -342,6 +353,9 @@ def _extend_base_dict(self, **kwds):
base_dict.update(**kwds)
return base_dict

def finalize(self):
pass

def to_dict(self):
return self._extend_base_dict()

@@ -390,8 +404,8 @@ class RewriteAction(BaseAction):
action_type = "rewrite"
staging = STAGING_ACTION_NONE

def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.source_directory = source_directory
self.destination_directory = destination_directory

@@ -467,8 +481,8 @@ class RemoteTransferAction(BaseAction):
action_type = "remote_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
@@ -495,8 +509,8 @@ class RemoteTransferTusAction(BaseAction):
action_type = "remote_transfer_tus"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
@@ -513,6 +527,42 @@ def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class JsonTransferAction(BaseAction):
"""
This action indicates that the Pulsar server should create a JSON manifest that an external system can use to
stage files in and out of the compute environment.
"""
inject_url = True
whole_directory_transfer_supported = True
action_type = "json_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister, file_type)
self.url = url
self._from_path = None
self._to_path = None

@classmethod
def from_dict(cls, action_dict):
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])

def to_dict(self):
return self._extend_base_dict(url=self.url)

def write_to_path(self, path):
self._to_path = path

def write_from_path(self, pulsar_path: str):
self._from_path = pulsar_path

def finalize(self):
if self._to_path:
return {"url": self.url, "to_path": self._to_path}
else:
return {"url": self.url, "from_path": self._from_path}


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
@@ -664,6 +714,7 @@ def write_to_path(self, path):


DICTIFIABLE_ACTION_CLASSES = [
JsonTransferAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
@@ -844,6 +895,7 @@ def unstructured_map(self, path):

ACTION_CLASSES: List[Type[BaseAction]] = [
NoneAction,
JsonTransferAction,
RewriteAction,
TransferAction,
CopyAction,
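To make the new action's lifecycle concrete, here is a minimal sketch (the import path follows the diff above; the source path, URL, and staging paths are hypothetical): an input-side JsonTransferAction records the remote path it should be staged to via write_to_path(), and finalize() then emits the corresponding manifest entry.

    # Minimal sketch of the JsonTransferAction lifecycle; the URL and paths
    # below are hypothetical.
    from pulsar.client.action_mapper import JsonTransferAction

    action = JsonTransferAction(
        source={"path": "/galaxy/datasets/dataset_1.dat"},
        url="https://files.example.org/dataset_1.dat",
    )

    # Upload side: record where the external stager should place the file.
    action.write_to_path("/pulsar/staging/42/inputs/dataset_1.dat")

    # finalize() turns the recorded path into a manifest entry.
    assert action.finalize() == {
        "url": "https://files.example.org/dataset_1.dat",
        "to_path": "/pulsar/staging/42/inputs/dataset_1.dat",
    }

Because JsonTransferAction is registered in DICTIFIABLE_ACTION_CLASSES, it also round-trips through to_dict()/from_dict(), so manifest-producing actions survive serialization between client and server.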
9 changes: 5 additions & 4 deletions pulsar/client/client.py
@@ -168,7 +168,7 @@ def __init__(self, destination_params, job_id, job_manager_interface):
self.job_manager_interface = job_manager_interface

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
Queue up the execution of the supplied `command_line` on the remote
server. Called launch for historical reasons, should be renamed to
@@ -405,7 +405,7 @@ def _build_status_request_message(self):
class MessageJobClient(BaseMessageJobClient):

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
"""
launch_params = self._build_setup_message(
@@ -439,7 +439,7 @@ def __init__(self, destination_params, job_id, client_manager, shell):
self.shell = shell

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
"""
launch_params = self._build_setup_message(
@@ -491,7 +491,8 @@ def launch(
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None
pulsar_app_config=None,
staging_manifest=None
) -> Optional[ExternalId]:
"""
"""
6 changes: 6 additions & 0 deletions pulsar/client/staging/down.py
@@ -72,6 +72,9 @@ def collect(self):
self.__collect_other_working_directory_files()
self.__collect_metadata_directory_files()
self.__collect_job_directory_files()
# Give actions that require a final step, such as those that write a manifest, a chance to write out their content
self.__finalize_action_mapper()
# finalize collection here for executors that need this?
return self.exception_tracker.collection_failure_exceptions

def __collect_working_directory_outputs(self):
@@ -134,6 +137,9 @@ def __collect_job_directory_files(self):
'output_jobdir',
)

def __finalize_action_mapper(self):
self.action_mapper.finalize()

def __realized_dynamic_file_source_references(self):
references = {"filename": [], "extra_files": []}

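The __finalize_action_mapper() hook above delegates to FileActionMapper.finalize(), which, per the action_mapper.py diff, gathers the non-None finalize() results of every recorded action; actions that inherit the BaseAction default (returning None) simply drop out of the manifest. Continuing the hypothetical sketch from the action_mapper.py section, the download side records the Pulsar-side path during collection, so finalize() yields a from_path entry instead of a to_path one:

    # Download side (hypothetical paths, as before): write_from_path() runs
    # during output collection and is later reflected by finalize().
    action = JsonTransferAction(
        source={"path": "/galaxy/datasets/output_1.dat"},
        url="https://files.example.org/output_1.dat",
    )
    action.write_from_path("/pulsar/staging/42/outputs/output_1.dat")
    assert action.finalize() == {
        "url": "https://files.example.org/output_1.dat",
        "from_path": "/pulsar/staging/42/outputs/output_1.dat",
    }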
13 changes: 13 additions & 0 deletions pulsar/client/staging/up.py
@@ -71,6 +71,19 @@ def submit_job(client, client_job_description, job_config=None):
# it needs to be in the response to Pulsar even if Pulsar is initiating staging actions
launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
launch_kwds["token_endpoint"] = client.token_endpoint

# populate `to_path`
staging_manifest = []
for action in file_stager.action_mapper.actions:
if action.file_type not in ("output", "output_workdir"):
name = basename(action.path)
path = file_stager.job_directory.calculate_path(name, action.file_type)
action.write_to_path(path)
staging_manifest.append(action.finalize())

if staging_manifest:
launch_kwds["staging_manifest"] = staging_manifest

# for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external
# id from the submission process (e.g. to TES).
launch_response = client.launch(**launch_kwds)
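For a job with, say, one input dataset and one config file, the staging_manifest assembled above and handed to client.launch() is a plain JSON-serializable list. The entries below are hypothetical (the exact directory layout depends on the remote job directory), but they illustrate the shape an external staging system would consume, fetching each url and placing the payload at the corresponding to_path:

    import json

    # Hypothetical manifest as built by submit_job() for two non-output files.
    staging_manifest = [
        {"url": "https://files.example.org/dataset_1.dat",
         "to_path": "/pulsar/staging/42/inputs/dataset_1.dat"},
        {"url": "https://files.example.org/tool_script.sh",
         "to_path": "/pulsar/staging/42/configs/tool_script.sh"},
    ]
    print(json.dumps(staging_manifest, indent=2))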
5 changes: 3 additions & 2 deletions pulsar/managers/base/__init__.py
@@ -232,15 +232,16 @@ class JobDirectory(RemoteJobDirectory):
def __init__(
self,
staging_directory,
job_id,
job_id=None,
lock_manager=None,
directory_maker=None
):
super().__init__(staging_directory, remote_id=job_id, remote_sep=sep)
self._directory_maker = directory_maker or DirectoryMaker()
self.lock_manager = lock_manager
# Assert this job id isn't hacking path somehow.
assert job_id == basename(job_id)
if job_id:
assert job_id == basename(job_id)

def _job_file(self, name):
return os.path.join(self.job_directory, name)
6 changes: 3 additions & 3 deletions pulsar/managers/staging/post.py
@@ -20,14 +20,14 @@ def postprocess(job_directory, action_executor, was_cancelled):
staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None)
else:
staging_config = None
collected = __collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
file_action_mapper, collected = _collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
return collected
finally:
job_directory.write_file("postprocessed", "")
return False


def __collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
def _collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
collected = True
if "action_mapper" in staging_config:
file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
@@ -39,7 +39,7 @@ def __collect_outputs(job_directory, staging_config, action_executor, was_cancel
if collection_failure_exceptions:
log.warn("Failures collecting results %s" % collection_failure_exceptions)
collected = False
return collected
return file_action_mapper, collected


def realized_dynamic_file_sources(job_directory):