Skip to content

Commit e9ca0e3

Browse files
Import channel implementations from Python SDK v0.9.0 (#3)
2 parents 50a3ebc + f6dfeea commit e9ca0e3

32 files changed

+2726
-0
lines changed

.darglint

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# -*- mode: ini -*-
2+
3+
[darglint]
4+
docstring_style=google
5+
6+
# DAR402: The docstring describes an exception not explicitly raised.
7+
#
8+
# Ignoring DAR402 because there are cases where public methods
9+
# document exceptions raised by underlying functions.
10+
ignore=DAR402

.github/workflows/ci.yaml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
name: frequenz-channels-python
2+
3+
on:
4+
push:
5+
branches: [ v0.x.x ]
6+
7+
pull_request:
8+
9+
workflow_dispatch:
10+
11+
jobs:
12+
test:
13+
runs-on: ubuntu-20.04
14+
15+
steps:
16+
- name: Fetch sources
17+
uses: actions/checkout@v2
18+
with:
19+
submodules: true
20+
21+
- name: Set up Python
22+
uses: actions/setup-python@v2
23+
with:
24+
python-version: "3.8"
25+
26+
- uses: actions/cache@v2
27+
with:
28+
path: ~/.cache/pip
29+
key: ${{ runner.os }}-pip-${{ hashFiles('**/setup.cfg') }}
30+
restore-keys: |
31+
${{ runner.os }}-pip-
32+
33+
- name: Install required Python packages
34+
run: |
35+
python -m pip install --upgrade pip
36+
python -m pip install nox
37+
38+
- name: run nox
39+
run: nox
40+
timeout-minutes: 10

.gitignore

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# Byte-compiled / optimized / DLL files
2+
__pycache__/
3+
*.py[cod]
4+
*$py.class
5+
6+
# C extensions
7+
*.so
8+
9+
# Distribution / packaging
10+
.Python
11+
build/
12+
develop-eggs/
13+
dist/
14+
downloads/
15+
eggs/
16+
.eggs/
17+
lib/
18+
lib64/
19+
parts/
20+
sdist/
21+
var/
22+
wheels/
23+
pip-wheel-metadata/
24+
share/python-wheels/
25+
*.egg-info/
26+
.installed.cfg
27+
*.egg
28+
MANIFEST
29+
30+
# PyInstaller
31+
# Usually these files are written by a python script from a template
32+
# before PyInstaller builds the exe, so as to inject date/other infos into it.
33+
*.manifest
34+
*.spec
35+
36+
# Installer logs
37+
pip-log.txt
38+
pip-delete-this-directory.txt
39+
40+
# Unit test / coverage reports
41+
htmlcov/
42+
.tox/
43+
.nox/
44+
.coverage
45+
.coverage.*
46+
.cache
47+
nosetests.xml
48+
coverage.xml
49+
*.cover
50+
*.py,cover
51+
.hypothesis/
52+
.pytest_cache/
53+
54+
# Translations
55+
*.mo
56+
*.pot
57+
58+
# Django stuff:
59+
*.log
60+
local_settings.py
61+
db.sqlite3
62+
db.sqlite3-journal
63+
64+
# Flask stuff:
65+
instance/
66+
.webassets-cache
67+
68+
# Scrapy stuff:
69+
.scrapy
70+
71+
# Sphinx documentation
72+
docs/_build/
73+
74+
# PyBuilder
75+
target/
76+
77+
# Jupyter Notebook
78+
.ipynb_checkpoints
79+
80+
# IPython
81+
profile_default/
82+
ipython_config.py
83+
84+
# pyenv
85+
.python-version
86+
87+
# pipenv
88+
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
89+
# However, in case of collaboration, if having platform-specific dependencies or dependencies
90+
# having no cross-platform support, pipenv may install dependencies that don't work, or not
91+
# install all needed dependencies.
92+
#Pipfile.lock
93+
94+
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
95+
__pypackages__/
96+
97+
# Celery stuff
98+
celerybeat-schedule
99+
celerybeat.pid
100+
101+
# SageMath parsed files
102+
*.sage.py
103+
104+
# Environments
105+
.env
106+
.venv
107+
env/
108+
venv/
109+
ENV/
110+
env.bak/
111+
venv.bak/
112+
113+
# Spyder project settings
114+
.spyderproject
115+
.spyproject
116+
117+
# Rope project settings
118+
.ropeproject
119+
120+
# mkdocs documentation
121+
/site
122+
123+
# mypy
124+
.mypy_cache/
125+
.dmypy.json
126+
dmypy.json
127+
128+
# Pyre type checker
129+
.pyre/

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Frequenz channels
2+
3+
This repository contains channel implementations for python.

benchmarks/benchmark_anycast.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
"""Benchmark for Anycast channels.
2+
3+
Copyright
4+
Copyright © 2022 Frequenz Energy-as-a-Service GmbH
5+
6+
License
7+
MIT
8+
"""
9+
10+
import asyncio
11+
import csv
12+
import timeit
13+
from typing import Any, Coroutine, Dict, List, Tuple
14+
15+
from frequenz.channels import Anycast, Receiver, Sender
16+
17+
18+
async def send_msg(num_messages: int, chan: Sender[int]) -> None:
19+
"""Send messages to the channel continuously.
20+
21+
Args:
22+
num_messages (int): Number of messages to send.
23+
chan (Sender[int]): Channel sender to send the messages to.
24+
"""
25+
# send one message for each receiver
26+
for ctr in range(num_messages):
27+
await chan.send(ctr + 1)
28+
29+
30+
async def benchmark_anycast(
31+
num_channels: int,
32+
num_messages: int,
33+
num_receivers: int,
34+
buffer_size: int,
35+
) -> int:
36+
"""Ensure sent messages are received by one receiver.
37+
38+
Args:
39+
num_channels (int): Number of channels to create.
40+
num_messages (int): Number of messages to send per channel.
41+
num_receivers (int): Number of broadcast receivers per channel.
42+
buffer_size (int): Buffer size of each channel.
43+
44+
Returns:
45+
int: Total number of messages received by all channels.
46+
"""
47+
channels: List[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)]
48+
senders = [
49+
asyncio.create_task(send_msg(num_messages, bcast.get_sender()))
50+
for bcast in channels
51+
]
52+
53+
# Even though we want just a single int, use a list, so that it can be
54+
# updated from other methods.
55+
recv_trackers = [0]
56+
57+
async def update_tracker_on_receive(chan: Receiver[int]) -> None:
58+
while True:
59+
msg = await chan.receive()
60+
if msg is None:
61+
return
62+
recv_trackers[0] += 1
63+
64+
receivers = []
65+
for acast in channels:
66+
for _ in range(num_receivers):
67+
receivers.append(update_tracker_on_receive(acast.get_receiver()))
68+
69+
receivers_runs = asyncio.gather(*receivers)
70+
71+
await asyncio.gather(*senders)
72+
for bcast in channels:
73+
await bcast.close()
74+
await receivers_runs
75+
return recv_trackers[0]
76+
77+
78+
def time_async_task(task: Coroutine[Any, Any, int]) -> Tuple[float, Any]:
79+
"""Run a task and return the time taken and the result.
80+
81+
Args:
82+
task (asyncio.Task): Task to run.
83+
84+
Returns:
85+
(float, Any): Run time in fractional seconds, task return value.
86+
"""
87+
start = timeit.default_timer()
88+
ret = asyncio.run(task)
89+
return timeit.default_timer() - start, ret
90+
91+
92+
def run_one(
93+
num_channels: int,
94+
num_messages: int,
95+
num_receivers: int,
96+
buffer_size: int,
97+
) -> Dict[str, Any]:
98+
"""Run a single benchmark."""
99+
runtime, total_msgs = time_async_task(
100+
benchmark_anycast(num_channels, num_messages, num_receivers, buffer_size)
101+
)
102+
ret = {
103+
"channels": num_channels,
104+
"messages_per_channel": num_messages,
105+
"receivers": num_receivers,
106+
"buffer_size": buffer_size,
107+
"total_messages": total_msgs,
108+
"runtime": f"{runtime:.3f}",
109+
}
110+
111+
return ret
112+
113+
114+
def run() -> None:
115+
"""Run all benchmarks."""
116+
with open("/dev/stdout", "w", encoding="utf-8") as csvfile:
117+
fields = run_one(1, 0, 1, 1)
118+
out = csv.DictWriter(csvfile, fields.keys())
119+
out.writeheader()
120+
out.writerow(run_one(1, 1000000, 1, 100))
121+
out.writerow(run_one(1, 1000000, 1, 1000))
122+
out.writerow(run_one(1, 1000000, 10, 100))
123+
out.writerow(run_one(1, 1000000, 10, 1000))
124+
out.writerow(run_one(1000, 1000, 1, 100))
125+
out.writerow(run_one(1000, 1000, 1, 1000))
126+
out.writerow(run_one(1000, 1000, 10, 100))
127+
out.writerow(run_one(1000, 1000, 10, 1000))
128+
129+
130+
if __name__ == "__main__":
131+
run()

0 commit comments

Comments
 (0)