Skip to content

Commit 9311f25

Browse files
authored
feat: side-input for pynumaflow-lite (#296)
Signed-off-by: Vigith Maurice <[email protected]>
1 parent 02e5145 commit 9311f25

File tree

11 files changed

+501
-4
lines changed

11 files changed

+501
-4
lines changed

packages/pynumaflow-lite/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ name = "pynumaflow_lite"
99
crate-type = ["cdylib", "rlib"]
1010

1111
[dependencies]
12-
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "42a1e814459d18b89eb6ca874fd0a989fd134303" }
12+
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "44ee3068fcf7088ff265df7ae7ce1881a40694ff" }
1313
pyo3 = { version = "0.27.1", features = ["chrono", "experimental-inspect"] }
1414
tokio = "1.47.1"
1515
tonic = "0.14.2"
@@ -63,3 +63,7 @@ path = "tests/bin/source.rs"
6363
[[bin]]
6464
name = "test_sourcetransform"
6565
path = "tests/bin/sourcetransform.rs"
66+
67+
[[bin]]
68+
name = "test_sideinput"
69+
path = "tests/bin/sideinput.rs"

packages/pynumaflow-lite/pynumaflow_lite/__init__.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,13 @@
5353
except Exception: # pragma: no cover
5454
sourcetransformer = None
5555

56+
try:
57+
sideinputer = _import_module(__name__ + ".sideinputer")
58+
except Exception: # pragma: no cover
59+
sideinputer = None
60+
5661
# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, ReduceStreamer, Accumulator, Sinker,
57-
# Sourcer, and SourceTransformer classes under the extension submodules for convenient access
62+
# Sourcer, SourceTransformer, and SideInput classes under the extension submodules for convenient access
5863
from ._map_dtypes import Mapper
5964
from ._batchmapper_dtypes import BatchMapper
6065
from ._mapstream_dtypes import MapStreamer
@@ -65,6 +70,7 @@
6570
from ._sink_dtypes import Sinker
6671
from ._source_dtypes import Sourcer
6772
from ._sourcetransformer_dtypes import SourceTransformer
73+
from ._sideinput_dtypes import SideInput
6874

6975
if mapper is not None:
7076
try:
@@ -126,9 +132,15 @@
126132
except Exception:
127133
pass
128134

135+
if sideinputer is not None:
136+
try:
137+
setattr(sideinputer, "SideInput", SideInput)
138+
except Exception:
139+
pass
140+
129141
# Public API
130142
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "reducestreamer", "accumulator",
131-
"sinker", "sourcer", "sourcetransformer"]
143+
"sinker", "sourcer", "sourcetransformer", "sideinputer"]
132144

133145
__doc__ = pynumaflow_lite.__doc__
134146
if hasattr(pynumaflow_lite, "__all__"):

packages/pynumaflow-lite/pynumaflow_lite/__init__.pyi

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@ from . import session_reducer as session_reducer
1010
from . import accumulator as accumulator
1111
from . import sinker as sinker
1212
from . import sourcer as sourcer
13+
from . import sourcetransformer as sourcetransformer
14+
from . import sideinputer as sideinputer
1315

14-
__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker', 'sourcer']
16+
__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker', 'sourcer', 'sourcetransformer', 'sideinputer']
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from abc import ABCMeta, abstractmethod
2+
from pynumaflow_lite.sideinputer import Response
3+
4+
5+
class SideInput(metaclass=ABCMeta):
    """
    Base class for user-defined SideInput retrievers that are exposed
    over a gRPC server.

    A SideInput periodically retrieves data that can be broadcast to
    other vertices in the pipeline.
    """

    async def __call__(self, *args, **kwargs):
        """
        Make an instance usable as a callable: awaiting the call delegates
        to ``retrieve_handler`` with the same arguments.
        """
        result = await self.retrieve_handler(*args, **kwargs)
        return result

    @abstractmethod
    async def retrieve_handler(self) -> Response:
        """
        Produce the side input value; concrete subclasses must override this.

        This function is called every time the side input is requested.

        Returns:
            Response: Either ``Response.broadcast_message(value)`` to broadcast
            a value, or ``Response.no_broadcast_message()`` to skip broadcasting.
        """
33+
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from __future__ import annotations
2+
3+
from typing import Callable, Awaitable, Any
4+
5+
# Re-export the Python ABC for user convenience and typing
6+
from ._sideinput_dtypes import SideInput as SideInput
7+
8+
9+
class Response:
    """Result returned by the side input retrieve handler."""

    # Payload carried by the response.
    value: bytes
    # Whether the payload should be broadcast.
    broadcast: bool

    @staticmethod
    def broadcast_message(value: bytes) -> Response:
        """Build a response that broadcasts ``value``."""
        ...

    @staticmethod
    def no_broadcast_message() -> Response:
        """Build a response that does not broadcast anything."""
        ...

    def __repr__(self) -> str: ...

    def __str__(self) -> str: ...
28+
29+
30+
class SideInputAsyncServer:
    """Async SideInput server that is started from Python code."""

    def __init__(
        self,
        sock_file: str | None = ...,
        info_file: str | None = ...,
    ) -> None: ...

    def start(self, py_sideinput: SideInput) -> Awaitable[None]:
        """Start serving with the given SideInput instance; returns an awaitable."""
        ...

    def stop(self) -> None:
        """Request server shutdown."""
        ...
42+
43+
44+
DIR_PATH: str
45+
"""Default directory path where side input files are stored."""
46+
47+
__all__ = [
48+
"Response",
49+
"SideInputAsyncServer",
50+
"SideInput",
51+
"DIR_PATH",
52+
]
53+

packages/pynumaflow-lite/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod pyrs;
77
pub mod reduce;
88
pub mod reducestream;
99
pub mod session_reduce;
10+
pub mod sideinput;
1011
pub mod sink;
1112
pub mod source;
1213
pub mod sourcetransform;
@@ -83,6 +84,13 @@ fn sourcetransformer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
8384
Ok(())
8485
}
8586

87+
/// Submodule: pynumaflow_lite.sideinputer
88+
#[pymodule]
89+
fn sideinputer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
90+
crate::sideinput::populate_py_module(m)?;
91+
Ok(())
92+
}
93+
8694
/// Top-level Python module `pynumaflow_lite` with submodules like `mapper`, `batchmapper`, and `mapstreamer`.
8795
#[pymodule]
8896
fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
@@ -97,6 +105,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
97105
m.add_wrapped(pyo3::wrap_pymodule!(sinker))?;
98106
m.add_wrapped(pyo3::wrap_pymodule!(sourcer))?;
99107
m.add_wrapped(pyo3::wrap_pymodule!(sourcetransformer))?;
108+
m.add_wrapped(pyo3::wrap_pymodule!(sideinputer))?;
100109

101110
// Ensure it's importable as `pynumaflow_lite.mapper` as well as attribute access
102111
let binding = m.getattr("mapper")?;
@@ -188,5 +197,14 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
188197
.getattr("modules")?
189198
.set_item(fullname, &sub)?;
190199

200+
// Ensure it's importable as `pynumaflow_lite.sideinputer` as well
201+
let binding = m.getattr("sideinputer")?;
202+
let sub = binding.cast::<PyModule>()?;
203+
let fullname = "pynumaflow_lite.sideinputer";
204+
sub.setattr("__name__", fullname)?;
205+
py.import("sys")?
206+
.getattr("modules")?
207+
.set_item(fullname, &sub)?;
208+
191209
Ok(())
192210
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/// Side Input interface managed by Python. It means Python code will start the server
2+
/// and can pass in the Python class implementing the SideInput interface.
3+
pub mod server;
4+
5+
use pyo3::prelude::*;
6+
use std::sync::Mutex;
7+
8+
/// Response from the side input retrieve handler.
9+
/// Indicates whether to broadcast a value or not.
10+
#[pyclass(module = "pynumaflow_lite.sideinputer")]
11+
#[derive(Clone, Debug)]
12+
pub struct Response {
13+
/// The value to broadcast (if any).
14+
#[pyo3(get)]
15+
pub value: Vec<u8>,
16+
/// Whether to broadcast this value.
17+
#[pyo3(get)]
18+
pub broadcast: bool,
19+
}
20+
21+
#[pymethods]
22+
impl Response {
23+
/// Create a new Response that broadcasts the given value.
24+
#[staticmethod]
25+
#[pyo3(signature = (value))]
26+
fn broadcast_message(value: Vec<u8>) -> Self {
27+
Self {
28+
value,
29+
broadcast: true,
30+
}
31+
}
32+
33+
/// Create a new Response that does not broadcast any value.
34+
#[staticmethod]
35+
#[pyo3(signature = ())]
36+
fn no_broadcast_message() -> Self {
37+
Self {
38+
value: vec![],
39+
broadcast: false,
40+
}
41+
}
42+
43+
fn __repr__(&self) -> String {
44+
format!(
45+
"Response(value={:?}, broadcast={})",
46+
self.value, self.broadcast
47+
)
48+
}
49+
50+
fn __str__(&self) -> String {
51+
self.__repr__()
52+
}
53+
}
54+
55+
/// Async SideInput Server that can be started from Python code which will run the Python UDF.
56+
#[pyclass(module = "pynumaflow_lite.sideinputer")]
57+
pub struct SideInputAsyncServer {
58+
sock_file: String,
59+
info_file: String,
60+
shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
61+
}
62+
63+
#[pymethods]
64+
impl SideInputAsyncServer {
65+
/// Create a new SideInputAsyncServer.
66+
///
67+
/// Args:
68+
/// sock_file: Path to the Unix domain socket file.
69+
/// info_file: Path to the server info file.
70+
#[new]
71+
#[pyo3(signature = (sock_file=None, info_file=None))]
72+
fn new(sock_file: Option<String>, info_file: Option<String>) -> Self {
73+
Self {
74+
sock_file: sock_file.unwrap_or_else(|| "/var/run/numaflow/sideinput.sock".to_string()),
75+
info_file: info_file
76+
.unwrap_or_else(|| "/var/run/numaflow/sideinput-server-info".to_string()),
77+
shutdown_tx: Mutex::new(None),
78+
}
79+
}
80+
81+
/// Start the server with the given Python SideInput class instance.
82+
#[pyo3(signature = (py_sideinput: "SideInput") -> "None")]
83+
pub fn start<'a>(&self, py: Python<'a>, py_sideinput: Py<PyAny>) -> PyResult<Bound<'a, PyAny>> {
84+
let sock_file = self.sock_file.clone();
85+
let info_file = self.info_file.clone();
86+
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
87+
{
88+
let mut guard = self.shutdown_tx.lock().unwrap();
89+
*guard = Some(tx);
90+
}
91+
92+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
93+
crate::sideinput::server::start(py_sideinput, sock_file, info_file, rx)
94+
.await
95+
.expect("server failed to start");
96+
Ok(())
97+
})
98+
}
99+
100+
/// Trigger server shutdown from Python (idempotent).
101+
#[pyo3(signature = () -> "None")]
102+
pub fn stop(&self) -> PyResult<()> {
103+
if let Some(tx) = self.shutdown_tx.lock().unwrap().take() {
104+
let _ = tx.send(());
105+
}
106+
Ok(())
107+
}
108+
}
109+
110+
/// Helper to populate a PyModule with sideinput types/functions.
111+
pub(crate) fn populate_py_module(m: &Bound<PyModule>) -> PyResult<()> {
112+
m.add_class::<Response>()?;
113+
m.add_class::<SideInputAsyncServer>()?;
114+
m.add("DIR_PATH", numaflow::sideinput::DIR_PATH)?;
115+
116+
Ok(())
117+
}

0 commit comments

Comments
 (0)