Skip to content

Commit b0c7e7b

Browse files
authored
chore: manifests for pynumaflow_lite source (#285)
1 parent c9dccff commit b0c7e7b

File tree

5 files changed

+226
-0
lines changed

5 files changed

+226
-0
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
FROM python:3.11-slim-bullseye AS builder
2+
3+
ENV PYTHONFAULTHANDLER=1 \
4+
PYTHONUNBUFFERED=1 \
5+
PYTHONHASHSEED=random \
6+
PIP_NO_CACHE_DIR=on \
7+
PIP_DISABLE_PIP_VERSION_CHECK=on \
8+
PIP_DEFAULT_TIMEOUT=100 \
9+
POETRY_HOME="/opt/poetry" \
10+
POETRY_VIRTUALENVS_IN_PROJECT=true \
11+
POETRY_NO_INTERACTION=1 \
12+
PYSETUP_PATH="/opt/pysetup"
13+
14+
ENV PATH="$POETRY_HOME/bin:$PATH"
15+
16+
RUN apt-get update \
17+
&& apt-get install --no-install-recommends -y \
18+
curl \
19+
wget \
20+
# deps for building python deps
21+
build-essential \
22+
&& apt-get install -y git \
23+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
24+
&& curl -sSL https://install.python-poetry.org | python3 -
25+
26+
FROM builder AS udf
27+
28+
WORKDIR $PYSETUP_PATH
29+
COPY ./ ./
30+
31+
RUN pip install $PYSETUP_PATH/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
32+
33+
RUN poetry lock
34+
RUN poetry install --no-cache --no-root && \
35+
rm -rf ~/.cache/pypoetry/
36+
37+
CMD ["python", "simple_source.py"]
38+
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
To create the `wheel` file, refer [root](../../README.md)
2+
3+
## HOWTO build Image
4+
5+
```bash
6+
docker build . -t quay.io/numaio/numaflow/pynumaflow-lite-simple-source:v1 --load
7+
```
8+
9+
### `k3d`
10+
11+
Load it now to `k3d`
12+
13+
```bash
14+
k3d image import quay.io/numaio/numaflow/pynumaflow-lite-simple-source:v1
15+
```
16+
17+
### Minikube
18+
19+
```bash
20+
minikube image load quay.io/numaio/numaflow/pynumaflow-lite-simple-source:v1
21+
```
22+
23+
## Run the pipeline
24+
25+
```bash
26+
kubectl apply -f pipeline.yaml
27+
```
28+
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: simple-source
5+
spec:
6+
vertices:
7+
- name: in
8+
source:
9+
udsource:
10+
container:
11+
image: quay.io/numaio/numaflow/pynumaflow-lite-simple-source:v1
12+
imagePullPolicy: Never
13+
limits:
14+
readBatchSize: 5
15+
- name: out
16+
sink:
17+
log: {}
18+
edges:
19+
- from: in
20+
to: out
21+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[project]
2+
name = "simple-source"
3+
version = "0.1.0"
4+
description = "User-defined source example using pynumaflow-lite"
5+
authors = [
6+
{ name = "Vigith Maurice", email = "[email protected]" }
7+
]
8+
readme = "README.md"
9+
requires-python = ">=3.11"
10+
dependencies = [
11+
]
12+
13+
14+
[build-system]
15+
requires = ["poetry-core>=2.0.0,<3.0.0"]
16+
build-backend = "poetry.core.masonry.api"
17+
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import asyncio
2+
import logging
3+
import signal
4+
from datetime import datetime, timezone
5+
from collections.abc import AsyncIterator
6+
7+
from pynumaflow_lite import sourcer
8+
from pynumaflow_lite._source_dtypes import Sourcer
9+
10+
# Configure logging
11+
logging.basicConfig(level=logging.INFO)
12+
_LOGGER = logging.getLogger(__name__)
13+
14+
15+
class SimpleSource(Sourcer):
16+
"""
17+
Simple source that generates messages with incrementing numbers.
18+
This is a class-based user-defined source implementation.
19+
"""
20+
21+
def __init__(self):
22+
self.counter = 0
23+
self.partition_idx = 0
24+
25+
async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[sourcer.Message]:
26+
"""
27+
The simple source generates messages with incrementing numbers.
28+
"""
29+
_LOGGER.info(f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}")
30+
31+
# Generate the requested number of messages
32+
for i in range(datum.num_records):
33+
# Create message payload
34+
payload = f"message-{self.counter}".encode("utf-8")
35+
36+
# Create offset
37+
offset = sourcer.Offset(
38+
offset=str(self.counter).encode("utf-8"),
39+
partition_id=self.partition_idx
40+
)
41+
42+
# Create message
43+
message = sourcer.Message(
44+
payload=payload,
45+
offset=offset,
46+
event_time=datetime.now(timezone.utc),
47+
keys=["key1"],
48+
headers={"source": "simple"}
49+
)
50+
51+
_LOGGER.info(f"Generated message: {self.counter}")
52+
self.counter += 1
53+
54+
yield message
55+
56+
# Small delay to simulate real source
57+
await asyncio.sleep(0.1)
58+
59+
async def ack_handler(self, request: sourcer.AckRequest) -> None:
60+
"""
61+
The simple source acknowledges the offsets.
62+
"""
63+
_LOGGER.info(f"Acknowledging {len(request.offsets)} offsets")
64+
for offset in request.offsets:
65+
_LOGGER.debug(f"Acked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}")
66+
67+
async def nack_handler(self, request: sourcer.NackRequest) -> None:
68+
"""
69+
The simple source negatively acknowledges the offsets.
70+
"""
71+
_LOGGER.info(f"Negatively acknowledging {len(request.offsets)} offsets")
72+
for offset in request.offsets:
73+
_LOGGER.warning(f"Nacked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}")
74+
75+
async def pending_handler(self) -> sourcer.PendingResponse:
76+
"""
77+
The simple source always returns zero to indicate there is no pending record.
78+
"""
79+
return sourcer.PendingResponse(count=0)
80+
81+
async def partitions_handler(self) -> sourcer.PartitionsResponse:
82+
"""
83+
The simple source always returns default partitions.
84+
"""
85+
return sourcer.PartitionsResponse(partitions=[self.partition_idx])
86+
87+
88+
# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
89+
signal.signal(signal.SIGINT, signal.default_int_handler)
90+
try:
91+
signal.signal(signal.SIGTERM, signal.SIG_DFL)
92+
except AttributeError:
93+
pass
94+
95+
96+
async def start():
97+
server = sourcer.SourceAsyncServer()
98+
99+
# Create an instance of the source handler
100+
handler = SimpleSource()
101+
102+
# Register loop-level signal handlers to request graceful shutdown
103+
loop = asyncio.get_running_loop()
104+
try:
105+
loop.add_signal_handler(signal.SIGINT, lambda: server.stop())
106+
loop.add_signal_handler(signal.SIGTERM, lambda: server.stop())
107+
except (NotImplementedError, RuntimeError):
108+
pass
109+
110+
try:
111+
await server.start(handler)
112+
print("Shutting down gracefully...")
113+
except asyncio.CancelledError:
114+
try:
115+
server.stop()
116+
except Exception:
117+
pass
118+
return
119+
120+
121+
if __name__ == "__main__":
122+
asyncio.run(start())

0 commit comments

Comments
 (0)