Skip to content

Commit 2ebe47f

Browse files
vaibhavtiwari33vtiwari5
andauthored
feat: Support for on_success sink in pynumaflow (#302)
Signed-off-by: vtiwari5 <[email protected]> Signed-off-by: Vaibhav Tiwari <[email protected]> Co-authored-by: vtiwari5 <[email protected]>
1 parent 5500636 commit 2ebe47f

File tree

17 files changed

+520
-27
lines changed

17 files changed

+520
-27
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
####################################################################################################
2+
# Stage 1: Base Builder - installs core dependencies using poetry
3+
####################################################################################################
4+
FROM python:3.10-slim-bullseye AS base-builder
5+
6+
ENV PYSETUP_PATH="/opt/pysetup"
7+
WORKDIR $PYSETUP_PATH
8+
9+
# Copy only core dependency files first for better caching
10+
COPY pyproject.toml poetry.lock README.md ./
11+
COPY pynumaflow/ ./pynumaflow/
12+
RUN apt-get update && apt-get install --no-install-recommends -y \
13+
curl wget build-essential git \
14+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
15+
&& pip install poetry \
16+
&& poetry install --no-root --no-interaction
17+
18+
####################################################################################################
19+
# Stage 2: UDF Builder - adds UDF code and installs UDF-specific deps
20+
####################################################################################################
21+
FROM base-builder AS udf-builder
22+
23+
ENV EXAMPLE_PATH="/opt/pysetup/examples/sink/all_sinks"
24+
ENV POETRY_VIRTUALENVS_IN_PROJECT=true
25+
26+
WORKDIR $EXAMPLE_PATH
27+
COPY examples/sink/all_sinks/ ./
28+
RUN poetry install --no-root --no-interaction
29+
30+
####################################################################################################
31+
# Stage 3: UDF Runtime - clean container with only needed stuff
32+
####################################################################################################
33+
FROM python:3.10-slim-bullseye AS udf
34+
35+
ENV PYSETUP_PATH="/opt/pysetup"
36+
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/sink/all_sinks"
37+
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
38+
ENV PATH="$VENV_PATH/bin:$PATH"
39+
40+
RUN apt-get update && apt-get install --no-install-recommends -y wget \
41+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
42+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
43+
&& chmod +x /dumb-init
44+
45+
WORKDIR $PYSETUP_PATH
46+
COPY --from=udf-builder $VENV_PATH $VENV_PATH
47+
COPY --from=udf-builder $EXAMPLE_PATH $EXAMPLE_PATH
48+
49+
WORKDIR $EXAMPLE_PATH
50+
51+
ENTRYPOINT ["/dumb-init", "--"]
52+
CMD ["python", "$EXAMPLE_PATH/example.py"]
53+
54+
EXPOSE 5000
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/all-sinks:${TAG}
4+
DOCKER_FILE_PATH = examples/sink/all_sinks/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
poetry update -vv
9+
10+
.PHONY: image-push
11+
image-push: update
12+
cd ../../../ && docker buildx build \
13+
-f ${DOCKER_FILE_PATH} \
14+
-t ${IMAGE_REGISTRY} \
15+
--platform linux/amd64,linux/arm64 . --push
16+
17+
.PHONY: image
18+
image: update
19+
cd ../../../ && docker build \
20+
-f ${DOCKER_FILE_PATH} \
21+
-t ${IMAGE_REGISTRY} .
22+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
from collections.abc import AsyncIterable
2+
from pynumaflow.sinker import Datum, Responses, Response, Sinker, Message
3+
from pynumaflow.sinker import SinkAsyncServer
4+
import logging
5+
import random
6+
7+
logging.basicConfig(level=logging.DEBUG)
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class UserDefinedSink(Sinker):
12+
async def handler(self, datums: AsyncIterable[Datum]) -> Responses:
13+
responses = Responses()
14+
async for msg in datums:
15+
if primary_sink_write_status():
16+
logger.info(
17+
"Write to User Defined Sink succeeded, writing %s to onSuccess sink",
18+
msg.value.decode("utf-8"),
19+
)
20+
# create a message to be sent to onSuccess sink
21+
on_success_message = Response.as_on_success(
22+
msg.id,
23+
Message(msg.value, ["on_success"], msg.user_metadata),
24+
)
25+
responses.append(on_success_message)
26+
# Sending `None`, on the other hand, specifies that simply send
27+
# the original message to the onSuccess sink
28+
# `responses.append(Response.as_on_success(msg.id, None))`
29+
else:
30+
logger.info(
31+
"Write to User Defined Sink failed, writing %s to fallback sink",
32+
msg.value.decode("utf-8"),
33+
)
34+
responses.append(Response.as_fallback(msg.id))
35+
return responses
36+
37+
38+
async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses:
39+
responses = Responses()
40+
async for msg in datums:
41+
if primary_sink_write_status():
42+
logger.info(
43+
"Write to User Defined Sink succeeded, writing %s to onSuccess sink",
44+
msg.value.decode("utf-8"),
45+
)
46+
# create a message to be sent to onSuccess sink
47+
on_success_message = Response.as_on_success(
48+
msg.id,
49+
Message(msg.value).with_keys(["on_success"]).with_user_metadata(msg.user_metadata),
50+
)
51+
responses.append(on_success_message)
52+
# Sending `None`, on the other hand, specifies that simply send
53+
# the original message to the onSuccess sink
54+
# `responses.append(Response.as_on_success(msg.id, None))`
55+
else:
56+
logger.info(
57+
"Write to User Defined Sink failed, writing %s to fallback sink",
58+
msg.value.decode("utf-8"),
59+
)
60+
responses.append(Response.as_fallback(msg.id))
61+
return responses
62+
63+
64+
def primary_sink_write_status():
65+
# simulate writing to primary sink and return status of the same
66+
# return True if writing to primary sink succeeded
67+
# return False if writing to primary sink failed
68+
return random.randint(0, 1) == 1
69+
70+
71+
if __name__ == "__main__":
72+
sink_handler = UserDefinedSink()
73+
grpc_server = SinkAsyncServer(sink_handler)
74+
grpc_server.start()
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: all-sinks-pipeline
5+
spec:
6+
vertices:
7+
- name: in
8+
source:
9+
generator:
10+
rpu: 1
11+
duration: 1s
12+
msgSize: 10
13+
- name: out
14+
sink:
15+
udsink:
16+
container:
17+
args:
18+
- python
19+
- example.py
20+
image: quay.io/numaio/numaflow-python/all-sinks:stable
21+
imagePullPolicy: IfNotPresent
22+
env:
23+
- name: PYTHONDEBUG
24+
value: "true"
25+
- name: INVOKE
26+
value: "func_handler"
27+
fallback:
28+
udsink:
29+
container:
30+
image: quay.io/numaio/numaflow-python/sink-log:stable
31+
imagePullPolicy: IfNotPresent
32+
onSuccess:
33+
udsink:
34+
container:
35+
image: quay.io/numaio/numaflow-rs/sink-log:stable
36+
imagePullPolicy: IfNotPresent
37+
- name: log-output
38+
sink:
39+
log: {}
40+
edges:
41+
- from: in
42+
to: out
43+
- from: in
44+
to: log-output
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[tool.poetry]
2+
name = "example-sink"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
7+
[tool.poetry.dependencies]
8+
python = ">=3.10,<3.13"
9+
pynumaflow = { path = "../../../"}
10+
11+
[tool.poetry.dev-dependencies]
12+
13+
[build-system]
14+
requires = ["poetry-core>=1.0.0"]
15+
build-backend = "poetry.core.masonry.api"

packages/pynumaflow/pynumaflow/_constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
SOURCE_SOCK_PATH = "/var/run/numaflow/source.sock"
2626
MULTIPROC_MAP_SOCK_ADDR = "/var/run/numaflow/multiproc"
2727
FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock"
28+
ON_SUCCESS_SINK_SOCK_PATH = "/var/run/numaflow/ons-sink.sock"
2829
BATCH_MAP_SOCK_PATH = "/var/run/numaflow/batchmap.sock"
2930
ACCUMULATOR_SOCK_PATH = "/var/run/numaflow/accumulator.sock"
3031

@@ -37,10 +38,12 @@
3738
SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info"
3839
SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info"
3940
FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"
41+
ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/ons-sinker-server-info"
4042
ACCUMULATOR_SERVER_INFO_FILE_PATH = "/var/run/numaflow/accumulator-server-info"
4143

4244
ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
4345
UD_CONTAINER_FALLBACK_SINK = "fb-udsink"
46+
UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink"
4447

4548
# TODO: need to make sure the DATUM_KEY value is the same as
4649
# https://github.com/numaproj/numaflow-go/blob/main/pkg/function/configs.go#L6

packages/pynumaflow/pynumaflow/proto/sinker/sink.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,28 @@ enum Status {
6767
FAILURE = 1;
6868
FALLBACK = 2;
6969
SERVE = 3;
70+
ON_SUCCESS = 4;
7071
}
7172

7273
/**
7374
* SinkResponse is the individual response of each message written to the sink.
7475
*/
7576
message SinkResponse {
7677
message Result {
78+
message Message {
79+
bytes value = 1;
80+
repeated string keys = 2;
81+
common.Metadata metadata = 3;
82+
}
7783
// id is the ID of the message, can be used to uniquely identify the message.
7884
string id = 1;
7985
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK.
8086
Status status = 2;
8187
// err_msg is the error message, set it if success is set to false.
8288
string err_msg = 3;
8389
optional bytes serve_response = 4;
90+
// on_success_msg is the message to be sent to on_success sink.
91+
optional Message on_success_msg = 5;
8492
}
8593
repeated Result results = 1;
8694
optional Handshake handshake = 2;

packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py

Lines changed: 10 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ class Status(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
1818
FAILURE: _ClassVar[Status]
1919
FALLBACK: _ClassVar[Status]
2020
SERVE: _ClassVar[Status]
21+
ON_SUCCESS: _ClassVar[Status]
2122
SUCCESS: Status
2223
FAILURE: Status
2324
FALLBACK: Status
2425
SERVE: Status
26+
ON_SUCCESS: Status
2527

2628
class SinkRequest(_message.Message):
2729
__slots__ = ("request", "status", "handshake")
@@ -78,16 +80,27 @@ class TransmissionStatus(_message.Message):
7880
class SinkResponse(_message.Message):
7981
__slots__ = ("results", "handshake", "status")
8082
class Result(_message.Message):
81-
__slots__ = ("id", "status", "err_msg", "serve_response")
83+
__slots__ = ("id", "status", "err_msg", "serve_response", "on_success_msg")
84+
class Message(_message.Message):
85+
__slots__ = ("value", "keys", "metadata")
86+
VALUE_FIELD_NUMBER: _ClassVar[int]
87+
KEYS_FIELD_NUMBER: _ClassVar[int]
88+
METADATA_FIELD_NUMBER: _ClassVar[int]
89+
value: bytes
90+
keys: _containers.RepeatedScalarFieldContainer[str]
91+
metadata: _metadata_pb2.Metadata
92+
def __init__(self, value: _Optional[bytes] = ..., keys: _Optional[_Iterable[str]] = ..., metadata: _Optional[_Union[_metadata_pb2.Metadata, _Mapping]] = ...) -> None: ...
8293
ID_FIELD_NUMBER: _ClassVar[int]
8394
STATUS_FIELD_NUMBER: _ClassVar[int]
8495
ERR_MSG_FIELD_NUMBER: _ClassVar[int]
8596
SERVE_RESPONSE_FIELD_NUMBER: _ClassVar[int]
97+
ON_SUCCESS_MSG_FIELD_NUMBER: _ClassVar[int]
8698
id: str
8799
status: Status
88100
err_msg: str
89101
serve_response: bytes
90-
def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ...) -> None: ...
102+
on_success_msg: SinkResponse.Result.Message
103+
def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ..., on_success_msg: _Optional[_Union[SinkResponse.Result.Message, _Mapping]] = ...) -> None: ...
91104
RESULTS_FIELD_NUMBER: _ClassVar[int]
92105
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
93106
STATUS_FIELD_NUMBER: _ClassVar[int]

packages/pynumaflow/pynumaflow/sinker/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
from pynumaflow.sinker.server import SinkServer
44

55
from pynumaflow._metadata import UserMetadata, SystemMetadata
6-
from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker
6+
from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker, Message
77

88
__all__ = [
9+
"Message",
910
"Response",
1011
"Responses",
1112
"Datum",

0 commit comments

Comments
 (0)