Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions htcdaskgateway/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
__version_tuple__: VERSION_TUPLE
version_tuple: VERSION_TUPLE

__version__ = version = '1.0.2.dev6+g5379e9a.d20250418'
__version_tuple__ = version_tuple = (1, 0, 2, 'dev6', 'g5379e9a.d20250418')
__version__ = version = '1.0.5.dev3+gf1d34f4'
__version_tuple__ = version_tuple = (1, 0, 5, 'dev3', 'gf1d34f4')
24 changes: 16 additions & 8 deletions htcdaskgateway/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@


class HTCGatewayCluster(GatewayCluster):
def __init__(self, container_image=None, **kwargs):
def __init__(
self, container_image=None, memory: str = "32GB", cpus: int = 4, **kwargs
):
self.scheduler_proxy_ip = kwargs.pop("", "dask.software-dev.ncsa.illinois.edu")
self.batchWorkerJobs = []
self.cluster_options = kwargs.get("cluster_options")
self.container_image = container_image
self.memory = memory
self.cpus = cpus
self.condor_bin_dir = os.environ["CONDOR_BIN_DIR"]

super().__init__(**kwargs)
Expand Down Expand Up @@ -87,18 +91,16 @@ def scale_batch_workers(self, n):
f.write(security.tls_key)

# Prepare JDL
resources = f" request_memory = {self.memory} \n request_cpus = {self.cpus}"
jdl = (
"""executable = start.sh
arguments = """
+ cluster_name
+ """ htcdask-worker_$(Cluster)_$(Process)
output = condor/htcdask-worker$(Cluster)_$(Process).out
error = condor/htcdask-worker$(Cluster)_$(Process).err
log = condor/htcdask-worker$(Cluster)_$(Process).log
request_cpus = 4
request_memory = 32GB
container_image="""
+ self.container_image
log = condor/htcdask-worker$(Cluster)_$(Process).log"""
+ resources
+ """
should_transfer_files = yes
transfer_input_files = ./dask-credentials, ./dask-worker-space , ./condor
Expand All @@ -119,11 +121,17 @@ def scale_batch_workers(self, n):
export APPTAINERENV_DASK_GATEWAY_API_URL="https://dask.software-dev.ncsa.illinois.edu/api"
export APPTAINERENV_DASK_GATEWAY_CLUSTER_NAME=$1
#export APPTAINERENV_DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info"

export DASK_LOGGING__DISTRIBUTED=info
worker_space_dir=${PWD}/dask-worker-space/$2
mkdir -p $worker_space_dir
hostname -i
DASK_LOGGING__DISTRIBUTED=info dask worker --name $2 --tls-ca-file dask-credentials/dask.crt --tls-cert dask-credentials/dask.crt --tls-key dask-credentials/dask.pem --worker-port 10000:10070 --no-nanny --scheduler-sni daskgateway-"""
/usr/bin/apptainer exec --bind /scratch --bind /projects \
--env DASK_LOGGING__DISTRIBUTED=info \
--env DASK_GATEWAY_CLUSTER_NAME=$1 \
--env DASK_GATEWAY_WORKER_NAME=$2 \
--env DASK_GATEWAY_API_URL="https://dask.software-dev.ncsa.illinois.edu/api" """
+ self.container_image
+ " dask worker --name $2 --tls-ca-file dask-credentials/dask.crt --tls-cert dask-credentials/dask.crt --tls-key dask-credentials/dask.pem --worker-port 10000:10070 --no-nanny --scheduler-sni daskgateway-"
+ cluster_name
+ """ --nthreads 1 tls://"""
+ self.scheduler_proxy_ip
Expand Down
10 changes: 10 additions & 0 deletions htcdaskgateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def new_cluster(
cluster_options=None,
shutdown_on_close=True,
container_image=None,
memory="32GB",
cpus=4,
**kwargs,
):
"""Submit a new cluster to the gateway, and wait for it to be started.
Expand All @@ -36,6 +38,12 @@ def new_cluster(
close. Set to False to have cluster persist until explicitly
shutdown.
container_image : str, Path to the apptainer image to run in the workers
cpus: int number of CPUs to request for each worker
memory: str amount of memory to request for each worker Characters may be appended
to a numerical value to indicate units. K or KB indicates KiB,
2^10 numbers of bytes. M or MB indicates MiB, 2^20 numbers of bytes.
G or GB indicates GiB, 2^30 numbers of bytes. T or TB indicates TiB,
2^40 numbers of bytes.
**kwargs :
Additional cluster configuration options. If ``cluster_options`` is
provided, these are applied afterwards as overrides. Available
Expand All @@ -55,6 +63,8 @@ def new_cluster(
shutdown_on_close=shutdown_on_close,
cluster_options=cluster_options,
container_image=container_image,
memory=memory,
cpus=cpus,
**kwargs,
)

Expand Down