Skip to content

Commit c04a279

Browse files
authored
fix: multiproc mapper max threads and default numprocess (#112)
Signed-off-by: Avik Basu <ab93@users.noreply.github.com>
1 parent 43892a9 commit c04a279

File tree

9 files changed

+468
-495
lines changed

9 files changed

+468
-495
lines changed

.pre-commit-config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,7 @@ repos:
2727
- id: check-ast
2828
- id: check-case-conflict
2929
- id: check-docstring-first
30+
- repo: https://github.com/python-poetry/poetry
31+
rev: "1.6"
32+
hooks:
33+
- id: poetry-check

poetry.lock

Lines changed: 432 additions & 471 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pynumaflow/mapper/_dtypes.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from collections.abc import Iterator, Sequence
1+
from collections.abc import Iterator, Sequence, Awaitable
22
from dataclasses import dataclass
33
from datetime import datetime
44
from typing import TypeVar, Callable
@@ -163,3 +163,4 @@ def watermark(self) -> datetime:
163163

164164

165165
MapCallable = Callable[[list[str], Datum], Messages]
166+
MapAsyncCallable = Callable[[list[str], Datum], Awaitable[Messages]]

pynumaflow/mapper/async_server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
MAP_SOCK_PATH,
1313
)
1414
from pynumaflow.mapper import Datum
15-
from pynumaflow.mapper._dtypes import MapCallable
15+
from pynumaflow.mapper._dtypes import MapAsyncCallable
1616
from pynumaflow.mapper.proto import map_pb2
1717
from pynumaflow.mapper.proto import map_pb2_grpc
1818
from pynumaflow.types import NumaflowServicerContext
@@ -58,12 +58,12 @@ class AsyncMapper(map_pb2_grpc.MapServicer):
5858

5959
def __init__(
6060
self,
61-
handler: MapCallable,
61+
handler: MapAsyncCallable,
6262
sock_path=MAP_SOCK_PATH,
6363
max_message_size=MAX_MESSAGE_SIZE,
6464
max_threads=MAX_THREADS,
6565
):
66-
self.__map_handler: MapCallable = handler
66+
self.__map_handler: MapAsyncCallable = handler
6767
self.sock_path = f"unix://{sock_path}"
6868
self._max_message_size = max_message_size
6969
self._max_threads = max_threads

pynumaflow/mapper/multiproc_server.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,13 @@ class MultiProcMapper(map_pb2_grpc.MapServicer):
4646
handler: Function callable following the type signature of MapCallable
4747
sock_path: Path to the TCP port to bind to
4848
max_message_size: The max message size in bytes the server can receive and send
49-
max_threads: The max number of threads to be spawned;
50-
defaults to number of processors x4
5149
5250
Example invocation:
5351
>>> from typing import Iterator
5452
>>> from pynumaflow.mapper import Messages, Message \
5553
... Datum, MultiProcMapper
5654
...
57-
>>> def map_handler(key: [str], datum: Datum) -> Messages:
55+
>>> def map_handler(keys: list[str], datum: Datum) -> Messages:
5856
... val = datum.value
5957
... _ = datum.event_time
6058
... _ = datum.watermark
@@ -65,6 +63,15 @@ class MultiProcMapper(map_pb2_grpc.MapServicer):
6563
>>> grpc_server.start()
6664
"""
6765

66+
__slots__ = (
67+
"__map_handler",
68+
"_max_message_size",
69+
"_server_options",
70+
"_sock_path",
71+
"_process_count",
72+
"_threads_per_proc",
73+
)
74+
6875
def __init__(
6976
self,
7077
handler: MapCallable,
@@ -81,10 +88,8 @@ def __init__(
8188
("grpc.so_reuseaddr", 1),
8289
]
8390
self._sock_path = sock_path
84-
self._process_count = int(
85-
os.getenv("NUM_CPU_MULTIPROC") or os.getenv("NUMAFLOW_CPU_LIMIT", 1)
86-
)
87-
self._thread_concurrency = int(os.getenv("MAX_THREADS", 0)) or (self._process_count * 4)
91+
self._process_count = int(os.getenv("NUM_CPU_MULTIPROC") or os.cpu_count())
92+
self._threads_per_proc = int(os.getenv("MAX_THREADS", "4"))
8893

8994
def MapFn(
9095
self, request: map_pb2.MapRequest, context: NumaflowServicerContext
@@ -127,12 +132,16 @@ def IsReady(
127132
"""
128133
return map_pb2.ReadyResponse(ready=True)
129134

130-
def _run_server(self, bind_address):
135+
def _run_server(self, bind_address: str) -> None:
131136
"""Start a server in a subprocess."""
132-
_LOGGER.info("Starting new server.")
137+
_LOGGER.info(
138+
"Starting new server with num_procs: %s, num_threads/proc: %s",
139+
self._process_count,
140+
self._threads_per_proc,
141+
)
133142
server = grpc.server(
134143
futures.ThreadPoolExecutor(
135-
max_workers=self._thread_concurrency,
144+
max_workers=self._threads_per_proc,
136145
),
137146
options=self._server_options,
138147
)

pynumaflow/reducer/_dtypes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from asyncio import Task
2-
from collections.abc import Iterator, Sequence
2+
from collections.abc import Iterator, Sequence, Awaitable
33
from dataclasses import dataclass
44
from datetime import datetime
55
from typing import TypeVar, Callable
@@ -232,4 +232,4 @@ def keys(self) -> list[str]:
232232
return self._key
233233

234234

235-
ReduceCallable = Callable[[list[str], AsyncIterable[Datum], Metadata], Messages]
235+
ReduceCallable = Callable[[list[str], AsyncIterable[Datum], Metadata], Awaitable[Messages]]

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "pynumaflow"
3-
version = "0.5.1"
3+
version = "0.5.2"
44
description = "Provides the interfaces of writing Python User Defined Functions and Sinks for NumaFlow."
55
authors = ["NumaFlow Developers"]
66
readme = "README.md"
@@ -26,7 +26,7 @@ grpcio-tools = "^1.48.1"
2626
google-cloud = "^0.34.0"
2727
google-api-core = "^2.11.0"
2828
protobuf = ">=3.20,<5.0"
29-
aiorun = "^2022.11.1"
29+
aiorun = "^2023.7"
3030

3131
[tool.poetry.group.dev]
3232
optional = true

tests/map/test_async_mapper.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,8 @@ def startup_callable(loop):
6060
loop.run_forever()
6161

6262

63-
def NewAsyncMapper(
64-
map_handler=async_map_handler,
65-
):
63+
def new_async_mapper():
6664
udfs = AsyncMapper(handler=async_map_handler)
67-
6865
return udfs
6966

7067

@@ -88,7 +85,7 @@ def setUpClass(cls) -> None:
8885
_loop = loop
8986
_thread = threading.Thread(target=startup_callable, args=(loop,), daemon=True)
9087
_thread.start()
91-
udfs = NewAsyncMapper()
88+
udfs = new_async_mapper()
9289
asyncio.run_coroutine_threadsafe(start_server(udfs), loop=loop)
9390
while True:
9491
try:

tests/map/test_multiproc_mapper.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import unittest
33
from unittest import mock
4+
from unittest.mock import patch, Mock
45

56
import grpc
67
from google.protobuf import empty_pb2 as _empty_pb2
@@ -34,7 +35,7 @@ def test_multiproc_init(self) -> None:
3435
self.assertEqual(server._sock_path, 55551)
3536
self.assertEqual(server._process_count, 3)
3637

37-
@mockenv(NUMAFLOW_CPU_LIMIT="4")
38+
@patch("os.cpu_count", Mock(return_value=4))
3839
def test_multiproc_process_count(self) -> None:
3940
server = MultiProcMapper(handler=map_handler)
4041
self.assertEqual(server._sock_path, 55551)

0 commit comments

Comments
 (0)