Skip to content

Commit 7ac5fb7

Browse files
committed
add k8s example code
1 parent 5112717 commit 7ac5fb7

File tree

3 files changed

+1326
-3
lines changed

3 files changed

+1326
-3
lines changed

pyproject.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,14 @@ authors = [
88
]
99
requires-python = ">=3.13"
1010
dependencies = [
11-
"mdanalysis[analysis,parallelization]==2.10.0",
11+
"mdanalysis[analysis,parallel]==2.10.0",
1212
]
1313

1414
[build-system]
1515
requires = ["uv_build>=0.8.15,<0.9.0"]
1616
build-backend = "uv_build"
17+
18+
[dependency-groups]
19+
k8s = [
20+
"dask-kubernetes>=2025.7.0",
21+
]
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import MDAnalysis as mda
2+
from dask.distributed import Client, get_client
3+
from MDAnalysis.analysis.backends import BackendBase
4+
from MDAnalysis.analysis.dssp import DSSP
5+
from dask_kubernetes.operator import KubeCluster
6+
from dataclasses import dataclass
7+
from dask.delayed import delayed
8+
9+
from typing import NamedTuple, Literal
10+
11+
12+
class Resources(NamedTuple):
13+
memory: str
14+
cpu: str
15+
16+
def as_dict(self):
17+
return {'memory': self.memory, 'cpu': self.cpu}
18+
19+
@dataclass(frozen=True)
20+
class Config:
21+
"""
22+
Example:
23+
--------
24+
25+
```python
26+
config = {
27+
"name": "dask-cluster-2",
28+
"image": "<redacted>",
29+
"n_workers": 7,
30+
"resources":{"requests": {"memory": "2Gi", "cpu": "800m"},
31+
"limits": {"memory": "4Gi", "cpu": "1"}}
32+
}
33+
```
34+
"""
35+
36+
name: str
37+
image: str
38+
n_workers: int
39+
resources: dict[Literal['requests', 'limits'], Resources]
40+
41+
def as_dict(self):
42+
resources = {k:v.as_dict() for k, v in self.resources.items()}
43+
return {'name': self.name, 'image': self.image, 'n_workers': self.n_workers, 'resources': resources}
44+
45+
46+
def start_cluster(config: Config) -> KubeCluster:
47+
return KubeCluster(**config.as_dict())
48+
49+
def get_client_for(cluster: KubeCluster) -> Client:
50+
return Client(cluster)
51+
52+
53+
class DistributedBackend(BackendBase):
54+
def __init__(self, n_workers):
55+
super().__init__(n_workers)
56+
57+
def assign_client(self, client):
58+
self.client = client
59+
60+
def apply(self, func, computations):
61+
return self.client.compute([delayed(func)(c) for c in computations], sync=True)
62+
63+
64+
if __name__ == '__main__':
65+
u = mda.Universe("YiiP_lipids.gro.gz", "YiiP_lipids.xtc")
66+
backend = DistributedBackend(n_workers=5)
67+
config = ...
68+
cluster = start_cluster(config)
69+
client = get_client(cluster)
70+
backend.assign_client(client)
71+
72+
# test computation
73+
tasks = [delayed(lambda x: x + 1)(i) for i in range(11)]
74+
print(f"{client.compute(tasks, sync=True)=}")
75+
76+
# actual MDAnalysis computation
77+
print(DSSP(u).run(backend=backend, unsupported_backend=True).results)

0 commit comments

Comments
 (0)