Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 113 additions & 4 deletions src/jobflow_remote/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,14 +439,14 @@ class RemoteWorker(WorkerBase):
default=False,
description="Whether the authentication to the host should be interactive",
)

def get_host(self) -> BaseHost:
"""
Return the RemoteHost.
Return the RemoteHost for this worker.

Returns
-------
The RemoteHost.
RemoteHost
The RemoteHost instance for this worker.
"""
connect_kwargs = dict(self.connect_kwargs) if self.connect_kwargs else {}
if self.password:
Expand All @@ -455,6 +455,7 @@ def get_host(self) -> BaseHost:
connect_kwargs["key_filename"] = self.key_filename
if self.passphrase:
connect_kwargs["passphrase"] = self.passphrase

return RemoteHost(
host=self.host,
user=self.user,
Expand Down Expand Up @@ -489,7 +490,115 @@ def cli_info(self) -> dict:
)


WorkerConfig = Annotated[LocalWorker | RemoteWorker, Field(discriminator="type")]
class SeparatedTransferWorker(RemoteWorker):
    """
    Worker with separate hosts for commands and file transfers.

    This is useful for HPC systems where login nodes have SFTP disabled but a
    dedicated data transfer node is available (e.g., LRC at LBNL).

    Command execution goes through the main host connection, while file operations
    (put, get, mkdir, etc.) go through the transfer host.
    """

    type: Literal["separated_transfer"] = Field(
        "separated_transfer",
        description="The discriminator field to determine the worker type",
    )
    transfer: ConnectionData = Field(
        description="Connection data for the file transfer host. Use this when "
        "the main host has SFTP disabled but a dedicated transfer node is available. "
        "File operations (put/get) will use this connection while commands use the "
        "main host.",
    )

    def get_host(self) -> BaseHost:
        """
        Return a SeparatedTransferHost for this worker.

        Two RemoteHost objects are built: one from the worker's own connection
        settings (the command host) and one from ``transfer`` (the transfer
        host). For the transfer host:

        * ``user`` and ``port`` fall back to the worker's values when they are
          not set (falsy) in ``transfer``;
        * ``gateway`` is taken from ``transfer`` only, with no fallback;
        * the connection kwargs fall back, as a whole, to the worker's ones
          when ``transfer.get_connect_kwargs()`` returns nothing;
        * all the remaining options (timeouts, shell, keepalive, ...) are
          the same as those of the command host.

        Returns
        -------
        SeparatedTransferHost
            The host instance with separate command and transfer connections.
        """
        # Imported here rather than at module level, as in the original code.
        from jobflow_remote.remote.host import SeparatedTransferHost

        # Work on a copy so that the worker's own configuration is never
        # modified; explicit credential fields override the generic kwargs.
        connect_kwargs = dict(self.connect_kwargs) if self.connect_kwargs else {}
        for credential in ("password", "key_filename", "passphrase"):
            value = getattr(self, credential)
            if value:
                connect_kwargs[credential] = value

        # Options that are identical for the command and the transfer host.
        shared_options = {
            "forward_agent": self.forward_agent,
            "connect_timeout": self.connect_timeout,
            "inline_ssh_env": self.inline_ssh_env,
            "timeout_execute": self.timeout_execute,
            "keepalive": self.keepalive,
            "shell_cmd": self.shell_cmd,
            "login_shell": self.login_shell,
            "interactive_login": self.interactive_login,
            "sanitize": self.sanitize_command,
        }

        command_host = RemoteHost(
            host=self.host,
            user=self.user,
            port=self.port,
            gateway=self.gateway,
            connect_kwargs=connect_kwargs,
            **shared_options,
        )

        transfer_connect_kwargs = self.transfer.get_connect_kwargs()
        if not transfer_connect_kwargs:
            # Fall back to main connection credentials if not specified.
            # A separate copy is used so that the two hosts never share the
            # same mutable dictionary.
            transfer_connect_kwargs = dict(connect_kwargs)

        transfer_host = RemoteHost(
            host=self.transfer.host,
            user=self.transfer.user or self.user,
            port=self.transfer.port or self.port,
            gateway=self.transfer.gateway,
            connect_kwargs=transfer_connect_kwargs,
            **shared_options,
        )

        return SeparatedTransferHost(
            command_host=command_host,
            transfer_host=transfer_host,
        )

    @property
    def cli_info(self) -> dict:
        """
        Short information about the worker to be displayed in the CLI.

        Returns
        -------
        dict
            A dictionary with the Worker short information: the command
            host, the transfer host, the scheduler type and the work directory.
        """
        return {
            "host": self.host,
            "transfer_host": self.transfer.host,
            "scheduler_type": self.scheduler_type,
            "work_dir": self.work_dir,
        }


# Union of all the supported worker configuration models. The "type" field is
# declared as the discriminator used to choose among the members of the union.
WorkerConfig = Annotated[
LocalWorker | RemoteWorker | SeparatedTransferWorker, Field(discriminator="type")
]


class ExecutionConfig(BaseModel):
Expand Down
3 changes: 2 additions & 1 deletion src/jobflow_remote/remote/host/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from jobflow_remote.remote.host.base import BaseHost
from jobflow_remote.remote.host.local import LocalHost
from jobflow_remote.remote.host.remote import RemoteHost
from jobflow_remote.remote.host.separated import SeparatedTransferHost

__all__ = ("BaseHost", "LocalHost", "RemoteHost")
# Names exported by the host package on ``from ... import *``.
__all__ = ("BaseHost", "LocalHost", "RemoteHost", "SeparatedTransferHost")
Loading