Skip to content

Commit f46fbed

Browse files
authored
chore: refactor reduce handler argument (#201)
Signed-off-by: Sidhant Kohli <[email protected]>
1 parent f2f7bf6 commit f46fbed

File tree

4 files changed

+19
-19
lines changed

4 files changed

+19
-19
lines changed

pynumaflow/reducer/async_server.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def get_handler(
3535
"""
3636
if inspect.isfunction(reducer_handler):
3737
if len(init_args) > 0 or len(init_kwargs) > 0:
38-
# if the init_args or init_kwargs are passed, then the reducer_handler
38+
# if the init_args or init_kwargs are passed, then the reducer_instance
3939
# can only be of class Reducer type
4040
raise TypeError("Cannot pass function handler with init args or kwargs")
4141
# return the function handler
@@ -58,7 +58,7 @@ class ReduceAsyncServer(NumaflowServer):
5858
A new servicer instance is created and attached to the server.
5959
The server instance is returned.
6060
Args:
61-
reducer_handler: The reducer instance to be used for Reduce UDF
61+
reducer_instance: The reducer instance to be used for Reduce UDF
6262
sock_path: The UNIX socket path to be used for the server
6363
max_message_size: The max message size in bytes the server can receive and send
6464
max_threads: The max number of threads to be spawned;
@@ -115,7 +115,7 @@ async def reduce_handler(keys: list[str],
115115

116116
def __init__(
117117
self,
118-
reducer_handler: ReduceCallable,
118+
reducer_instance: ReduceCallable,
119119
init_args: tuple = (),
120120
init_kwargs: dict = None,
121121
sock_path=REDUCE_SOCK_PATH,
@@ -137,7 +137,7 @@ def __init__(
137137
"""
138138
if init_kwargs is None:
139139
init_kwargs = {}
140-
self.reducer_handler = get_handler(reducer_handler, init_args, init_kwargs)
140+
self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs)
141141
self.sock_path = f"unix://{sock_path}"
142142
self.max_message_size = max_message_size
143143
self.max_threads = min(max_threads, MAX_NUM_THREADS)

pynumaflow/reducestreamer/async_server.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def get_handler(
3535
"""
3636
if inspect.isfunction(reducer_handler):
3737
if init_args or init_kwargs:
38-
# if the init_args or init_kwargs are passed, then the reduce_stream_handler
38+
# if the init_args or init_kwargs are passed, then the reduce_stream_instance
3939
# can only be of class ReduceStreamer type
4040
raise TypeError("Cannot pass function handler with init args or kwargs")
4141
# return the function handler
@@ -60,7 +60,7 @@ class ReduceStreamAsyncServer(NumaflowServer):
6060
A new servicer instance is created and attached to the server.
6161
The server instance is returned.
6262
Args:
63-
reduce_stream_handler: The reducer instance to be used for
63+
reduce_stream_instance: The reducer instance to be used for
6464
Reduce Streaming UDF
6565
init_args: The arguments to be passed to the reduce_stream_handler
6666
init_kwargs: The keyword arguments to be passed to the
@@ -128,7 +128,7 @@ async def reduce_handler(
128128

129129
def __init__(
130130
self,
131-
reduce_stream_handler: ReduceStreamCallable,
131+
reduce_stream_instance: ReduceStreamCallable,
132132
init_args: tuple = (),
133133
init_kwargs: dict = None,
134134
sock_path=REDUCE_STREAM_SOCK_PATH,
@@ -141,7 +141,7 @@ def __init__(
141141
A new servicer instance is created and attached to the server.
142142
The server instance is returned.
143143
Args:
144-
reduce_stream_handler: The reducer instance to be used for
144+
reduce_stream_instance: The reducer instance to be used for
145145
Reduce Streaming UDF
146146
init_args: The arguments to be passed to the reduce_stream_handler
147147
init_kwargs: The keyword arguments to be passed to the
@@ -154,7 +154,7 @@ def __init__(
154154
"""
155155
if init_kwargs is None:
156156
init_kwargs = {}
157-
self.reduce_stream_handler = get_handler(reduce_stream_handler, init_args, init_kwargs)
157+
self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs)
158158
self.sock_path = f"unix://{sock_path}"
159159
self.max_message_size = max_message_size
160160
self.max_threads = min(max_threads, MAX_NUM_THREADS)

tests/reduce/test_async_reduce.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ def __stub(self):
231231
return reduce_pb2_grpc.ReduceStub(_channel)
232232

233233
def test_error_init(self):
234-
# Check that reducer_handler in required
234+
# Check that reducer_instance in required
235235
with self.assertRaises(TypeError):
236236
ReduceAsyncServer()
237237
# Check that the init_args and init_kwargs are passed
@@ -248,19 +248,19 @@ class ExampleBadClass:
248248
pass
249249

250250
with self.assertRaises(TypeError):
251-
ReduceAsyncServer(reducer_handler=ExampleBadClass)
251+
ReduceAsyncServer(reducer_instance=ExampleBadClass)
252252

253253
def test_max_threads(self):
254254
# max cap at 16
255-
server = ReduceAsyncServer(reducer_handler=ExampleClass, max_threads=32)
255+
server = ReduceAsyncServer(reducer_instance=ExampleClass, max_threads=32)
256256
self.assertEqual(server.max_threads, 16)
257257

258258
# use argument provided
259-
server = ReduceAsyncServer(reducer_handler=ExampleClass, max_threads=5)
259+
server = ReduceAsyncServer(reducer_instance=ExampleClass, max_threads=5)
260260
self.assertEqual(server.max_threads, 5)
261261

262262
# defaults to 4
263-
server = ReduceAsyncServer(reducer_handler=ExampleClass)
263+
server = ReduceAsyncServer(reducer_instance=ExampleClass)
264264
self.assertEqual(server.max_threads, 4)
265265

266266

tests/reducestreamer/test_async_reduce.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ def __stub(self):
262262
return reduce_pb2_grpc.ReduceStub(_channel)
263263

264264
def test_error_init(self):
265-
# Check that reducer_handler in required
265+
# Check that reducer_instance in required
266266
with self.assertRaises(TypeError):
267267
ReduceStreamAsyncServer()
268268
# Check that the init_args and init_kwargs are passed
@@ -279,19 +279,19 @@ class ExampleBadClass:
279279
pass
280280

281281
with self.assertRaises(TypeError):
282-
ReduceStreamAsyncServer(reduce_stream_handler=ExampleBadClass)
282+
ReduceStreamAsyncServer(reduce_stream_instance=ExampleBadClass)
283283

284284
def test_max_threads(self):
285285
# max cap at 16
286-
server = ReduceStreamAsyncServer(reduce_stream_handler=ExampleClass, max_threads=32)
286+
server = ReduceStreamAsyncServer(reduce_stream_instance=ExampleClass, max_threads=32)
287287
self.assertEqual(server.max_threads, 16)
288288

289289
# use argument provided
290-
server = ReduceStreamAsyncServer(reduce_stream_handler=ExampleClass, max_threads=5)
290+
server = ReduceStreamAsyncServer(reduce_stream_instance=ExampleClass, max_threads=5)
291291
self.assertEqual(server.max_threads, 5)
292292

293293
# defaults to 4
294-
server = ReduceStreamAsyncServer(reduce_stream_handler=ExampleClass)
294+
server = ReduceStreamAsyncServer(reduce_stream_instance=ExampleClass)
295295
self.assertEqual(server.max_threads, 4)
296296

297297

0 commit comments

Comments
 (0)