diff --git a/packages/pynumaflow-lite/manifests/sideinput/Dockerfile b/packages/pynumaflow-lite/manifests/sideinput/Dockerfile new file mode 100644 index 00000000..e3ae81a8 --- /dev/null +++ b/packages/pynumaflow-lite/manifests/sideinput/Dockerfile @@ -0,0 +1,39 @@ +FROM python:3.11-slim-bullseye AS builder + +ENV PYTHONFAULTHANDLER=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONHASHSEED=random \ + PIP_NO_CACHE_DIR=on \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=100 \ + POETRY_HOME="/opt/poetry" \ + POETRY_VIRTUALENVS_IN_PROJECT=true \ + POETRY_NO_INTERACTION=1 \ + PYSETUP_PATH="/opt/pysetup" + + ENV PATH="$POETRY_HOME/bin:$PATH" + +RUN apt-get update \ + && apt-get install --no-install-recommends -y \ + curl \ + wget \ + # deps for building python deps + build-essential \ + && apt-get install -y git \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + && curl -sSL https://install.python-poetry.org | python3 - + +FROM builder AS udf + +WORKDIR $PYSETUP_PATH +COPY ./ ./ + +RUN pip + +RUN poetry lock +RUN poetry install --no-cache --no-root && \ + rm -rf ~/.cache/pypoetry/ +RUN poetry add $PYSETUP_PATH/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl + +CMD ["poetry", "run", "python", "sideinput_example.py"] + diff --git a/packages/pynumaflow-lite/manifests/sideinput/README.md b/packages/pynumaflow-lite/manifests/sideinput/README.md new file mode 100644 index 00000000..4cea0513 --- /dev/null +++ b/packages/pynumaflow-lite/manifests/sideinput/README.md @@ -0,0 +1,28 @@ +To create the `wheel` file, refer [root](../../README.md) + +## HOWTO build Image + +```bash +docker build . -t quay.io/numaio/numaflow/pynumaflow-lite-sideinput:v1 --load +``` + +### `k3d` + +Load it now to `k3d` + +```bash +k3d image import quay.io/numaio/numaflow/pynumaflow-lite-sideinput:v1 +``` + +### Minikube + +```bash +minikube image load quay.io/numaio/numaflow/pynumaflow-lite-sideinput:v1 +``` + +## Run the pipeline + +```bash +kubectl apply -f pipeline.yaml +``` + diff --git a/packages/pynumaflow-lite/manifests/sideinput/pipeline.yaml b/packages/pynumaflow-lite/manifests/sideinput/pipeline.yaml new file mode 100644 index 00000000..dc5e2db4 --- /dev/null +++ b/packages/pynumaflow-lite/manifests/sideinput/pipeline.yaml @@ -0,0 +1,39 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: simple-sideinput +spec: + sideInputs: + - name: myticker + container: + image: quay.io/numaio/numaflow/pynumaflow-lite-sideinput:v1 + imagePullPolicy: Never + trigger: + schedule: "*/10 * * * * *" + vertices: + - name: in + source: + # A self data generating source + generator: + rpu: 1 + duration: 1s + - name: si-map + udf: + container: + image: quay.io/numaio/numaflow/pynumaflow-lite-sideinput:v1 + imagePullPolicy: Never + env: + - name: MAPPER + value: "true" + sideInputs: + - myticker + - name: out + sink: + # A simple log printing sink + log: { } + edges: + - from: in + to: si-map + - from: si-map + to: out + diff --git a/packages/pynumaflow-lite/manifests/sideinput/pyproject.toml b/packages/pynumaflow-lite/manifests/sideinput/pyproject.toml new file mode 100644 index 00000000..424bda25 --- /dev/null +++ b/packages/pynumaflow-lite/manifests/sideinput/pyproject.toml @@ -0,0 +1,19 @@ +[project] +name = "sideinput-example" +version = "0.1.0" +description = "Side Input Example with Retriever and Mapper" +authors = [ + { name = "Vigith Maurice", email = "vigith@gmail.com" } +] +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "watchfiles", + "watchdog", +] + + +[build-system] +requires = ["poetry-core>=2.0.0,<3.0.0"] +build-backend = "poetry.core.masonry.api" + diff --git a/packages/pynumaflow-lite/manifests/sideinput/sideinput_example.py b/packages/pynumaflow-lite/manifests/sideinput/sideinput_example.py new file mode 100644 index 00000000..781e997b --- /dev/null +++ b/packages/pynumaflow-lite/manifests/sideinput/sideinput_example.py @@ -0,0 +1,145 @@ +""" +Side Input Example for pynumaflow-lite. + +This module contains both a SideInput retriever and a Mapper that reads from side inputs. +The mode is controlled by the MAPPER environment variable: +- If MAPPER is set to "true", runs as a Mapper that reads side input files +- Otherwise, runs as a SideInput retriever that broadcasts values +""" +import asyncio +import os +import signal +import threading +from threading import Thread +import datetime + +from pynumaflow_lite import sideinputer, mapper +from watchfiles import watch + + +class ExampleSideInput(sideinputer.SideInput): + """ + A SideInput retriever that broadcasts a timestamp message every time. + """ + + def __init__(self): + self.counter = 0 + + async def retrieve_handler(self) -> sideinputer.Response: + """ + This function is called every time the side input is requested. + """ + time_now = datetime.datetime.now() + # val is the value to be broadcasted + val = f"an example: {str(time_now)}" + self.counter += 1 + # broadcast_message() is used to indicate that there is a broadcast + return sideinputer.Response.broadcast_message(val.encode("utf-8")) + + +class SideInputHandler(mapper.Mapper): + """ + A Mapper that reads from side input files and includes the value in its output. + """ + + # variable and lock for thread safety + data_value = "no_value" + data_value_lock = threading.Lock() + + # Side input file that we are watching + watched_file = "myticker" + + async def handler(self, keys: list[str], datum: mapper.Datum) -> mapper.Messages: + with self.data_value_lock: + current_value = self.data_value + + messages = mapper.Messages() + messages.append(mapper.Message(str.encode(current_value))) + return messages + + def file_watcher(self): + """ + This function is used to watch the side input directory for changes. + """ + path = sideinputer.DIR_PATH + for changes in watch(path): + for change in changes: + change_type, file_path = change + if file_path.endswith(self.watched_file): + with self.data_value_lock: + self.update_data_from_file(file_path) + + def init_data_value(self): + """Read the SIDE INPUT FILE for initial value before starting the server.""" + path = os.path.join(sideinputer.DIR_PATH, self.watched_file) + print(f"Initializing side input from: {path}") + self.update_data_from_file(path) + + def update_data_from_file(self, path): + try: + with open(path) as file: + value = file.read().strip() + self.data_value = value + print(f"Data value variable set to: {self.data_value}") + except Exception as e: + print(f"Error reading file: {e}") + + +# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly. +signal.signal(signal.SIGINT, signal.default_int_handler) +try: + signal.signal(signal.SIGTERM, signal.SIG_DFL) +except AttributeError: + pass + + +async def start_sideinput(): + """Start the SideInput retriever server.""" + server = sideinputer.SideInputAsyncServer() + side_input = ExampleSideInput() + + loop = asyncio.get_running_loop() + loop.add_signal_handler(signal.SIGINT, lambda: server.stop()) + loop.add_signal_handler(signal.SIGTERM, lambda: server.stop()) + + try: + await server.start(side_input) + print("SideInput server shutting down gracefully...") + except asyncio.CancelledError: + server.stop() + + +async def start_mapper(): + """Start the Mapper server that reads from side inputs.""" + server = mapper.MapAsyncServer() + handler = SideInputHandler() + + # Initialize the data value from the side input file + handler.init_data_value() + + # Start the file watcher in a background thread + watcher_thread = Thread(target=handler.file_watcher, daemon=True) + watcher_thread.start() + + loop = asyncio.get_running_loop() + loop.add_signal_handler(signal.SIGINT, lambda: server.stop()) + loop.add_signal_handler(signal.SIGTERM, lambda: server.stop()) + + try: + await server.start(handler) + print("Mapper server shutting down gracefully...") + except asyncio.CancelledError: + server.stop() + + +if __name__ == "__main__": + # Check if we should run as a mapper or side input retriever + is_mapper = os.environ.get("MAPPER", "").lower() == "true" + + if is_mapper: + print("Starting as Mapper (reading side inputs)...") + asyncio.run(start_mapper()) + else: + print("Starting as SideInput retriever...") + asyncio.run(start_sideinput()) +