Skip to content

Commit 6e09b75

Browse files
Integrate uvloop async engine
This commit integrates the `uvloop` asynchronous engine as a drop-in replacement for `asyncio` when Python runs on top of the `Cython` library. The `uvloop` library is faster than the standard async engine, improving StreamFlow performance epsecially on HTC workflows.
1 parent 81ca086 commit 6e09b75

File tree

3 files changed

+17
-11
lines changed

3 files changed

+17
-11
lines changed

examples/mpi/streamflow.yml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ workflows:
99
bindings:
1010
- step: /compile
1111
target:
12-
deployment: k8s-mpi
12+
deployment: helm-mpi
1313
service: openmpi
1414
- step: /execute
1515
target:
16-
deployment: k8s-mpi
16+
deployment: helm-mpi
1717
locations: 2
1818
service: openmpi
1919
deployments:
@@ -28,7 +28,6 @@ deployments:
2828
type: helm
2929
config:
3030
chart: environment/helm/openmpi
31-
kubeconfig: ~/.kube/config-streamflow
3231
releaseName: openmpi-rel
3332
workdir: /tmp
3433
k8s-mpi:
@@ -37,5 +36,4 @@ deployments:
3736
files:
3837
- environment/k8s/secrets.yaml
3938
- environment/k8s/deployment.yaml
40-
kubeconfig: ~/.kube/config-streamflow
4139
workdir: /tmp

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ psutil==7.1.3
1313
referencing==0.37.0
1414
rdflib==7.4.0
1515
typing-extensions==4.15.0
16-
yattag==1.16.1
16+
uvloop==0.22.1
17+
yattag==1.16.1

streamflow/main.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import asyncio
55
import logging
66
import os
7+
import platform
78
import sys
89
import uuid
10+
import uvloop
911
from collections.abc import MutableMapping, Sequence
1012
from typing import TYPE_CHECKING, Any
1113

@@ -252,17 +254,22 @@ def build_context(config: MutableMapping[str, Any]) -> StreamFlowContext:
252254
def main(args: Sequence[str]) -> int:
253255
try:
254256
parsed_args = parser.parse_args(args)
257+
if sys.platform != "win32" and platform.python_implementation() == "CPython":
258+
logger.info("CPython detected: using uvloop EventLoop implementation")
259+
engine = uvloop
260+
else:
261+
engine = asyncio
255262
match parsed_args.context:
256263
case "ext":
257-
asyncio.run(_async_ext(parsed_args))
264+
engine.run(_async_ext(parsed_args))
258265
case "list":
259-
asyncio.run(_async_list(parsed_args))
266+
engine.run(_async_list(parsed_args))
260267
case "plugin":
261-
asyncio.run(_async_plugin(parsed_args))
268+
engine.run(_async_plugin(parsed_args))
262269
case "prov":
263-
asyncio.run(_async_prov(parsed_args))
270+
engine.run(_async_prov(parsed_args))
264271
case "report":
265-
asyncio.run(_async_report(parsed_args))
272+
engine.run(_async_report(parsed_args))
266273
case "run":
267274
if parsed_args.quiet:
268275
logger.setLevel(logging.WARNING)
@@ -278,7 +285,7 @@ def main(args: Sequence[str]) -> int:
278285
logger.handlers = []
279286
logger.addHandler(colored_stream_handler)
280287
logger.addFilter(HighlitingFilter())
281-
asyncio.run(_async_run(parsed_args))
288+
engine.run(_async_run(parsed_args))
282289
case "schema":
283290
load_extensions()
284291
print(SfSchema().dump(parsed_args.version, parsed_args.pretty))

0 commit comments

Comments
 (0)