Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/pynumaflow-lite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ name = "pynumaflow_lite"
crate-type = ["cdylib", "rlib"]

[dependencies]
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "42a1e814459d18b89eb6ca874fd0a989fd134303" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "44ee3068fcf7088ff265df7ae7ce1881a40694ff" }
pyo3 = { version = "0.27.1", features = ["chrono", "experimental-inspect"] }
tokio = "1.47.1"
tonic = "0.14.2"
Expand Down Expand Up @@ -63,3 +63,7 @@ path = "tests/bin/source.rs"
[[bin]]
name = "test_sourcetransform"
path = "tests/bin/sourcetransform.rs"

[[bin]]
name = "test_sideinput"
path = "tests/bin/sideinput.rs"
16 changes: 14 additions & 2 deletions packages/pynumaflow-lite/pynumaflow_lite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@
except Exception: # pragma: no cover
sourcetransformer = None

try:
sideinputer = _import_module(__name__ + ".sideinputer")
except Exception: # pragma: no cover
sideinputer = None

# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, ReduceStreamer, Accumulator, Sinker,
# Sourcer, and SourceTransformer classes under the extension submodules for convenient access
# Sourcer, SourceTransformer, and SideInput classes under the extension submodules for convenient access
from ._map_dtypes import Mapper
from ._batchmapper_dtypes import BatchMapper
from ._mapstream_dtypes import MapStreamer
Expand All @@ -65,6 +70,7 @@
from ._sink_dtypes import Sinker
from ._source_dtypes import Sourcer
from ._sourcetransformer_dtypes import SourceTransformer
from ._sideinput_dtypes import SideInput

if mapper is not None:
try:
Expand Down Expand Up @@ -126,9 +132,15 @@
except Exception:
pass

if sideinputer is not None:
try:
setattr(sideinputer, "SideInput", SideInput)
except Exception:
pass

# Public API
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "reducestreamer", "accumulator",
"sinker", "sourcer", "sourcetransformer"]
"sinker", "sourcer", "sourcetransformer", "sideinputer"]

__doc__ = pynumaflow_lite.__doc__
if hasattr(pynumaflow_lite, "__all__"):
Expand Down
4 changes: 3 additions & 1 deletion packages/pynumaflow-lite/pynumaflow_lite/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ from . import session_reducer as session_reducer
from . import accumulator as accumulator
from . import sinker as sinker
from . import sourcer as sourcer
from . import sourcetransformer as sourcetransformer
from . import sideinputer as sideinputer

__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker', 'sourcer']
__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker', 'sourcer', 'sourcetransformer', 'sideinputer']
33 changes: 33 additions & 0 deletions packages/pynumaflow-lite/pynumaflow_lite/_sideinput_dtypes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from abc import ABCMeta, abstractmethod
from pynumaflow_lite.sideinputer import Response


class SideInput(metaclass=ABCMeta):
"""
Provides an interface to write a SideInput retriever
which will be exposed over a gRPC server.

A SideInput is used for periodically retrieving data that can be
broadcast to other vertices in the pipeline.
"""

async def __call__(self, *args, **kwargs):
"""
This allows to execute the handler function directly if
class instance is sent as a callable.
"""
return await self.retrieve_handler(*args, **kwargs)

@abstractmethod
async def retrieve_handler(self) -> Response:
"""
Implement this handler function which implements the SideInput interface.

This function is called every time the side input is requested.

Returns:
Response: Either Response.broadcast_message(value) to broadcast a value,
or Response.no_broadcast_message() to skip broadcasting.
"""
pass

53 changes: 53 additions & 0 deletions packages/pynumaflow-lite/pynumaflow_lite/sideinputer.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from __future__ import annotations

from typing import Callable, Awaitable, Any

# Re-export the Python ABC for user convenience and typing
from ._sideinput_dtypes import SideInput as SideInput


class Response:
"""Response from the side input retrieve handler."""

value: bytes
broadcast: bool

@staticmethod
def broadcast_message(value: bytes) -> Response:
"""Create a response that broadcasts the given value."""
...

@staticmethod
def no_broadcast_message() -> Response:
"""Create a response that does not broadcast any value."""
...

def __repr__(self) -> str: ...

def __str__(self) -> str: ...


class SideInputAsyncServer:
"""Async SideInput Server that can be started from Python."""

def __init__(
self,
sock_file: str | None = ...,
info_file: str | None = ...,
) -> None: ...

def start(self, py_sideinput: SideInput) -> Awaitable[None]: ...

def stop(self) -> None: ...


DIR_PATH: str
"""Default directory path where side input files are stored."""

__all__ = [
"Response",
"SideInputAsyncServer",
"SideInput",
"DIR_PATH",
]

18 changes: 18 additions & 0 deletions packages/pynumaflow-lite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod pyrs;
pub mod reduce;
pub mod reducestream;
pub mod session_reduce;
pub mod sideinput;
pub mod sink;
pub mod source;
pub mod sourcetransform;
Expand Down Expand Up @@ -83,6 +84,13 @@ fn sourcetransformer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
Ok(())
}

/// Submodule: pynumaflow_lite.sideinputer
#[pymodule]
fn sideinputer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
crate::sideinput::populate_py_module(m)?;
Ok(())
}

/// Top-level Python module `pynumaflow_lite` with submodules like `mapper`, `batchmapper`, and `mapstreamer`.
#[pymodule]
fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
Expand All @@ -97,6 +105,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(pyo3::wrap_pymodule!(sinker))?;
m.add_wrapped(pyo3::wrap_pymodule!(sourcer))?;
m.add_wrapped(pyo3::wrap_pymodule!(sourcetransformer))?;
m.add_wrapped(pyo3::wrap_pymodule!(sideinputer))?;

// Ensure it's importable as `pynumaflow_lite.mapper` as well as attribute access
let binding = m.getattr("mapper")?;
Expand Down Expand Up @@ -188,5 +197,14 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
.getattr("modules")?
.set_item(fullname, &sub)?;

// Ensure it's importable as `pynumaflow_lite.sideinputer` as well
let binding = m.getattr("sideinputer")?;
let sub = binding.cast::<PyModule>()?;
let fullname = "pynumaflow_lite.sideinputer";
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;

Ok(())
}
117 changes: 117 additions & 0 deletions packages/pynumaflow-lite/src/sideinput/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/// Side Input interface managed by Python. It means Python code will start the server
/// and can pass in the Python class implementing the SideInput interface.
pub mod server;

use pyo3::prelude::*;
use std::sync::Mutex;

/// Response from the side input retrieve handler.
/// Indicates whether to broadcast a value or not.
#[pyclass(module = "pynumaflow_lite.sideinputer")]
#[derive(Clone, Debug)]
pub struct Response {
/// The value to broadcast (if any).
#[pyo3(get)]
pub value: Vec<u8>,
/// Whether to broadcast this value.
#[pyo3(get)]
pub broadcast: bool,
}

#[pymethods]
impl Response {
/// Create a new Response that broadcasts the given value.
#[staticmethod]
#[pyo3(signature = (value))]
fn broadcast_message(value: Vec<u8>) -> Self {
Self {
value,
broadcast: true,
}
}

/// Create a new Response that does not broadcast any value.
#[staticmethod]
#[pyo3(signature = ())]
fn no_broadcast_message() -> Self {
Self {
value: vec![],
broadcast: false,
}
}

fn __repr__(&self) -> String {
format!(
"Response(value={:?}, broadcast={})",
self.value, self.broadcast
)
}

fn __str__(&self) -> String {
self.__repr__()
}
}

/// Async SideInput Server that can be started from Python code which will run the Python UDF.
#[pyclass(module = "pynumaflow_lite.sideinputer")]
pub struct SideInputAsyncServer {
sock_file: String,
info_file: String,
shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
}

#[pymethods]
impl SideInputAsyncServer {
/// Create a new SideInputAsyncServer.
///
/// Args:
/// sock_file: Path to the Unix domain socket file.
/// info_file: Path to the server info file.
#[new]
#[pyo3(signature = (sock_file=None, info_file=None))]
fn new(sock_file: Option<String>, info_file: Option<String>) -> Self {
Self {
sock_file: sock_file.unwrap_or_else(|| "/var/run/numaflow/sideinput.sock".to_string()),
info_file: info_file
.unwrap_or_else(|| "/var/run/numaflow/sideinput-server-info".to_string()),
shutdown_tx: Mutex::new(None),
}
}

/// Start the server with the given Python SideInput class instance.
#[pyo3(signature = (py_sideinput: "SideInput") -> "None")]
pub fn start<'a>(&self, py: Python<'a>, py_sideinput: Py<PyAny>) -> PyResult<Bound<'a, PyAny>> {
let sock_file = self.sock_file.clone();
let info_file = self.info_file.clone();
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
{
let mut guard = self.shutdown_tx.lock().unwrap();
*guard = Some(tx);
}

pyo3_async_runtimes::tokio::future_into_py(py, async move {
crate::sideinput::server::start(py_sideinput, sock_file, info_file, rx)
.await
.expect("server failed to start");
Ok(())
})
}

/// Trigger server shutdown from Python (idempotent).
#[pyo3(signature = () -> "None")]
pub fn stop(&self) -> PyResult<()> {
if let Some(tx) = self.shutdown_tx.lock().unwrap().take() {
let _ = tx.send(());
}
Ok(())
}
}

/// Helper to populate a PyModule with sideinput types/functions.
pub(crate) fn populate_py_module(m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<Response>()?;
m.add_class::<SideInputAsyncServer>()?;
m.add("DIR_PATH", numaflow::sideinput::DIR_PATH)?;

Ok(())
}
Loading