diff --git a/packages/pynumaflow-lite/Cargo.toml b/packages/pynumaflow-lite/Cargo.toml index c5025cf0..c19a0fa7 100644 --- a/packages/pynumaflow-lite/Cargo.toml +++ b/packages/pynumaflow-lite/Cargo.toml @@ -9,7 +9,7 @@ name = "pynumaflow_lite" crate-type = ["cdylib", "rlib"] [dependencies] -numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "42a1e814459d18b89eb6ca874fd0a989fd134303" } +numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "44ee3068fcf7088ff265df7ae7ce1881a40694ff" } pyo3 = { version = "0.27.1", features = ["chrono", "experimental-inspect"] } tokio = "1.47.1" tonic = "0.14.2" @@ -63,3 +63,7 @@ path = "tests/bin/source.rs" [[bin]] name = "test_sourcetransform" path = "tests/bin/sourcetransform.rs" + +[[bin]] +name = "test_sideinput" +path = "tests/bin/sideinput.rs" diff --git a/packages/pynumaflow-lite/pynumaflow_lite/__init__.py b/packages/pynumaflow-lite/pynumaflow_lite/__init__.py index 07b309fe..477e7b72 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/__init__.py +++ b/packages/pynumaflow-lite/pynumaflow_lite/__init__.py @@ -53,8 +53,13 @@ except Exception: # pragma: no cover sourcetransformer = None +try: + sideinputer = _import_module(__name__ + ".sideinputer") +except Exception: # pragma: no cover + sideinputer = None + # Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, ReduceStreamer, Accumulator, Sinker, -# Sourcer, and SourceTransformer classes under the extension submodules for convenient access +# Sourcer, SourceTransformer, and SideInput classes under the extension submodules for convenient access from ._map_dtypes import Mapper from ._batchmapper_dtypes import BatchMapper from ._mapstream_dtypes import MapStreamer @@ -65,6 +70,7 @@ from ._sink_dtypes import Sinker from ._source_dtypes import Sourcer from ._sourcetransformer_dtypes import SourceTransformer +from ._sideinput_dtypes import SideInput if mapper is not None: try: @@ -126,9 +132,15 @@ except Exception: 
pass
 
+if sideinputer is not None:
+    try:
+        setattr(sideinputer, "SideInput", SideInput)
+    except Exception:
+        pass  # best-effort: skip if the attribute cannot be set on the submodule
+
 # Public API
 __all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "reducestreamer", "accumulator",
-           "sinker", "sourcer", "sourcetransformer"]
+           "sinker", "sourcer", "sourcetransformer", "sideinputer"]
 
 __doc__ = pynumaflow_lite.__doc__
 if hasattr(pynumaflow_lite, "__all__"):
diff --git a/packages/pynumaflow-lite/pynumaflow_lite/__init__.pyi b/packages/pynumaflow-lite/pynumaflow_lite/__init__.pyi
index 6d6cbbb7..30d3d3f2 100644
--- a/packages/pynumaflow-lite/pynumaflow_lite/__init__.pyi
+++ b/packages/pynumaflow-lite/pynumaflow_lite/__init__.pyi
@@ -10,5 +10,7 @@ from . import session_reducer as session_reducer
 from . import accumulator as accumulator
 from . import sinker as sinker
 from . import sourcer as sourcer
+from . import sourcetransformer as sourcetransformer
+from . import sideinputer as sideinputer
 
-__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker', 'sourcer']
+__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker', 'sourcer', 'sourcetransformer', 'sideinputer']  # NOTE(review): 'reducestreamer' is in __init__.py's __all__ but not here - confirm
diff --git a/packages/pynumaflow-lite/pynumaflow_lite/_sideinput_dtypes.py b/packages/pynumaflow-lite/pynumaflow_lite/_sideinput_dtypes.py
new file mode 100644
index 00000000..4400956d
--- /dev/null
+++ b/packages/pynumaflow-lite/pynumaflow_lite/_sideinput_dtypes.py
@@ -0,0 +1,33 @@
+from abc import ABCMeta, abstractmethod
+from pynumaflow_lite.sideinputer import Response
+
+
+class SideInput(metaclass=ABCMeta):
+    """
+    Provides an interface to write a SideInput retriever
+    which will be exposed over a gRPC server.
+
+    A SideInput is used for periodically retrieving data that can be
+    broadcast to other vertices in the pipeline.
+    """
+
+    async def __call__(self, *args, **kwargs) -> Response:
+        """
+        Delegate to retrieve_handler so the instance can be awaited like a
+        plain async callable; positional and keyword arguments are forwarded.
+        """
+        return await self.retrieve_handler(*args, **kwargs)
+
+    @abstractmethod
+    async def retrieve_handler(self) -> Response:
+        """
+        Retrieve the side input; concrete subclasses must implement this coroutine.
+
+        This function is called every time the side input is requested.
+
+        Returns:
+            Response: Either Response.broadcast_message(value) to broadcast a value,
+            or Response.no_broadcast_message() to skip broadcasting.
+        """
+        pass
+
diff --git a/packages/pynumaflow-lite/pynumaflow_lite/sideinputer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/sideinputer.pyi
new file mode 100644
index 00000000..475b6d1c
--- /dev/null
+++ b/packages/pynumaflow-lite/pynumaflow_lite/sideinputer.pyi
@@ -0,0 +1,53 @@
+from __future__ import annotations
+
+from typing import Callable, Awaitable, Any
+
+# Re-export the Python ABC for user convenience and typing
+from ._sideinput_dtypes import SideInput as SideInput
+
+
+class Response:
+    """Response from the side input retrieve handler."""
+
+    value: bytes
+    broadcast: bool
+
+    @staticmethod
+    def broadcast_message(value: bytes) -> Response:
+        """Create a response that broadcasts the given value."""
+        ...
+
+    @staticmethod
+    def no_broadcast_message() -> Response:
+        """Create a response that does not broadcast any value."""
+        ...
+
+    def __repr__(self) -> str: ...
+
+    def __str__(self) -> str: ...
+
+
+class SideInputAsyncServer:
+    """Async SideInput Server that can be started from Python."""
+
+    def __init__(
+        self,
+        sock_file: str | None = ...,
+        info_file: str | None = ...,
+    ) -> None: ...
+
+    def start(self, py_sideinput: SideInput) -> Awaitable[None]: ...
+
+    def stop(self) -> None: ...
+
+
+DIR_PATH: str
+"""Default directory path where side input files are stored."""
+
+__all__ = [
+    "Response",
+    "SideInputAsyncServer",
+    "SideInput",
+    "DIR_PATH",
+]
+
diff --git a/packages/pynumaflow-lite/src/lib.rs b/packages/pynumaflow-lite/src/lib.rs
index b3aa76ec..0f5a7c2d 100644
--- a/packages/pynumaflow-lite/src/lib.rs
+++ b/packages/pynumaflow-lite/src/lib.rs
@@ -7,6 +7,7 @@ pub mod pyrs;
 pub mod reduce;
 pub mod reducestream;
 pub mod session_reduce;
+pub mod sideinput;
 pub mod sink;
 pub mod source;
 pub mod sourcetransform;
@@ -83,6 +84,13 @@ fn sourcetransformer(_py: Python, m: &Bound) -> PyResult<()> {
     Ok(())
 }
 
+/// Submodule: pynumaflow_lite.sideinputer
+#[pymodule]
+fn sideinputer(_py: Python, m: &Bound) -> PyResult<()> {
+    crate::sideinput::populate_py_module(m)?;
+    Ok(())
+}
+
 /// Top-level Python module `pynumaflow_lite` with submodules like `mapper`, `batchmapper`, and `mapstreamer`.
 #[pymodule]
 fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> {
@@ -97,6 +105,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> {
     m.add_wrapped(pyo3::wrap_pymodule!(sinker))?;
     m.add_wrapped(pyo3::wrap_pymodule!(sourcer))?;
     m.add_wrapped(pyo3::wrap_pymodule!(sourcetransformer))?;
+    m.add_wrapped(pyo3::wrap_pymodule!(sideinputer))?;
 
     // Ensure it's importable as `pynumaflow_lite.mapper` as well as attribute access
     let binding = m.getattr("mapper")?;
@@ -188,5 +197,14 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> {
         .getattr("modules")?
         .set_item(fullname, &sub)?;
 
+    // Register the submodule in `sys.modules` so `import pynumaflow_lite.sideinputer` also resolves
+    let binding = m.getattr("sideinputer")?;
+    let sub = binding.cast::()?;
+    let fullname = "pynumaflow_lite.sideinputer";
+    sub.setattr("__name__", fullname)?;
+    py.import("sys")?
+        .getattr("modules")?
+        .set_item(fullname, &sub)?;
+
     Ok(())
 }
diff --git a/packages/pynumaflow-lite/src/sideinput/mod.rs b/packages/pynumaflow-lite/src/sideinput/mod.rs
new file mode 100644
index 00000000..9ef24155
--- /dev/null
+++ b/packages/pynumaflow-lite/src/sideinput/mod.rs
@@ -0,0 +1,117 @@
+/// Side Input interface managed by Python: Python code starts the server
+/// and passes in the Python object implementing the SideInput interface.
+pub mod server;
+
+use pyo3::prelude::*;
+use std::sync::Mutex;
+
+/// Response from the side input retrieve handler.
+/// Indicates whether to broadcast a value or not.
+#[pyclass(module = "pynumaflow_lite.sideinputer")]
+#[derive(Clone, Debug)]
+pub struct Response {
+    /// The value to broadcast; empty when built by `no_broadcast_message`.
+    #[pyo3(get)]
+    pub value: Vec, // NOTE(review): element type appears stripped in this rendering; presumably Vec<u8> - confirm
+    /// Whether to broadcast this value.
+    #[pyo3(get)]
+    pub broadcast: bool,
+}
+
+#[pymethods]
+impl Response {
+    /// Create a new Response that broadcasts the given value.
+    #[staticmethod]
+    #[pyo3(signature = (value))]
+    fn broadcast_message(value: Vec) -> Self {
+        Self {
+            value,
+            broadcast: true,
+        }
+    }
+
+    /// Create a new Response that does not broadcast any value.
+    #[staticmethod]
+    #[pyo3(signature = ())]
+    fn no_broadcast_message() -> Self {
+        Self {
+            value: vec![],
+            broadcast: false,
+        }
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "Response(value={:?}, broadcast={})",
+            self.value, self.broadcast
+        )
+    }
+
+    fn __str__(&self) -> String {
+        self.__repr__()
+    }
+}
+
+/// Async SideInput Server that can be started from Python code which will run the Python UDF.
+#[pyclass(module = "pynumaflow_lite.sideinputer")]
+pub struct SideInputAsyncServer {
+    sock_file: String,
+    info_file: String,
+    shutdown_tx: Mutex>>, // shutdown sender: stored by `start`, taken by `stop`
+}
+
+#[pymethods]
+impl SideInputAsyncServer {
+    /// Create a new SideInputAsyncServer.
+    ///
+    /// Args:
+    ///     sock_file: Path to the Unix domain socket file (default: /var/run/numaflow/sideinput.sock).
+    ///     info_file: Path to the server info file (default: /var/run/numaflow/sideinput-server-info).
+    #[new]
+    #[pyo3(signature = (sock_file=None, info_file=None))]
+    fn new(sock_file: Option<String>, info_file: Option<String>) -> Self {
+        Self {
+            sock_file: sock_file.unwrap_or_else(|| "/var/run/numaflow/sideinput.sock".to_string()),
+            info_file: info_file
+                .unwrap_or_else(|| "/var/run/numaflow/sideinput-server-info".to_string()),
+            shutdown_tx: Mutex::new(None),
+        }
+    }
+
+    /// Start the server with the given Python SideInput class instance.
+    ///
+    /// Returns an awaitable that completes once the server has shut down; a
+    /// server error is raised to the awaiting caller instead of panicking.
+    #[pyo3(signature = (py_sideinput: "SideInput") -> "None")]
+    pub fn start<'a>(&self, py: Python<'a>, py_sideinput: Py<PyAny>) -> PyResult<Bound<'a, PyAny>> {
+        let sock_file = self.sock_file.clone();
+        let info_file = self.info_file.clone();
+        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+        // Keep the sender so `stop` can signal shutdown later.
+        *self.shutdown_tx.lock().unwrap() = Some(tx);
+
+        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+            // The server already reports failures as `PyErr`; return it as-is.
+            crate::sideinput::server::start(py_sideinput, sock_file, info_file, rx)
+                .await
+        })
+    }
+
+    /// Trigger server shutdown from Python (idempotent).
+    #[pyo3(signature = () -> "None")]
+    pub fn stop(&self) -> PyResult<()> {
+        if let Some(tx) = self.shutdown_tx.lock().unwrap().take() {
+            let _ = tx.send(()); // receiver may already be gone; ignore the error
+        }
+        Ok(())
+    }
+}
+
+/// Helper to populate a PyModule with sideinput types/functions.
+pub(crate) fn populate_py_module(m: &Bound) -> PyResult<()> {
+    m.add_class::()?; // NOTE(review): type argument appears stripped in this rendering; presumably Response - confirm
+    m.add_class::()?; // NOTE(review): presumably SideInputAsyncServer - confirm
+    m.add("DIR_PATH", numaflow::sideinput::DIR_PATH)?; // expose the numaflow crate's constant to Python
+
+    Ok(())
+}
diff --git a/packages/pynumaflow-lite/src/sideinput/server.rs b/packages/pynumaflow-lite/src/sideinput/server.rs
new file mode 100644
index 00000000..7d6f3cfb
--- /dev/null
+++ b/packages/pynumaflow-lite/src/sideinput/server.rs
@@ -0,0 +1,91 @@
+use crate::sideinput::Response;
+use numaflow::sideinput;
+
+use pyo3::prelude::*;
+use std::sync::Arc;
+
+pub(crate) struct PySideInputRunner {
+    pub(crate) event_loop: Arc>, // asyncio event loop used to build the TaskLocals for the handler
+    pub(crate) py_sideinput: Arc>, // Python object whose `retrieve_handler` is invoked
+}
+
+#[tonic::async_trait]
+impl sideinput::SideInputer for PySideInputRunner {
+    async fn retrieve_sideinput(&self) -> Option> {
+        let py_sideinput = self.py_sideinput.clone();
+        let event_loop = self.event_loop.clone();
+
+        // Call `retrieve_handler` while attached to Python and turn its coroutine into a Rust future
+        let fut = Python::attach(|py| {
+            let locals = pyo3_async_runtimes::TaskLocals::new(event_loop.bind(py).clone());
+
+            let coro = py_sideinput
+                .call_method0(py, "retrieve_handler")
+                .unwrap() // panics if calling the handler raises
+                .into_bound(py);
+
+            pyo3_async_runtimes::into_future_with_locals(&locals, coro).unwrap()
+        });
+
+        let result = fut.await.unwrap(); // panics if the awaited coroutine raises
+
+        let response = Python::attach(|py| {
+            let response: Response = result.extract(py).unwrap(); // panics if the result is not a Response
+            response
+        });
+
+        if response.broadcast {
+            Some(response.value)
+        } else {
+            None
+        }
+    }
+}
+
+// Start the sideinput server by spinning up a dedicated Python asyncio loop and wiring shutdown.
+pub(super) async fn start(
+    py_sideinput: Py, // NOTE(review): type argument appears stripped in this rendering - confirm
+    sock_file: String,
+    info_file: String,
+    shutdown_rx: tokio::sync::oneshot::Receiver<()>,
+) -> Result<(), pyo3::PyErr> {
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    let py_asyncio_loop_handle = tokio::task::spawn_blocking(move || crate::pyrs::run_asyncio(tx));
+    let event_loop = rx.await.unwrap(); // panics if the sender is dropped without sending a loop
+
+    let (sig_handle, combined_rx) = crate::pyrs::setup_sig_handler(shutdown_rx);
+
+    let py_sideinput_runner = PySideInputRunner {
+        py_sideinput: Arc::new(py_sideinput),
+        event_loop: event_loop.clone(),
+    };
+
+    let mut server = sideinput::Server::new(py_sideinput_runner)
+        .with_socket_file(&sock_file)
+        .with_server_info_file(&info_file);
+
+    let result = server
+        .start_with_shutdown(combined_rx)
+        .await
+        .map_err(|e| pyo3::PyErr::new::(e.to_string())); // NOTE(review): exception type arguments appear stripped - confirm
+
+    // Schedule the loop's `stop` via `call_soon_threadsafe`, whatever triggered the shutdown.
+    Python::attach(|py| {
+        if let Ok(stop_cb) = event_loop.getattr(py, "stop") {
+            let _ = event_loop.call_method1(py, "call_soon_threadsafe", (stop_cb,));
+        }
+    });
+
+    println!("Numaflow SideInput has shutdown...");
+
+    // Wait for the blocking task that runs the asyncio loop to finish; its result is ignored.
+    let _ = py_asyncio_loop_handle.await;
+
+    // Abort the signal-handler task if it is still running.
+    if !sig_handle.is_finished() {
+        println!("Aborting signal handler");
+        sig_handle.abort();
+    }
+
+    result
+}
diff --git a/packages/pynumaflow-lite/tests/bin/sideinput.rs b/packages/pynumaflow-lite/tests/bin/sideinput.rs
new file mode 100644
index 00000000..d5580bb0
--- /dev/null
+++ b/packages/pynumaflow-lite/tests/bin/sideinput.rs
@@ -0,0 +1,88 @@
+use std::env;
+use std::path::PathBuf;
+
+use numaflow::proto::side_input::side_input_client::SideInputClient;
+use tokio::net::UnixStream;
+use tonic::transport::Uri;
+use tower::service_fn;
+
+// Simple Rust client binary that exercises the SideInput server over Unix Domain Socket.
+#[tokio::main]
+async fn main() -> Result<(), Box> { // NOTE(review): Box's type argument appears stripped in this rendering - confirm
+    // Allow overriding the socket path via first CLI arg or env var.
+    let sock_path = env::args()
+        .nth(1)
+        .or_else(|| env::var("NUMAFLOW_SIDEINPUT_SOCK").ok())
+        .unwrap_or_else(|| "/tmp/var/run/numaflow/sideinput.sock".to_string());
+
+    // Set up a tonic channel over the Unix Domain Socket; the connector ignores the endpoint URI.
+    let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")?
+        .connect_with_connector(service_fn(move |_: Uri| {
+            let sock = PathBuf::from(sock_path.clone());
+            async move {
+                Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(
+                    UnixStream::connect(sock).await?,
+                ))
+            }
+        }))
+        .await?;
+
+    let mut client = SideInputClient::new(channel);
+
+    // Test 1: First call should broadcast (counter = 1, odd)
+    let response = client.retrieve_side_input(()).await?;
+    let resp = response.into_inner();
+    println!(
+        "Response 1: value={:?}, no_broadcast={}",
+        String::from_utf8_lossy(&resp.value),
+        resp.no_broadcast
+    );
+    assert!(!resp.no_broadcast, "First call should broadcast");
+    assert!(!resp.value.is_empty(), "First call should have a value");
+    assert!(
+        String::from_utf8_lossy(&resp.value).starts_with("an example:"),
+        "Value should start with 'an example:'"
+    );
+
+    // Test 2: Second call should NOT broadcast (counter = 2, even)
+    let response = client.retrieve_side_input(()).await?;
+    let resp = response.into_inner();
+    println!(
+        "Response 2: value={:?}, no_broadcast={}",
+        String::from_utf8_lossy(&resp.value),
+        resp.no_broadcast
+    );
+    assert!(resp.no_broadcast, "Second call should not broadcast");
+    assert!(resp.value.is_empty(), "Second call should have empty value");
+
+    // Test 3: Third call should broadcast again (counter = 3, odd)
+    let response = client.retrieve_side_input(()).await?;
+    let resp = response.into_inner();
+    println!(
+        "Response 3: value={:?}, no_broadcast={}",
+        String::from_utf8_lossy(&resp.value),
+        resp.no_broadcast
+    );
+    assert!(!resp.no_broadcast, "Third call should broadcast");
+    assert!(!resp.value.is_empty(), "Third call should have a value");
+
+    // Test 4: Fourth call should NOT broadcast (counter = 4, even)
+    let response = client.retrieve_side_input(()).await?;
+    let resp = response.into_inner();
+    println!(
+        "Response 4: value={:?}, no_broadcast={}",
+        String::from_utf8_lossy(&resp.value),
+        resp.no_broadcast
+    );
+    assert!(resp.no_broadcast, "Fourth call should not broadcast");
+
+    // Test is_ready endpoint
+    let ready_response = client.is_ready(()).await?;
+    let ready = ready_response.into_inner();
+    println!("IsReady: {}", ready.ready);
+    assert!(ready.ready, "Server should be ready");
+
+    println!("All side input tests passed!");
+
+    Ok(())
+}
diff --git a/packages/pynumaflow-lite/tests/examples/sideinput_example.py b/packages/pynumaflow-lite/tests/examples/sideinput_example.py
new file mode 100644
index 00000000..b23b41da
--- /dev/null
+++ b/packages/pynumaflow-lite/tests/examples/sideinput_example.py
@@ -0,0 +1,56 @@
+import asyncio
+import signal
+import datetime
+from pynumaflow_lite import sideinputer
+
+
+class ExampleSideInput(sideinputer.SideInput):
+    """
+    An example SideInput that broadcasts a message every other time.
+    """
+
+    def __init__(self) -> None:
+        self.counter = 0
+
+    async def retrieve_handler(self) -> sideinputer.Response:
+        """
+        Broadcast a timestamped message on odd-numbered calls; skip broadcasting on even ones.
+        """
+        time_now = datetime.datetime.now()
+        # val is the value to be broadcast
+        val = f"an example: {str(time_now)}"
+        self.counter += 1
+        # broadcast every other time
+        if self.counter % 2 == 0:
+            # no_broadcast_message() is used to indicate that there is no broadcast
+            return sideinputer.Response.no_broadcast_message()
+        # broadcast_message() is used to indicate that there is a broadcast
+        return sideinputer.Response.broadcast_message(val.encode("utf-8"))
+
+
+async def main() -> None:
+    # Create the server with custom socket paths for testing
+    server = sideinputer.SideInputAsyncServer(
+        sock_file="/tmp/var/run/numaflow/sideinput.sock",
+        info_file="/tmp/var/run/numaflow/sideinput-server-info",
+    )
+
+    # Create the side input instance
+    side_input = ExampleSideInput()
+
+    # Set up signal handling for graceful shutdown
+    loop = asyncio.get_running_loop()
+
+    def handle_signal() -> None:
+        server.stop()
+
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        loop.add_signal_handler(sig, handle_signal)
+
+    # Start the server; the await returns once the server has shut down
+    await server.start(side_input)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+
diff --git a/packages/pynumaflow-lite/tests/test_sideinput.py b/packages/pynumaflow-lite/tests/test_sideinput.py
new file mode 100644
index 00000000..8922dfa7
--- /dev/null
+++ b/packages/pynumaflow-lite/tests/test_sideinput.py
@@ -0,0 +1,23 @@
+from pathlib import Path
+
+import pytest
+
+from _test_utils import run_python_server_with_rust_client
+
+SOCK_PATH = Path("/tmp/var/run/numaflow/sideinput.sock")
+SERVER_INFO = Path("/tmp/var/run/numaflow/sideinput-server-info")
+
+SCRIPTS = [
+    "sideinput_example.py",
+]
+
+
+@pytest.mark.parametrize("script", SCRIPTS)
+def test_python_server_and_rust_client(script: str, tmp_path: Path) -> None:  # NOTE(review): tmp_path is unused here
+    run_python_server_with_rust_client(
+        script=script,
+        sock_path=SOCK_PATH,
+        server_info_path=SERVER_INFO,
+        rust_bin_name="test_sideinput",
+    )
+