|
| 1 | +.. |dask_kubernetes_backend| replace:: :py:class:`~openff.evaluator.backends.dask_kubernetes.DaskKubernetesBackend` |
| 2 | +.. |dask_kubernetes_existing_backend| replace:: :py:class:`~openff.evaluator.backends.dask_kubernetes.DaskKubernetesExistingBackend` |
| 3 | +.. |evaluator_server| replace:: :py:class:`~openff.evaluator.server.EvaluatorServer` |
| 4 | +.. |evaluator_client| replace:: :py:class:`~openff.evaluator.client.EvaluatorClient` |
| 5 | +.. |pod_resources| replace:: :py:class:`~openff.evaluator.backends.backends.PodResources` |
| 6 | +.. |compute_resources| replace:: :py:class:`~openff.evaluator.backends.ComputeResources` |
| 7 | +.. |kubernetes_persistent_volume_claim| replace:: :py:class:`~openff.evaluator.backends.dask_kubernetes.KubernetesPersistentVolumeClaim` |
| 8 | +.. |kubernetes_secret| replace:: :py:class:`~openff.evaluator.backends.dask_kubernetes.KubernetesSecret` |
| 9 | +.. |openmm_simulation| replace:: :py:class:`~openff.evaluator.protocols.openmm.OpenMMSimulation` |
| 10 | + |
| 11 | + |
| 12 | +Dask Kubernetes Backend |
| 13 | +======================== |
| 14 | + |
| 15 | +The framework implements a special set of calculation backends which integrate with the ``dask`` `distributed <https://distributed.dask.org/>`_
| 16 | +and `dask-kubernetes <https://kubernetes.dask.org/en/latest/>`_ libraries.
| 17 | +These backends are designed to run on the `National Research Platform <https://nationalresearchplatform.org/nautilus/>`_ |
| 18 | +(NRP) and have not been otherwise tested. |
| 19 | + |
| 20 | + |
| 21 | +Several separate components are required for executing Evaluator on NRP due to the limited user permissions we have: |
| 22 | + |
| 23 | +* a shared filesystem that is accessible by the |evaluator_server| and the |dask_kubernetes_backend|. |
| 24 | + Typically this is constructed with a `PersistentVolumeClaim <https://ucsd-prp.gitlab.io/userdocs/tutorial/storage/>`_. |
| 25 | +* a |dask_kubernetes_backend| that can submit tasks to the Kubernetes cluster. This must be initiated locally with NRP. |
| 26 | + The backend must have the PVC mounted. |
| 27 | +* an |evaluator_server|, running remotely on a deployment on NRP, that can receive tasks from the local |evaluator_client|. |
| 28 | + This needs to connect to the |dask_kubernetes_backend| to submit tasks to the Kubernetes cluster. |
| 29 | + If permissions are limited as they are on NRP, you may not be able to create the |dask_kubernetes_backend| remotely. |
| 30 | + In that case, you will need a |dask_kubernetes_existing_backend| to connect to an existing KubeCluster. |
| 31 | +* the |evaluator_server| port forwarded so a local |evaluator_client| can communicate with the |evaluator_server|. |
| 32 | + |
| 33 | + |
| 34 | +PersistentVolumeClaims in Python |
| 35 | +-------------------------------- |
| 36 | + |
| 37 | +A PVC can be constructed with `this tutorial <https://ucsd-prp.gitlab.io/userdocs/tutorial/storage/>`_, |
| 38 | +or dynamically through Python using the Kubernetes client:: |
| 39 | + |
| 40 | +    import logging
| 40 | +    import time
| 41 | +    from kubernetes import client, config
| 42 | +    from openff.units import unit
| 43 | +
| 43 | +    logger = logging.getLogger(__name__)
| 43 | +    namespace = "openforcefield"  # your namespace on NRP
| 43 | +    job_name = "my-job"  # a unique identifier used to name the PVC
| 43 | +
| 43 | +    config.load_kube_config()  # authenticate with the cluster before creating API clients
| 44 | +    core_v1 = client.CoreV1Api()
| 45 | + |
| 46 | + # from https://ucsd-prp.gitlab.io/userdocs/storage/ceph/#currently-available-storageclasses |
| 47 | + storage_class_name = "rook-cephfs-central" |
| 48 | + |
| 49 | + # required space to request |
| 50 | + storage_space = 1 * unit.gigabytes |
| 51 | + |
| 52 | + pvc_spec = client.V1PersistentVolumeClaimSpec( |
| 53 | + access_modes=["ReadWriteMany"], |
| 54 | + storage_class_name=storage_class_name, |
| 55 | + resources=client.V1ResourceRequirements( |
| 56 | + requests={ |
| 57 | + "storage": f"{storage_space.to(unit.gigabytes).m}Gi", |
| 58 | + } |
| 59 | + ), |
| 60 | + ) |
| 61 | + |
| 62 | + |
| 63 | + pvc_name = f"evaluator-storage-{job_name}" |
| 64 | + metadata = client.V1ObjectMeta(name=pvc_name) |
| 65 | + pvc = client.V1PersistentVolumeClaim( |
| 66 | + api_version="v1", |
| 67 | + kind="PersistentVolumeClaim", |
| 68 | + metadata=metadata, |
| 69 | + spec=pvc_spec, |
| 70 | + ) |
| 71 | + api_response = core_v1.create_namespaced_persistent_volume_claim( |
| 72 | + namespace=namespace, |
| 73 | + body=pvc |
| 74 | + ) |
| 75 | + logger.info( |
| 76 | + f"Created PVC {pvc.metadata.name}. State={api_response.status.phase}" |
| 77 | + ) |
| 78 | + |
| 79 | + # wait for PVC to bind |
| 80 | + timeout = 1000 |
| 81 | + end_time = time.time() + timeout |
| 82 | + while time.time() < end_time: |
| 83 | + pvc = core_v1.read_namespaced_persistent_volume_claim(name=pvc_name, namespace=namespace) |
| 84 | + if pvc.status.phase == "Bound": |
| 85 | + logger.info(f"PVC '{pvc_name}' is Bound.") |
| 86 | +            break
| 87 | + logger.info(f"Waiting for PVC '{pvc_name}' to become Bound. Current phase: {pvc.status.phase}") |
| 88 | + time.sleep(5) |
| 89 | + |
| 90 | + |
| 91 | +Dask Kubernetes Cluster |
| 92 | +----------------------- |
| 93 | + |
| 94 | +The |dask_kubernetes_backend| backend wraps around the `Dask KubeCluster <https://kubernetes.dask.org/en/latest/operator_kubecluster.html>`_
| 95 | +class to distribute tasks on Kubernetes:: |
| 96 | +
| 96 | +    from openff.evaluator.backends import ComputeResources
| 96 | +    from openff.evaluator.backends.backends import PodResources
| 96 | +    from openff.evaluator.backends.dask_kubernetes import DaskKubernetesBackend
| 96 | +
| 97 | +    # replace with own docker image
| 98 | +    docker_image = "ghcr.io/lilyminium/openff-images:tmp-evaluator-dask-v2"
| 99 | + cluster_name = "evaluator-cluster" |
| 100 | + namespace = "openforcefield" # namespace on NRP |
| 101 | + |
| 102 | + backend = DaskKubernetesBackend( |
| 103 | + cluster_name=cluster_name, |
| 104 | + gpu_resources_per_worker=gpu_resources_per_worker, # see below |
| 105 | + cpu_resources_per_worker=cpu_resources_per_worker, # see below |
| 106 | +        image=docker_image,
| 107 | + namespace=namespace, |
| 108 | + env={ |
| 109 | + "OE_LICENSE": "/secrets/oe_license.txt", |
| 110 | + # daemonic processes are not allowed to have children |
| 111 | + "DASK_DISTRIBUTED__WORKER__DAEMON": "False", |
| 112 | + "DASK_LOGGING__DISTRIBUTED": "debug", |
| 113 | + "DASK__TEMPORARY_DIRECTORY": "/evaluator-storage", |
| 114 | + "STORAGE_DIRECTORY": "/evaluator-storage", |
| 115 | + "EXTRA_PIP_PACKAGES": "jupyterlab" |
| 116 | + }, |
| 117 | + volumes=[volume], # see below |
| 118 | + secrets=[secret], # see below |
| 119 | + annotate_resources=True, # see below |
| 120 | + cluster_kwargs={"resource_timeout": 300} |
| 121 | + ) |
| 122 | + |
| 123 | + |
| 124 | +Specifying pod resources |
| 125 | +~~~~~~~~~~~~~~~~~~~~~~~~ |
| 126 | + |
| 127 | +Pod resources should be specified using |pod_resources|, which works analogously to |compute_resources|, |
| 128 | +but encodes settings for Kubernetes pods. For example:: |
| 129 | + |
| 130 | + from openff.units import unit |
| 131 | + |
| 132 | + ephemeral_storage = 20 * unit.gigabytes |
| 133 | + memory = 8 * unit.gigabytes |
| 134 | + |
| 135 | + gpu_resources_per_worker=PodResources( |
| 136 | + minimum_number_of_workers=0, |
| 137 | + maximum_number_of_workers=10, |
| 138 | + number_of_threads=1, |
| 139 | + memory_limit=memory, |
| 140 | + ephemeral_storage_limit=ephemeral_storage, |
| 141 | + number_of_gpus=1, |
| 142 | + preferred_gpu_toolkit=ComputeResources.GPUToolkit.CUDA, |
| 143 | + ) |
| 144 | + cpu_resources_per_worker=PodResources( |
| 145 | + minimum_number_of_workers=0, |
| 146 | + maximum_number_of_workers=40, |
| 147 | + number_of_threads=1, |
| 148 | + memory_limit=memory, |
| 149 | + ephemeral_storage_limit=ephemeral_storage, |
| 150 | + number_of_gpus=0, |
| 151 | + ) |
| 152 | + |
| 153 | + |
| 154 | +Specifying volumes |
| 155 | +~~~~~~~~~~~~~~~~~~ |
| 156 | + |
| 157 | +Volumes should be specified as a list of |kubernetes_persistent_volume_claim| objects. For example:: |
| 158 | + |
| 159 | + volume = KubernetesPersistentVolumeClaim( |
| 160 | + name="evaluator-storage", # `pvc_name`, the name of the PVC |
| 161 | + mount_path="/evaluator-storage", # where to mount the PVC |
| 162 | + ) |
| 163 | + |
| 164 | + |
| 165 | +Specifying secrets |
| 166 | +~~~~~~~~~~~~~~~~~~ |
| 167 | + |
| 168 | +Secrets should be specified as a list of |kubernetes_secret| objects. For example:: |
| 169 | + |
| 170 | + secret = KubernetesSecret( |
| 171 | + name="openeye-license", |
| 172 | + secret_name="oe-license", |
| 173 | + mount_path="/secrets/oe_license.txt", |
| 174 | + sub_path="oe_license.txt", |
| 175 | + read_only=True, |
| 176 | + ) |
| 177 | + |
| 178 | + |
| 179 | +This example of mounting an OpenEye license mounts the ``secret_name`` secret |
| 180 | +at the ``mount_path`` path in the pod, at the ``sub_path`` path. |
| 181 | + |
| 182 | +.. note:: |
| 183 | + |
| 184 | +   A secret should first be created in Kubernetes by following
| 185 | + `the documentation <https://kubernetes.io/docs/tasks/configmap-secret/managing-secret-using-kubectl/#create-a-secret>`_. |
| 186 | + |
| 187 | + |
| 188 | +Annotating resources |
| 189 | +~~~~~~~~~~~~~~~~~~~~ |
| 190 | + |
| 191 | +Dask allows you to specify whether tasks require particular |
| 192 | +`resources <https://distributed.dask.org/en/latest/resources.html>`_ to be available on the worker used |
| 193 | +to execute them. Setting ``annotate_resources=True`` will split tasks into those that can only be |
| 194 | +executed on GPU workers, and those that can only be executed on CPU workers. |
| 195 | +Simulation protocols such as |openmm_simulation| are executed on GPUs, whereas tasks such as packing boxes |
| 196 | +are executed on CPUs. Splitting tasks this way will increase the GPU utilization of GPU workers. |
| 197 | + |
| 198 | +The resources specified are 'GPU' (set to 0.5 per protocol to encourage multiple protocols to run on the same worker), |
| 199 | +and 'notGPU' (set to 1 per protocol). Workers are run with either the 'GPU' or 'notGPU' resource, and tasks are |
| 200 | +allocated to workers based on the resources they require. |
| 201 | + |
| 202 | +Setting ``annotate_resources=False`` will allow tasks to be executed on any worker. |
| 203 | + |
| 204 | + |
| 205 | + |
| 206 | +Dask Kubernetes Existing Backend |
| 207 | +-------------------------------- |
| 208 | + |
| 209 | +If you are unable to create a |dask_kubernetes_backend| remotely, you can connect to an existing KubeCluster |
| 210 | +with the |dask_kubernetes_existing_backend| with the same arguments:: |
| 211 | + |
| 212 | + from openff.evaluator.backends.dask_kubernetes import DaskKubernetesExistingBackend |
| 213 | + |
| 214 | + backend = DaskKubernetesExistingBackend( |
| 215 | + cluster_name=cluster_name, |
| 216 | + gpu_resources_per_worker=gpu_resources_per_worker, |
| 217 | + cpu_resources_per_worker=cpu_resources_per_worker, |
| 218 | + image=image, |
| 219 | + namespace=namespace, |
| 220 | + env={ |
| 221 | + "OE_LICENSE": "/secrets/oe_license.txt", |
| 222 | + # daemonic processes are not allowed to have children |
| 223 | + "DASK_DISTRIBUTED__WORKER__DAEMON": "False", |
| 224 | + "DASK_LOGGING__DISTRIBUTED": "debug", |
| 225 | + "DASK__TEMPORARY_DIRECTORY": "/evaluator-storage", |
| 226 | + "STORAGE_DIRECTORY": "/evaluator-storage", |
| 227 | + "EXTRA_PIP_PACKAGES": "jupyterlab" |
| 228 | + }, |
| 229 | + volumes=[volume], |
| 230 | + secrets=[secret], |
| 231 | + annotate_resources=True, |
| 232 | + cluster_kwargs={"resource_timeout": 300} |
| 233 | + ) |
| 234 | + |
| 235 | +Not all of these are important to keep the same, as this cluster simply connects to an |
| 236 | +already initialized |dask_kubernetes_backend|. However, the following are important to keep the same: |
| 237 | + |
| 238 | +* ``cluster_name`` -- for connection |
| 239 | +* ``namespace`` -- for connection |
| 240 | +* ``gpu_resources_per_worker`` -- the ``preferred_gpu_toolkit`` is important here, although not the number of workers
| 241 | +* ``volumes`` -- the PVC must be mounted |
| 242 | +* ``secrets`` -- an OpenEye license would ideally be mounted |
| 243 | +* ``annotate_resources`` -- this controls whether or not to split tasks between GPU/CPU workers |
| 244 | + |
| 245 | + |
| 246 | +Deployment |
| 247 | +~~~~~~~~~~ |
| 248 | + |
| 249 | +The |evaluator_server| can be deployed remotely on NRP with the following code::
| 250 | + |
| 251 | + with backend: |
| 252 | + evaluator_server = EvaluatorServer( |
| 253 | + backend=backend, |
| 254 | + port=port, |
| 255 | + debug=True, |
| 256 | + ) |
| 257 | + evaluator_server.start(asynchronous=False) |
| 258 | + |
| 259 | +Ideally this should be done on a Kubernetes deployment to ensure the |evaluator_server| is always running. |
| 260 | +The |evaluator_server| should be port forwarded to allow ForceBalance to communicate with it on a ``server_port``. |
0 commit comments