Skip to content

Commit c5a46bb

Browse files
authored
chore: use same server info for all map modes and refactor (#185)
Signed-off-by: Sidhant Kohli <[email protected]>
1 parent 5bbbe56 commit c5a46bb

File tree

19 files changed

+185
-132
lines changed

19 files changed

+185
-132
lines changed

examples/batchmap/flatmap/pipeline.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@ spec:
1212
duration: 1s
1313
- name: batch-flatmap
1414
partitions: 2
15-
metadata:
16-
annotations:
17-
numaflow.numaproj.io/batch-map: "true"
1815
scale:
1916
min: 1
2017
udf:

examples/mapstream/flatmap_stream/pipeline.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ spec:
1313
rpu: 10
1414
duration: 1s
1515
- name: flatmap
16-
metadata:
17-
annotations:
18-
numaflow.numaproj.io/map-stream: "true"
1916
limits:
2017
readBatchSize: 1
2118
udf:

pynumaflow/_constants.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121

2222
# Server information file configs
2323
MAP_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info"
24-
MAP_STREAM_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapstreamer-server-info"
2524
REDUCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/reducer-server-info"
2625
REDUCE_STREAM_SERVER_INFO_FILE_PATH = "/var/run/numaflow/reducestreamer-server-info"
2726
SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcetransformer-server-info"
2827
SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info"
2928
SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info"
3029
SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info"
3130
FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"
32-
BATCH_MAP_SERVER_INFO_FILE_PATH = "/var/run/numaflow/batchmapper-server-info"
3331

3432
ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
3533
UD_CONTAINER_FALLBACK_SINK = "fb-udsink"

pynumaflow/batchmapper/async_server.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
NUM_THREADS_DEFAULT,
77
_LOGGER,
88
BATCH_MAP_SOCK_PATH,
9-
BATCH_MAP_SERVER_INFO_FILE_PATH,
9+
MAP_SERVER_INFO_FILE_PATH,
1010
MAX_NUM_THREADS,
1111
)
1212
from pynumaflow.batchmapper._dtypes import BatchMapCallable
1313
from pynumaflow.batchmapper.servicer.async_servicer import AsyncBatchMapServicer
14+
from pynumaflow.info.types import ServerInfo, MAP_MODE_KEY, MapMode
1415
from pynumaflow.proto.batchmapper import batchmap_pb2_grpc
1516
from pynumaflow.shared.server import NumaflowServer, start_async_server
1617

@@ -26,7 +27,7 @@ def __init__(
2627
sock_path=BATCH_MAP_SOCK_PATH,
2728
max_message_size=MAX_MESSAGE_SIZE,
2829
max_threads=NUM_THREADS_DEFAULT,
29-
server_info_file=BATCH_MAP_SERVER_INFO_FILE_PATH,
30+
server_info_file=MAP_SERVER_INFO_FILE_PATH,
3031
):
3132
"""
3233
Create a new grpc Async Batch Map Server instance.
@@ -94,13 +95,23 @@ async def aexec(self):
9495
# same thread as the event loop so that all the async calls are made in the
9596
# same context
9697
# Create a new async server instance and add the servicer to it
97-
server = grpc.aio.server()
98+
server = grpc.aio.server(options=self._server_options)
9899
server.add_insecure_port(self.sock_path)
99100
batchmap_pb2_grpc.add_BatchMapServicer_to_server(
100101
self.servicer,
101102
server,
102103
)
103104
_LOGGER.info("Starting Batch Map Server")
105+
serv_info = ServerInfo.get_default_server_info()
106+
# Add the MAP_MODE metadata to the server info for the correct map mode
107+
serv_info.metadata[MAP_MODE_KEY] = MapMode.BatchMap
108+
109+
# Start the async server
104110
await start_async_server(
105-
server, self.sock_path, self.max_threads, self._server_options, self.server_info_file
111+
server_async=server,
112+
sock_path=self.sock_path,
113+
max_threads=self.max_threads,
114+
cleanup_coroutines=list(),
115+
server_info_file=self.server_info_file,
116+
server_info=serv_info,
106117
)

pynumaflow/info/server.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import os
2-
from importlib.metadata import version
32
from typing import Any
43

54
from pynumaflow import setup_logging
@@ -12,19 +11,6 @@
1211
_LOGGER.setLevel(logging.DEBUG)
1312

1413

15-
def get_sdk_version() -> str:
16-
"""
17-
Return the pynumaflow SDK version
18-
"""
19-
try:
20-
return version("pynumaflow")
21-
except Exception as e:
22-
# Adding this to handle the case for local test/CI where pynumaflow
23-
# will not be installed as a package
24-
_LOGGER.error("Could not read SDK version %r", e, exc_info=True)
25-
return ""
26-
27-
2814
def write(server_info: ServerInfo, info_file: str):
2915
"""
3016
Write the ServerInfo to a file , shared with the client (numa container).

pynumaflow/info/types.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
1+
import logging
2+
import os
13
from dataclasses import dataclass, field
24
from enum import Enum
5+
from importlib.metadata import version
6+
from typing import TypeVar
7+
8+
from pynumaflow import setup_logging
9+
10+
_LOGGER = setup_logging(__name__)
11+
if os.getenv("PYTHONDEBUG"):
12+
_LOGGER.setLevel(logging.DEBUG)
313

414
# Constants for using in the info-server
515
# Specify the minimum Numaflow version required by the current SDK version
@@ -12,6 +22,16 @@
1222
# Format - (key, env_var)
1323
METADATA_ENVS = [("CPU_LIMIT", "NUMAFLOW_CPU_LIMIT")]
1424

25+
# Metadata keys
26+
27+
# MAP_MODE_KEY is used in the server info object's metadata to indicate which map mode is enabled
28+
MAP_MODE_KEY = "MAP_MODE"
29+
# MULTIPROC_KEY is the field used to indicate that Multiproc map mode is enabled
30+
# The value contains the number of servers spawned.
31+
MULTIPROC_KEY = "MULTIPROC"
32+
33+
SI = TypeVar("SI", bound="ServerInfo")
34+
1535

1636
class Protocol(str, Enum):
1737
"""
@@ -32,6 +52,16 @@ class Language(str, Enum):
3252
JAVA = "java"
3353

3454

55+
class MapMode(str, Enum):
56+
"""
57+
Enumerate Map Mode to be enabled
58+
"""
59+
60+
UnaryMap = "unary-map"
61+
StreamMap = "stream-map"
62+
BatchMap = "batch-map"
63+
64+
3565
@dataclass
3666
class ServerInfo:
3767
"""
@@ -50,3 +80,27 @@ class ServerInfo:
5080
minimum_numaflow_version: str
5181
version: str
5282
metadata: dict = field(default_factory=dict)
83+
84+
@classmethod
85+
def get_default_server_info(cls) -> SI:
86+
serv_info = ServerInfo(
87+
protocol=Protocol.UDS,
88+
language=Language.PYTHON,
89+
minimum_numaflow_version=MINIMUM_NUMAFLOW_VERSION,
90+
version=get_sdk_version(),
91+
metadata=dict(),
92+
)
93+
return serv_info
94+
95+
96+
def get_sdk_version() -> str:
97+
"""
98+
Return the pynumaflow SDK version
99+
"""
100+
try:
101+
return version("pynumaflow")
102+
except Exception as e:
103+
# Adding this to handle the case for local test/CI where pynumaflow
104+
# will not be installed as a package
105+
_LOGGER.error("Could not read SDK version %r", e, exc_info=True)
106+
return ""

pynumaflow/mapper/async_server.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88
MAP_SERVER_INFO_FILE_PATH,
99
MAX_NUM_THREADS,
1010
)
11+
from pynumaflow.info.types import (
12+
ServerInfo,
13+
MAP_MODE_KEY,
14+
MapMode,
15+
)
1116
from pynumaflow.mapper._dtypes import MapAsyncCallable
1217
from pynumaflow.mapper.servicer.async_servicer import AsyncMapServicer
1318
from pynumaflow.proto.mapper import map_pb2_grpc
@@ -94,15 +99,20 @@ async def aexec(self) -> None:
9499
# same thread as the event loop so that all the async calls are made in the
95100
# same context
96101

97-
server_new = grpc.aio.server()
102+
server_new = grpc.aio.server(options=self._server_options)
98103
server_new.add_insecure_port(self.sock_path)
99104
map_pb2_grpc.add_MapServicer_to_server(self.servicer, server_new)
100105

106+
serv_info = ServerInfo.get_default_server_info()
107+
# Add the MAP_MODE metadata to the server info for the correct map mode
108+
serv_info.metadata[MAP_MODE_KEY] = MapMode.UnaryMap
109+
101110
# Start the async server
102111
await start_async_server(
103-
server_new,
104-
self.sock_path,
105-
self.max_threads,
106-
self._server_options,
107-
self.server_info_file,
112+
server_async=server_new,
113+
sock_path=self.sock_path,
114+
max_threads=self.max_threads,
115+
cleanup_coroutines=list(),
116+
server_info_file=self.server_info_file,
117+
server_info=serv_info,
108118
)

pynumaflow/mapper/multiproc_server.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@
77
MAP_SERVER_INFO_FILE_PATH,
88
MAX_NUM_THREADS,
99
)
10+
from pynumaflow.info.server import get_metadata_env
11+
from pynumaflow.info.types import (
12+
ServerInfo,
13+
METADATA_ENVS,
14+
MAP_MODE_KEY,
15+
MapMode,
16+
MULTIPROC_KEY,
17+
)
1018
from pynumaflow.mapper._dtypes import MapSyncCallable
1119
from pynumaflow.mapper.servicer.sync_servicer import SyncMapServicer
1220
from pynumaflow.shared.server import (
@@ -100,6 +108,14 @@ def start(self) -> None:
100108
defined by the user. The max value is capped to 2 * CPU count.
101109
"""
102110

111+
# Create the server info file
112+
server_info = ServerInfo.get_default_server_info()
113+
server_info.metadata = get_metadata_env(envs=METADATA_ENVS)
114+
# Add the MULTIPROC metadata using the number of servers to use
115+
server_info.metadata[MULTIPROC_KEY] = str(self._process_count)
116+
# Add the MAP_MODE metadata to the server info for the correct map mode
117+
server_info.metadata[MAP_MODE_KEY] = MapMode.UnaryMap
118+
103119
# Start the multiproc server
104120
start_multiproc_server(
105121
max_threads=self.max_threads,
@@ -108,4 +124,5 @@ def start(self) -> None:
108124
server_info_file=self.server_info_file,
109125
server_options=self._server_options,
110126
udf_type=UDFType.Map,
127+
server_info=server_info,
111128
)

pynumaflow/mapper/sync_server.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
from pynumaflow.info.types import (
2+
ServerInfo,
3+
MAP_MODE_KEY,
4+
MapMode,
5+
)
16
from pynumaflow.mapper.servicer.sync_servicer import SyncMapServicer
27

38
from pynumaflow._constants import (
@@ -100,6 +105,11 @@ def start(self) -> None:
100105
self.sock_path,
101106
self.max_threads,
102107
)
108+
109+
serv_info = ServerInfo.get_default_server_info()
110+
# Add the MAP_MODE metadata to the server info for the correct map mode
111+
serv_info.metadata[MAP_MODE_KEY] = MapMode.UnaryMap
112+
103113
# Start the server
104114
sync_server_start(
105115
servicer=self.servicer,
@@ -108,4 +118,5 @@ def start(self) -> None:
108118
server_info_file=self.server_info_file,
109119
server_options=self._server_options,
110120
udf_type=UDFType.Map,
121+
server_info=serv_info,
111122
)

pynumaflow/mapstreamer/async_server.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import aiorun
22
import grpc
33

4+
from pynumaflow.info.types import (
5+
ServerInfo,
6+
MAP_MODE_KEY,
7+
MapMode,
8+
)
49
from pynumaflow.mapstreamer.servicer.async_servicer import AsyncMapStreamServicer
510
from pynumaflow.proto.mapstreamer import mapstream_pb2_grpc
611

@@ -9,7 +14,7 @@
914
MAX_MESSAGE_SIZE,
1015
NUM_THREADS_DEFAULT,
1116
_LOGGER,
12-
MAP_STREAM_SERVER_INFO_FILE_PATH,
17+
MAP_SERVER_INFO_FILE_PATH,
1318
MAX_NUM_THREADS,
1419
)
1520

@@ -29,7 +34,7 @@ def __init__(
2934
sock_path=MAP_STREAM_SOCK_PATH,
3035
max_message_size=MAX_MESSAGE_SIZE,
3136
max_threads=NUM_THREADS_DEFAULT,
32-
server_info_file=MAP_STREAM_SERVER_INFO_FILE_PATH,
37+
server_info_file=MAP_SERVER_INFO_FILE_PATH,
3338
):
3439
"""
3540
Create a new grpc Async Map Stream Server instance.
@@ -113,13 +118,23 @@ async def aexec(self):
113118
# same thread as the event loop so that all the async calls are made in the
114119
# same context
115120
# Create a new async server instance and add the servicer to it
116-
server = grpc.aio.server()
121+
server = grpc.aio.server(options=self._server_options)
117122
server.add_insecure_port(self.sock_path)
118123
mapstream_pb2_grpc.add_MapStreamServicer_to_server(
119124
self.servicer,
120125
server,
121126
)
122127
_LOGGER.info("Starting Map Stream Server")
128+
serv_info = ServerInfo.get_default_server_info()
129+
# Add the MAP_MODE metadata to the server info for the correct map mode
130+
serv_info.metadata[MAP_MODE_KEY] = MapMode.StreamMap
131+
132+
# Start the async server
123133
await start_async_server(
124-
server, self.sock_path, self.max_threads, self._server_options, self.server_info_file
134+
server_async=server,
135+
sock_path=self.sock_path,
136+
max_threads=self.max_threads,
137+
cleanup_coroutines=list(),
138+
server_info_file=self.server_info_file,
139+
server_info=serv_info,
125140
)

0 commit comments

Comments
 (0)