Skip to content

Commit 2f0d4c4

Browse files
authored
fix: async source init (#120)
Signed-off-by: Sidhant Kohli <sidhant.kohli@gmail.com>
1 parent b24f87d commit 2f0d4c4

File tree

9 files changed

+188
-5
lines changed

9 files changed

+188
-5
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

# Python/pip/poetry runtime settings shared by every stage derived from builder.
ENV PYTHONFAULTHANDLER=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONHASHSEED=random \
    PIP_NO_CACHE_DIR=on \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100 \
    POETRY_VERSION=1.2.2 \
    POETRY_HOME="/opt/poetry" \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_NO_INTERACTION=1 \
    PYSETUP_PATH="/opt/pysetup" \
    VENV_PATH="/opt/pysetup/.venv"

# Put the poetry binary and the project virtualenv ahead of the system PATH.
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        wget \
        # deps for building python deps
        build-essential \
    && apt-get install -y git \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    \
    # install dumb-init
    # The release asset is chosen from the build machine architecture (x86_64 or aarch64)
    # so that multi-platform builds (linux/amd64, linux/arm64) each get a runnable binary.
    && wget -O /dumb-init "https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_$(uname -m)" \
    && chmod +x /dumb-init \
    && curl -sSL https://install.python-poetry.org | python3 -

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

# Install the dependencies first so this layer is reused when only the source code changes.
WORKDIR $PYSETUP_PATH
COPY pyproject.toml ./
RUN poetry install --no-cache --no-root && \
    rm -rf ~/.cache/pypoetry/

# COPY is used instead of ADD: only a plain copy of the build context is needed.
COPY . /app
WORKDIR /app

RUN chmod +x entry.sh

# dumb-init runs as PID 1 and launches the entry script.
ENTRYPOINT ["/dumb-init", "--"]
CMD ["/app/entry.sh"]

EXPOSE 5000
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.PHONY: image
2+
image:
3+
docker build -t "quay.io/numaio/numaflow-python/async-source:v0.5.4" .
4+
# GitHub CI runner uses platform linux/amd64. If your local environment doesn't, the image built by the command above might not work
5+
# under the CI E2E test environment.
6+
# To build an image that supports multiple platforms(linux/amd64,linux/arm64) and push to quay.io, use the following command
7+
# docker buildx build -t "quay.io/numaio/numaflow-python/async-source:v0.5.4" --platform linux/amd64,linux/arm64 . --push
8+
# If the command fails, refer to https://billglover.me/notes/build-multi-arch-docker-images/ to fix it.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Example Python User Defined Source
2+
A simple example of an asynchronous user-defined source. The source generates sequentially numbered messages, keeps track of the offsets that are not yet acknowledged, and implements the Read,
3+
Ack, and Pending methods.
4+
The Read(x) method returns the next x messages; it returns nothing while previously read messages are still unacknowledged.
5+
The Ack() method acknowledges the last batch of messages returned by Read().
6+
The Pending() method returns 0 to indicate that the simple source always has 0 pending messages.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
# Container entry point for the example source.
# -e: abort on the first failing command, -u: treat unset variables as errors,
# -x: echo each command before running it.
set -eux

# exec replaces this shell with the Python process, so the interpreter receives
# signals directly and its exit status becomes the status of the entry point.
exec python example.py
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from datetime import datetime
2+
from collections.abc import AsyncIterable
3+
4+
import aiorun
5+
6+
from pynumaflow.sourcer import (
7+
ReadRequest,
8+
Message,
9+
AckRequest,
10+
PendingResponse,
11+
Offset,
12+
AsyncSourcer,
13+
)
14+
15+
16+
class AsyncSource:
    """
    AsyncSource is a class for User Defined Source implementation.

    It emits sequentially numbered messages and keeps track of the offsets that
    have been read but not yet acknowledged.
    """

    def __init__(self):
        """
        to_ack_set: Set to maintain a track of the offsets yet to be acknowledged
        read_idx : the offset idx of the next message to be read
        """
        self.to_ack_set = set()
        self.read_idx = 0

    async def read_handler(self, datum: ReadRequest) -> AsyncIterable[Message]:
        """
        read_handler is used to read the data from the source and send the data forward.

        For each read request, up to datum.num_records messages are yielded. After each
        message is yielded, its offset is added to the ack set and read_idx is incremented.
        Nothing is yielded while offsets from a previous read are still unacknowledged.
        """
        if self.to_ack_set:
            return

        for _ in range(datum.num_records):
            current = str(self.read_idx)
            yield Message(
                payload=current.encode(),
                offset=Offset(offset=current.encode(), partition_id="0"),
                event_time=datetime.now(),
            )
            self.to_ack_set.add(current)
            self.read_idx += 1

    async def ack_handler(self, ack_request: AckRequest):
        """
        The ack handler is used to acknowledge the offsets that have been read, and
        remove them from the to_ack_set.

        Offsets that are not (or no longer) tracked are ignored, so acknowledging
        the same offset twice does not raise.
        """
        for offset in ack_request.offset:
            # discard instead of remove: an unknown or repeated offset must not
            # fail the whole acknowledgement with a KeyError.
            self.to_ack_set.discard(str(offset.offset, "utf-8"))

    async def pending_handler(self) -> PendingResponse:
        """
        The simple source always returns zero to indicate there is no pending record.
        """
        return PendingResponse(count=0)
60+
61+
62+
if __name__ == "__main__":
    # Build the example source, hand its three handlers to the gRPC server,
    # and let aiorun drive the server coroutine.
    ud_source = AsyncSource()
    handlers = {
        "read_handler": ud_source.read_handler,
        "ack_handler": ud_source.ack_handler,
        "pending_handler": ud_source.pending_handler,
    }
    grpc_server = AsyncSourcer(**handlers)
    aiorun.run(grpc_server.start())
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: async-source
5+
spec:
6+
vertices:
7+
- name: in
8+
source:
9+
udsource:
10+
container:
11+
# A simple user-defined async source
12+
image: quay.io/numaio/numaflow-python/async-source:v0.5.4
13+
imagePullPolicy: Always
14+
limits:
15+
readBatchSize: 2
16+
- name: out
17+
sink:
18+
log: {}
19+
edges:
20+
- from: in
21+
to: out
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[tool.poetry]
2+
name = "simple-source"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
7+
[tool.poetry.dependencies]
8+
python = "~3.10"
9+
pynumaflow = "~0.5.4"
10+
aiorun = "^2023.7"
11+
12+
13+
[tool.poetry.dev-dependencies]
14+
15+
[build-system]
16+
requires = ["poetry-core>=1.0.0"]
17+
build-backend = "poetry.core.masonry.api"

pynumaflow/sourcer/async_server.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,20 @@ async def AckFn(
137137
for offset in request.request.offsets:
138138
offsets.append(Offset(offset.offset, offset.partition_id))
139139
try:
140-
await self.__invoke_ack(ack_req=request)
140+
await self.__invoke_ack(ack_req=offsets)
141141
except Exception as e:
142142
context.set_code(grpc.StatusCode.UNKNOWN)
143143
context.set_details(str(e))
144144
raise e
145145

146146
return source_pb2.AckResponse()
147147

148-
async def __invoke_ack(self, ack_req: AckRequest):
148+
async def __invoke_ack(self, ack_req: list[Offset]):
149149
"""
150150
Invokes the Source Ack Function.
151151
"""
152152
try:
153-
await self.__source_ack_handler(ack_req)
153+
await self.__source_ack_handler(AckRequest(offsets=ack_req))
154154
except Exception as err:
155155
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
156156
raise err
@@ -182,7 +182,11 @@ async def PendingFn(
182182

183183
async def __serve_async(self, server) -> None:
184184
source_pb2_grpc.add_SourceServicer_to_server(
185-
AsyncSourcer(read_handler=self.__source_read_handler),
185+
AsyncSourcer(
186+
read_handler=self.__source_read_handler,
187+
ack_handler=self.__source_ack_handler,
188+
pending_handler=self.__source_pending_handler,
189+
),
186190
server,
187191
)
188192
server.add_insecure_port(self.sock_path)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "pynumaflow"
3-
version = "0.5.3"
3+
version = "0.5.4"
44
description = "Provides the interfaces of writing Python User Defined Functions and Sinks for NumaFlow."
55
authors = ["NumaFlow Developers"]
66
readme = "README.md"

0 commit comments

Comments
 (0)