diff --git a/htcdaskgateway/_version.py b/htcdaskgateway/_version.py index fb962e3..2fddb88 100644 --- a/htcdaskgateway/_version.py +++ b/htcdaskgateway/_version.py @@ -17,5 +17,5 @@ __version_tuple__: VERSION_TUPLE version_tuple: VERSION_TUPLE -__version__ = version = '1.0.2.dev6+g5379e9a.d20250418' -__version_tuple__ = version_tuple = (1, 0, 2, 'dev6', 'g5379e9a.d20250418') +__version__ = version = '1.0.5.dev3+gf1d34f4' +__version_tuple__ = version_tuple = (1, 0, 5, 'dev3', 'gf1d34f4') diff --git a/htcdaskgateway/cluster.py b/htcdaskgateway/cluster.py index 3b05b9f..31cfd52 100644 --- a/htcdaskgateway/cluster.py +++ b/htcdaskgateway/cluster.py @@ -15,11 +15,15 @@ class HTCGatewayCluster(GatewayCluster): - def __init__(self, container_image=None, **kwargs): + def __init__( + self, container_image=None, memory: str = "32GB", cpus: int = 4, **kwargs + ): self.scheduler_proxy_ip = kwargs.pop("", "dask.software-dev.ncsa.illinois.edu") self.batchWorkerJobs = [] self.cluster_options = kwargs.get("cluster_options") self.container_image = container_image + self.memory = memory + self.cpus = cpus self.condor_bin_dir = os.environ["CONDOR_BIN_DIR"] super().__init__(**kwargs) @@ -87,6 +91,7 @@ def scale_batch_workers(self, n): f.write(security.tls_key) # Prepare JDL + resources = f" request_memory = {self.memory} \n request_cpus = {self.cpus}" jdl = ( """executable = start.sh arguments = """ @@ -94,11 +99,8 @@ def scale_batch_workers(self, n): + """ htcdask-worker_$(Cluster)_$(Process) output = condor/htcdask-worker$(Cluster)_$(Process).out error = condor/htcdask-worker$(Cluster)_$(Process).err -log = condor/htcdask-worker$(Cluster)_$(Process).log -request_cpus = 4 -request_memory = 32GB -container_image=""" - + self.container_image +log = condor/htcdask-worker$(Cluster)_$(Process).log""" + + resources + """ should_transfer_files = yes transfer_input_files = ./dask-credentials, ./dask-worker-space , ./condor @@ -119,11 +121,17 @@ def scale_batch_workers(self, n): export APPTAINERENV_DASK_GATEWAY_API_URL="https://dask.software-dev.ncsa.illinois.edu/api" export APPTAINERENV_DASK_GATEWAY_CLUSTER_NAME=$1 #export APPTAINERENV_DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info" - +export DASK_LOGGING__DISTRIBUTED=info worker_space_dir=${PWD}/dask-worker-space/$2 mkdir -p $worker_space_dir hostname -i -DASK_LOGGING__DISTRIBUTED=info dask worker --name $2 --tls-ca-file dask-credentials/dask.crt --tls-cert dask-credentials/dask.crt --tls-key dask-credentials/dask.pem --worker-port 10000:10070 --no-nanny --scheduler-sni daskgateway-""" +/usr/bin/apptainer exec --bind /scratch --bind /projects \ + --env DASK_LOGGING__DISTRIBUTED=info \ + --env DASK_GATEWAY_CLUSTER_NAME=$1 \ + --env DASK_GATEWAY_WORKER_NAME=$2 \ + --env DASK_GATEWAY_API_URL="https://dask.software-dev.ncsa.illinois.edu/api" """ + + self.container_image + + " dask worker --name $2 --tls-ca-file dask-credentials/dask.crt --tls-cert dask-credentials/dask.crt --tls-key dask-credentials/dask.pem --worker-port 10000:10070 --no-nanny --scheduler-sni daskgateway-" + cluster_name + """ --nthreads 1 tls://""" + self.scheduler_proxy_ip diff --git a/htcdaskgateway/gateway.py b/htcdaskgateway/gateway.py index d3ee03f..0ecf76c 100644 --- a/htcdaskgateway/gateway.py +++ b/htcdaskgateway/gateway.py @@ -23,6 +23,8 @@ def new_cluster( cluster_options=None, shutdown_on_close=True, container_image=None, + memory="32GB", + cpus=4, **kwargs, ): """Submit a new cluster to the gateway, and wait for it to be started. @@ -36,6 +38,12 @@ def new_cluster( close. Set to False to have cluster persist until explicitly shutdown. container_image : str, Path to the apptainer image to run in the workers + cpus: int number of CPUs to request for each worker + memory: str amount of memory to request for each worker Characters may be appended + to a numerical value to indicate units. K or KB indicates KiB, + 2^10 numbers of bytes. M or MB indicates MiB, 2^20 numbers of bytes. + G or GB indicates GiB, 2^30 numbers of bytes. T or TB indicates TiB, + 2^40 numbers of bytes. **kwargs : Additional cluster configuration options. If ``cluster_options`` is provided, these are applied afterwards as overrides. Available @@ -55,6 +63,8 @@ def new_cluster( shutdown_on_close=shutdown_on_close, cluster_options=cluster_options, container_image=container_image, + memory=memory, + cpus=cpus, **kwargs, )