diff --git a/packages/pynumaflow/examples/sink/all_sinks/Dockerfile b/packages/pynumaflow/examples/sink/all_sinks/Dockerfile new file mode 100644 index 00000000..c0126132 --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/Dockerfile @@ -0,0 +1,54 @@ +#################################################################################################### +# Stage 1: Base Builder - installs core dependencies using poetry +#################################################################################################### +FROM python:3.10-slim-bullseye AS base-builder + +ENV PYSETUP_PATH="/opt/pysetup" +WORKDIR $PYSETUP_PATH + +# Copy only core dependency files first for better caching +COPY pyproject.toml poetry.lock README.md ./ +COPY pynumaflow/ ./pynumaflow/ +RUN apt-get update && apt-get install --no-install-recommends -y \ + curl wget build-essential git \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + && pip install poetry \ + && poetry install --no-root --no-interaction + +#################################################################################################### +# Stage 2: UDF Builder - adds UDF code and installs UDF-specific deps +#################################################################################################### +FROM base-builder AS udf-builder + +ENV EXAMPLE_PATH="/opt/pysetup/examples/sink/all_sinks" +ENV POETRY_VIRTUALENVS_IN_PROJECT=true + +WORKDIR $EXAMPLE_PATH +COPY examples/sink/all_sinks/ ./ +RUN poetry install --no-root --no-interaction + +#################################################################################################### +# Stage 3: UDF Runtime - clean container with only needed stuff +#################################################################################################### +FROM python:3.10-slim-bullseye AS udf + +ENV PYSETUP_PATH="/opt/pysetup" +ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/sink/all_sinks" +ENV VENV_PATH="$EXAMPLE_PATH/.venv" +ENV PATH="$VENV_PATH/bin:$PATH" + 
+RUN apt-get update && apt-get install --no-install-recommends -y wget \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \ + && chmod +x /dumb-init + +WORKDIR $PYSETUP_PATH +COPY --from=udf-builder $VENV_PATH $VENV_PATH +COPY --from=udf-builder $EXAMPLE_PATH $EXAMPLE_PATH + +WORKDIR $EXAMPLE_PATH + +ENTRYPOINT ["/dumb-init", "--"] +CMD ["python", "example.py"] + +EXPOSE 5000 diff --git a/packages/pynumaflow/examples/sink/all_sinks/Makefile b/packages/pynumaflow/examples/sink/all_sinks/Makefile new file mode 100644 index 00000000..fdd8e89d --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/Makefile @@ -0,0 +1,22 @@ +TAG ?= stable +PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-python/all-sinks:${TAG} +DOCKER_FILE_PATH = examples/sink/all_sinks/Dockerfile + +.PHONY: update +update: + poetry update -vv + +.PHONY: image-push +image-push: update + cd ../../../ && docker buildx build \ + -f ${DOCKER_FILE_PATH} \ + -t ${IMAGE_REGISTRY} \ + --platform linux/amd64,linux/arm64 . --push + +.PHONY: image +image: update + cd ../../../ && docker build \ + -f ${DOCKER_FILE_PATH} \ + -t ${IMAGE_REGISTRY} . 
+ @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi diff --git a/packages/pynumaflow/examples/sink/all_sinks/example.py b/packages/pynumaflow/examples/sink/all_sinks/example.py new file mode 100644 index 00000000..58a18601 --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/example.py @@ -0,0 +1,74 @@ +from collections.abc import AsyncIterable +from pynumaflow.sinker import Datum, Responses, Response, Sinker, Message +from pynumaflow.sinker import SinkAsyncServer +import logging +import random + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +class UserDefinedSink(Sinker): + async def handler(self, datums: AsyncIterable[Datum]) -> Responses: + responses = Responses() + async for msg in datums: + if primary_sink_write_status(): + logger.info( + "Write to User Defined Sink succeeded, writing %s to onSuccess sink", + msg.value.decode("utf-8"), + ) + # create a message to be sent to onSuccess sink + on_success_message = Response.as_on_success( + msg.id, + Message(msg.value, ["on_success"], msg.user_metadata), + ) + responses.append(on_success_message) + # Passing `None` instead of a Message means the original message is + # forwarded as-is to the onSuccess sink, e.g. + # `responses.append(Response.as_on_success(msg.id, None))` + else: + logger.info( + "Write to User Defined Sink failed, writing %s to fallback sink", + msg.value.decode("utf-8"), + ) + responses.append(Response.as_fallback(msg.id)) + return responses + + +async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses: + responses = Responses() + async for msg in datums: + if primary_sink_write_status(): + logger.info( + "Write to User Defined Sink succeeded, writing %s to onSuccess sink", + msg.value.decode("utf-8"), + ) + # create a message to be sent to onSuccess sink + on_success_message = Response.as_on_success( + msg.id, + Message(msg.value).with_keys(["on_success"]).with_user_metadata(msg.user_metadata), + ) + 
responses.append(on_success_message) + # Passing `None` instead of a Message means the original message is + # forwarded as-is to the onSuccess sink, e.g. + # `responses.append(Response.as_on_success(msg.id, None))` + else: + logger.info( + "Write to User Defined Sink failed, writing %s to fallback sink", + msg.value.decode("utf-8"), + ) + responses.append(Response.as_fallback(msg.id)) + return responses + + +def primary_sink_write_status(): + # simulate writing to primary sink and return status of the same + # return True if writing to primary sink succeeded + # return False if writing to primary sink failed + return random.randint(0, 1) == 1 + + +if __name__ == "__main__": + sink_handler = UserDefinedSink() + grpc_server = SinkAsyncServer(sink_handler) + grpc_server.start() diff --git a/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml b/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml new file mode 100644 index 00000000..53d256c7 --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml @@ -0,0 +1,44 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: all-sinks-pipeline +spec: + vertices: + - name: in + source: + generator: + rpu: 1 + duration: 1s + msgSize: 10 + - name: out + sink: + udsink: + container: + args: + - python + - example.py + image: quay.io/numaio/numaflow-python/all-sinks:stable + imagePullPolicy: IfNotPresent + env: + - name: PYTHONDEBUG + value: "true" + - name: INVOKE + value: "func_handler" + fallback: + udsink: + container: + image: quay.io/numaio/numaflow-python/sink-log:stable + imagePullPolicy: IfNotPresent + onSuccess: + udsink: + container: + image: quay.io/numaio/numaflow-rs/sink-log:stable + imagePullPolicy: IfNotPresent + - name: log-output + sink: + log: {} + edges: + - from: in + to: out + - from: in + to: log-output diff --git a/packages/pynumaflow/examples/sink/all_sinks/pyproject.toml b/packages/pynumaflow/examples/sink/all_sinks/pyproject.toml new file mode 100644 
index 00000000..2ff3e97a --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/pyproject.toml @@ -0,0 +1,15 @@ +[tool.poetry] +name = "example-sink" +version = "0.2.4" +description = "" +authors = ["Numaflow developers"] + +[tool.poetry.dependencies] +python = ">=3.10,<3.13" +pynumaflow = { path = "../../../"} + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/packages/pynumaflow/pynumaflow/_constants.py b/packages/pynumaflow/pynumaflow/_constants.py index 01ae44d5..16c74134 100644 --- a/packages/pynumaflow/pynumaflow/_constants.py +++ b/packages/pynumaflow/pynumaflow/_constants.py @@ -25,6 +25,7 @@ SOURCE_SOCK_PATH = "/var/run/numaflow/source.sock" MULTIPROC_MAP_SOCK_ADDR = "/var/run/numaflow/multiproc" FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock" +ON_SUCCESS_SINK_SOCK_PATH = "/var/run/numaflow/ons-sink.sock" BATCH_MAP_SOCK_PATH = "/var/run/numaflow/batchmap.sock" ACCUMULATOR_SOCK_PATH = "/var/run/numaflow/accumulator.sock" @@ -37,10 +38,12 @@ SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info" SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info" FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info" +ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/ons-sinker-server-info" ACCUMULATOR_SERVER_INFO_FILE_PATH = "/var/run/numaflow/accumulator-server-info" ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE" UD_CONTAINER_FALLBACK_SINK = "fb-udsink" +UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink" # TODO: need to make sure the DATUM_KEY value is the same as # https://github.com/numaproj/numaflow-go/blob/main/pkg/function/configs.go#L6 diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto b/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto index a94afd31..fdba28d2 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto +++ 
b/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto @@ -67,6 +67,7 @@ enum Status { FAILURE = 1; FALLBACK = 2; SERVE = 3; + ON_SUCCESS = 4; } /** @@ -74,6 +75,11 @@ enum Status { */ message SinkResponse { message Result { + message Message { + bytes value = 1; + repeated string keys = 2; + common.Metadata metadata = 3; + } // id is the ID of the message, can be used to uniquely identify the message. string id = 1; // status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK. @@ -81,6 +87,8 @@ message SinkResponse { // err_msg is the error message, set it if success is set to false. string err_msg = 3; optional bytes serve_response = 4; + // on_success_msg is the message to be sent to on_success sink. + optional Message on_success_msg = 5; } repeated Result results = 1; optional Handshake handshake = 2; diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py index 92b0e505..651b2784 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py +++ b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py @@ -27,7 +27,7 @@ from pynumaflow.proto.common import metadata_pb2 as pynumaflow_dot_proto_dot_common_dot_metadata__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"pynumaflow/proto/sinker/sink.proto\x12\x07sink.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&pynumaflow/proto/common/metadata.proto\"\xc7\x03\n\x0bSinkRequest\x12-\n\x07request\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkRequest.Request\x12+\n\x06status\x18\x02 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatus\x12*\n\thandshake\x18\x03 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x1a\xa1\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 
\x01(\t\x12:\n\x07headers\x18\x06 \x03(\x0b\x32).sink.v1.SinkRequest.Request.HeadersEntry\x12\"\n\x08metadata\x18\x07 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\"\xac\x02\n\x0cSinkResponse\x12-\n\x07results\x18\x01 \x03(\x0b\x32\x1c.sink.v1.SinkResponse.Result\x12*\n\thandshake\x18\x02 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x12\x30\n\x06status\x18\x03 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatusH\x01\x88\x01\x01\x1av\n\x06Result\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1f\n\x06status\x18\x02 \x01(\x0e\x32\x0f.sink.v1.Status\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\t\x12\x1b\n\x0eserve_response\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x42\x11\n\x0f_serve_responseB\x0c\n\n_handshakeB\t\n\x07_status*;\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x12\x0c\n\x08\x46\x41LLBACK\x10\x02\x12\t\n\x05SERVE\x10\x03\x32|\n\x04Sink\x12\x39\n\x06SinkFn\x12\x14.sink.v1.SinkRequest\x1a\x15.sink.v1.SinkResponse(\x01\x30\x01\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"pynumaflow/proto/sinker/sink.proto\x12\x07sink.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&pynumaflow/proto/common/metadata.proto\"\xc7\x03\n\x0bSinkRequest\x12-\n\x07request\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkRequest.Request\x12+\n\x06status\x18\x02 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatus\x12*\n\thandshake\x18\x03 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x1a\xa1\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 
\x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 \x01(\t\x12:\n\x07headers\x18\x06 \x03(\x0b\x32).sink.v1.SinkRequest.Request.HeadersEntry\x12\"\n\x08metadata\x18\x07 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\"\xcf\x03\n\x0cSinkResponse\x12-\n\x07results\x18\x01 \x03(\x0b\x32\x1c.sink.v1.SinkResponse.Result\x12*\n\thandshake\x18\x02 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x12\x30\n\x06status\x18\x03 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\x98\x02\n\x06Result\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1f\n\x06status\x18\x02 \x01(\x0e\x32\x0f.sink.v1.Status\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\t\x12\x1b\n\x0eserve_response\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x12\x41\n\x0eon_success_msg\x18\x05 \x01(\x0b\x32$.sink.v1.SinkResponse.Result.MessageH\x01\x88\x01\x01\x1aJ\n\x07Message\x12\r\n\x05value\x18\x01 \x01(\x0c\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12\"\n\x08metadata\x18\x03 \x01(\x0b\x32\x10.common.MetadataB\x11\n\x0f_serve_responseB\x11\n\x0f_on_success_msgB\x0c\n\n_handshakeB\t\n\x07_status*K\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x12\x0c\n\x08\x46\x41LLBACK\x10\x02\x12\t\n\x05SERVE\x10\x03\x12\x0e\n\nON_SUCCESS\x10\x04\x32|\n\x04Sink\x12\x39\n\x06SinkFn\x12\x14.sink.v1.SinkRequest\x1a\x15.sink.v1.SinkResponse(\x01\x30\x01\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -36,8 +36,8 @@ DESCRIPTOR._loaded_options = None _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._loaded_options = None 
_globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._serialized_options = b'8\001' - _globals['_STATUS']._serialized_start=1003 - _globals['_STATUS']._serialized_end=1062 + _globals['_STATUS']._serialized_start=1166 + _globals['_STATUS']._serialized_end=1241 _globals['_SINKREQUEST']._serialized_start=150 _globals['_SINKREQUEST']._serialized_end=605 _globals['_SINKREQUEST_REQUEST']._serialized_start=302 @@ -51,9 +51,11 @@ _globals['_TRANSMISSIONSTATUS']._serialized_start=665 _globals['_TRANSMISSIONSTATUS']._serialized_end=698 _globals['_SINKRESPONSE']._serialized_start=701 - _globals['_SINKRESPONSE']._serialized_end=1001 - _globals['_SINKRESPONSE_RESULT']._serialized_start=858 - _globals['_SINKRESPONSE_RESULT']._serialized_end=976 - _globals['_SINK']._serialized_start=1064 - _globals['_SINK']._serialized_end=1188 + _globals['_SINKRESPONSE']._serialized_end=1164 + _globals['_SINKRESPONSE_RESULT']._serialized_start=859 + _globals['_SINKRESPONSE_RESULT']._serialized_end=1139 + _globals['_SINKRESPONSE_RESULT_MESSAGE']._serialized_start=1027 + _globals['_SINKRESPONSE_RESULT_MESSAGE']._serialized_end=1101 + _globals['_SINK']._serialized_start=1243 + _globals['_SINK']._serialized_end=1367 # @@protoc_insertion_point(module_scope) diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi index 57a3728b..2c696e72 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi +++ b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi @@ -18,10 +18,12 @@ class Status(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): FAILURE: _ClassVar[Status] FALLBACK: _ClassVar[Status] SERVE: _ClassVar[Status] + ON_SUCCESS: _ClassVar[Status] SUCCESS: Status FAILURE: Status FALLBACK: Status SERVE: Status +ON_SUCCESS: Status class SinkRequest(_message.Message): __slots__ = ("request", "status", "handshake") @@ -78,16 +80,27 @@ class TransmissionStatus(_message.Message): class SinkResponse(_message.Message): __slots__ = 
("results", "handshake", "status") class Result(_message.Message): - __slots__ = ("id", "status", "err_msg", "serve_response") + __slots__ = ("id", "status", "err_msg", "serve_response", "on_success_msg") + class Message(_message.Message): + __slots__ = ("value", "keys", "metadata") + VALUE_FIELD_NUMBER: _ClassVar[int] + KEYS_FIELD_NUMBER: _ClassVar[int] + METADATA_FIELD_NUMBER: _ClassVar[int] + value: bytes + keys: _containers.RepeatedScalarFieldContainer[str] + metadata: _metadata_pb2.Metadata + def __init__(self, value: _Optional[bytes] = ..., keys: _Optional[_Iterable[str]] = ..., metadata: _Optional[_Union[_metadata_pb2.Metadata, _Mapping]] = ...) -> None: ... ID_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] ERR_MSG_FIELD_NUMBER: _ClassVar[int] SERVE_RESPONSE_FIELD_NUMBER: _ClassVar[int] + ON_SUCCESS_MSG_FIELD_NUMBER: _ClassVar[int] id: str status: Status err_msg: str serve_response: bytes - def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ...) -> None: ... + on_success_msg: SinkResponse.Result.Message + def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ..., on_success_msg: _Optional[_Union[SinkResponse.Result.Message, _Mapping]] = ...) -> None: ... 
RESULTS_FIELD_NUMBER: _ClassVar[int] HANDSHAKE_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] diff --git a/packages/pynumaflow/pynumaflow/sinker/__init__.py b/packages/pynumaflow/pynumaflow/sinker/__init__.py index 1064d96a..5a67f752 100644 --- a/packages/pynumaflow/pynumaflow/sinker/__init__.py +++ b/packages/pynumaflow/pynumaflow/sinker/__init__.py @@ -3,9 +3,10 @@ from pynumaflow.sinker.server import SinkServer from pynumaflow._metadata import UserMetadata, SystemMetadata -from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker +from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker, Message __all__ = [ + "Message", "Response", "Responses", "Datum", diff --git a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py index 43ce3386..c2582ca2 100644 --- a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py @@ -12,6 +12,57 @@ Rs = TypeVar("Rs", bound="Responses") +@dataclass +class Message: + """ + Basic datatype for OnSuccess UDSink message. + + Args: + keys: the keys of the on_success message. + value: the payload of the on_success message. + user_metadata: the user metadata of the on_success message. 
+ """ + + _keys: Optional[list[str]] + _value: bytes + _user_metadata: Optional[UserMetadata] + + __slots__ = ("_keys", "_value", "_user_metadata") + + def __init__( + self, + value: bytes, + keys: Optional[list[str]] = None, + user_metadata: Optional[UserMetadata] = None, + ): + self._value = value + self._keys = keys + self._user_metadata = user_metadata + + def with_keys(self, keys: Optional[list[str]]): + self._keys = keys + return self + + def with_user_metadata(self, user_metadata: Optional[UserMetadata]): + self._user_metadata = user_metadata + return self + + @property + def keys(self) -> Optional[list[str]]: + """Returns the keys of the on_success message.""" + return self._keys + + @property + def value(self) -> bytes: + """Returns the payload of the on_success message.""" + return self._value + + @property + def user_metadata(self) -> Optional[UserMetadata]: + """Returns the user metadata of the on_success message.""" + return self._user_metadata + + @dataclass class Response: """ @@ -28,26 +79,52 @@ class Response: success: bool err: Optional[str] fallback: bool + on_success: bool + on_success_msg: Optional[Message] - __slots__ = ("id", "success", "err", "fallback") + __slots__ = ("id", "success", "err", "fallback", "on_success", "on_success_msg") # as_success creates a successful Response with the given id. # The Success field is set to true. @classmethod - def as_success(cls: type[R], id_: str) -> R: - return Response(id=id_, success=True, err=None, fallback=False) + def as_success(cls, id_: str) -> "Response": + return Response( + id=id_, success=True, err=None, fallback=False, on_success=False, on_success_msg=None + ) # as_failure creates a failed Response with the given id and error message. # The success field is set to false and the err field is set to the provided error message. 
@classmethod - def as_failure(cls: type[R], id_: str, err_msg: str) -> R: - return Response(id=id_, success=False, err=err_msg, fallback=False) + def as_failure(cls, id_: str, err_msg: str) -> "Response": + return Response( + id=id_, + success=False, + err=err_msg, + fallback=False, + on_success=False, + on_success_msg=None, + ) # as_fallback creates a Response with the fallback field set to true. # This indicates that the message should be sent to the fallback sink. @classmethod - def as_fallback(cls: type[R], id_: str) -> R: - return Response(id=id_, fallback=True, err=None, success=False) + def as_fallback(cls, id_: str) -> "Response": + return Response( + id=id_, fallback=True, err=None, success=False, on_success=False, on_success_msg=None + ) + + # as_on_success creates a Response with the on_success field set to true. + # This indicates that the message should be sent to the on_success sink. + @classmethod + def as_on_success(cls, id_: str, on_success: Optional[Message] = None) -> "Response": + return Response( + id=id_, + fallback=False, + err=None, + success=False, + on_success=True, + on_success_msg=on_success, + ) class Responses(Sequence[R]): diff --git a/packages/pynumaflow/pynumaflow/sinker/async_server.py b/packages/pynumaflow/pynumaflow/sinker/async_server.py index a329e47e..a9331aca 100644 --- a/packages/pynumaflow/pynumaflow/sinker/async_server.py +++ b/packages/pynumaflow/pynumaflow/sinker/async_server.py @@ -18,6 +18,9 @@ _LOGGER, FALLBACK_SINK_SOCK_PATH, FALLBACK_SINK_SERVER_INFO_FILE_PATH, + UD_CONTAINER_ON_SUCCESS_SINK, + ON_SUCCESS_SINK_SOCK_PATH, + ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH, MAX_NUM_THREADS, ) @@ -86,6 +89,10 @@ def __init__( _LOGGER.info("Using Fallback Sink") sock_path = FALLBACK_SINK_SOCK_PATH server_info_file = FALLBACK_SINK_SERVER_INFO_FILE_PATH + elif os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_ON_SUCCESS_SINK: + _LOGGER.info("Using On Success Sink") + sock_path = ON_SUCCESS_SINK_SOCK_PATH + server_info_file = 
ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH self.sock_path = f"unix://{sock_path}" self.max_threads = min(max_threads, MAX_NUM_THREADS) diff --git a/packages/pynumaflow/pynumaflow/sinker/server.py b/packages/pynumaflow/pynumaflow/sinker/server.py index dc3a4788..842c1725 100644 --- a/packages/pynumaflow/pynumaflow/sinker/server.py +++ b/packages/pynumaflow/pynumaflow/sinker/server.py @@ -14,6 +14,9 @@ UD_CONTAINER_FALLBACK_SINK, FALLBACK_SINK_SOCK_PATH, FALLBACK_SINK_SERVER_INFO_FILE_PATH, + UD_CONTAINER_ON_SUCCESS_SINK, + ON_SUCCESS_SINK_SOCK_PATH, + ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH, MAX_NUM_THREADS, ) @@ -82,6 +85,10 @@ def udsink_handler(datums: Iterator[Datum]) -> Responses: _LOGGER.info("Using Fallback Sink") sock_path = FALLBACK_SINK_SOCK_PATH server_info_file = FALLBACK_SINK_SERVER_INFO_FILE_PATH + elif os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_ON_SUCCESS_SINK: + _LOGGER.info("Using On Success Sink") + sock_path = ON_SUCCESS_SINK_SOCK_PATH + server_info_file = ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH self.sock_path = f"unix://{sock_path}" self.max_threads = min(max_threads, MAX_NUM_THREADS) diff --git a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py index 465240b4..feee3d0f 100644 --- a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py +++ b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py @@ -1,6 +1,8 @@ +from typing import Optional + from pynumaflow._metadata import _user_and_system_metadata_from_proto from pynumaflow.proto.sinker import sink_pb2 -from pynumaflow.sinker._dtypes import Response, Datum, Responses +from pynumaflow.sinker._dtypes import Response, Datum, Responses, Message def build_sink_resp_results(responses: Responses) -> list[sink_pb2.SinkResponse.Result]: @@ -32,12 +34,33 @@ def build_sink_response(rspn: Response) -> sink_pb2.SinkResponse.Result: return sink_pb2.SinkResponse.Result(id=rid, status=sink_pb2.Status.SUCCESS) elif rspn.fallback: return 
sink_pb2.SinkResponse.Result(id=rid, status=sink_pb2.Status.FALLBACK) + elif rspn.on_success: + return sink_pb2.SinkResponse.Result( + id=rid, + status=sink_pb2.Status.ON_SUCCESS, + on_success_msg=build_on_success_message(rspn.on_success_msg), + ) else: return sink_pb2.SinkResponse.Result( id=rid, status=sink_pb2.Status.FAILURE, err_msg=rspn.err ) +def build_on_success_message( + msg: Optional[Message], +) -> Optional[sink_pb2.SinkResponse.Result.Message]: + if msg is None: + return None + + metadata = msg.user_metadata._to_proto() if msg.user_metadata is not None else None + + return sink_pb2.SinkResponse.Result.Message( + keys=msg.keys, + value=msg.value, + metadata=metadata, + ) + + def datum_from_sink_req(d: sink_pb2.SinkRequest) -> Datum: """ Convert a SinkRequest object to a Datum object. diff --git a/packages/pynumaflow/tests/sink/test_async_sink.py b/packages/pynumaflow/tests/sink/test_async_sink.py index c3d91fe5..cedc6241 100644 --- a/packages/pynumaflow/tests/sink/test_async_sink.py +++ b/packages/pynumaflow/tests/sink/test_async_sink.py @@ -15,12 +15,15 @@ UD_CONTAINER_FALLBACK_SINK, FALLBACK_SINK_SOCK_PATH, FALLBACK_SINK_SERVER_INFO_FILE_PATH, + UD_CONTAINER_ON_SUCCESS_SINK, + ON_SUCCESS_SINK_SOCK_PATH, + ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH, ) from pynumaflow.proto.common import metadata_pb2 from pynumaflow.sinker import ( Datum, ) -from pynumaflow.sinker import Responses, Response +from pynumaflow.sinker import Responses, Response, Message, UserMetadata from pynumaflow.proto.sinker import sink_pb2_grpc, sink_pb2 from pynumaflow.sinker.async_server import SinkAsyncServer from tests.sink.test_server import ( @@ -41,6 +44,12 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses: raise ValueError("test_mock_err_message") elif msg.value.decode("utf-8") == "test_mock_fallback_message": responses.append(Response.as_fallback(msg.id)) + elif msg.value.decode("utf-8") == "test_mock_on_success1_message": + 
responses.append(Response.as_on_success(msg.id, None)) + elif msg.value.decode("utf-8") == "test_mock_on_success2_message": + responses.append( + Response.as_on_success(msg.id, Message(b"value", ["key"], UserMetadata())) + ) else: if msg.user_metadata.groups() != ["custom_info"]: raise ValueError("user metadata groups do not match") @@ -59,6 +68,12 @@ def start_sink_streaming_request(_id: str, req_type) -> (Datum, tuple): if req_type == "fallback": value = mock_fallback_message() + if req_type == "on_success1": + value = b"test_mock_on_success1_message" + + if req_type == "on_success2": + value = b"test_mock_on_success2_message" + request = sink_pb2.SinkRequest.Request( value=value, event_time=event_time_timestamp, @@ -259,6 +274,72 @@ def test_sink_fallback(self) -> None: except grpc.RpcError as e: logging.error(e) + def test_sink_on_success1(self) -> None: + stub = self.__stub() + grpc_exception = None + try: + generator_response = stub.SinkFn( + request_iterator=request_generator(count=10, req_type="on_success1", session=1) + ) + handshake = next(generator_response) + # assert that handshake response is received. + self.assertTrue(handshake.handshake.sot) + + data_resp = [] + for r in generator_response: + data_resp.append(r) + + # 1 sink data response + 1 EOT response + self.assertEqual(2, len(data_resp)) + + idx = 0 + # capture the output from the SinkFn generator and assert. 
+ for resp in data_resp[0].results: + self.assertEqual(resp.id, str(idx)) + self.assertEqual(resp.status, sink_pb2.Status.ON_SUCCESS) + idx += 1 + # EOT Response + self.assertEqual(data_resp[1].status.eot, True) + + except grpc.RpcError as e: + logging.error(e) + grpc_exception = e + + self.assertIsNone(grpc_exception) + + def test_sink_on_success2(self) -> None: + stub = self.__stub() + grpc_exception = None + try: + generator_response = stub.SinkFn( + request_iterator=request_generator(count=10, req_type="on_success2", session=1) + ) + handshake = next(generator_response) + # assert that handshake response is received. + self.assertTrue(handshake.handshake.sot) + + data_resp = [] + for r in generator_response: + data_resp.append(r) + + # 1 sink data response + 1 EOT response + self.assertEqual(2, len(data_resp)) + + idx = 0 + # capture the output from the SinkFn generator and assert. + for resp in data_resp[0].results: + self.assertEqual(resp.id, str(idx)) + self.assertEqual(resp.status, sink_pb2.Status.ON_SUCCESS) + idx += 1 + # EOT Response + self.assertEqual(data_resp[1].status.eot, True) + + except grpc.RpcError as e: + logging.error(e) + grpc_exception = e + + self.assertIsNone(grpc_exception) + def __stub(self): return sink_pb2_grpc.SinkStub(_channel) @@ -272,6 +353,12 @@ def test_start_fallback_sink(self): self.assertEqual(server.sock_path, f"unix://{FALLBACK_SINK_SOCK_PATH}") self.assertEqual(server.server_info_file, FALLBACK_SINK_SERVER_INFO_FILE_PATH) + @mockenv(NUMAFLOW_UD_CONTAINER_TYPE=UD_CONTAINER_ON_SUCCESS_SINK) + def test_start_on_success_sink(self): + server = SinkAsyncServer(sinker_instance=udsink_handler) + self.assertEqual(server.sock_path, f"unix://{ON_SUCCESS_SINK_SOCK_PATH}") + self.assertEqual(server.server_info_file, ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH) + def test_max_threads(self): # max cap at 16 server = SinkAsyncServer(sinker_instance=udsink_handler, max_threads=32) diff --git a/packages/pynumaflow/tests/sink/test_responses.py 
b/packages/pynumaflow/tests/sink/test_responses.py index 118570d5..73a11cf7 100644 --- a/packages/pynumaflow/tests/sink/test_responses.py +++ b/packages/pynumaflow/tests/sink/test_responses.py @@ -1,7 +1,7 @@ import unittest from collections.abc import Iterator -from pynumaflow.sinker import Response, Responses, Sinker, Datum +from pynumaflow.sinker import Response, Responses, Sinker, Datum, Message, UserMetadata class TestResponse(unittest.TestCase): @@ -18,6 +18,12 @@ def test_as_fallback(self): self.assertFalse(_response.success) self.assertTrue(_response.fallback) + def test_as_on_success(self): + _response = Response.as_on_success("5", Message(b"value", ["key"], UserMetadata())) + self.assertFalse(_response.success) + self.assertFalse(_response.fallback) + self.assertTrue(_response.on_success) + class TestResponses(unittest.TestCase): def setUp(self) -> None: @@ -29,7 +35,9 @@ def setUp(self) -> None: def test_responses(self): self.resps.append(Response.as_success("4")) - self.assertEqual(4, len(self.resps)) + self.resps.append(Response.as_on_success("6", Message(b"value", ["key"], UserMetadata()))) + self.resps.append(Response.as_on_success("7", None)) + self.assertEqual(6, len(self.resps)) for resp in self.resps: self.assertIsInstance(resp, Response) @@ -38,12 +46,23 @@ def test_responses(self): self.assertEqual(self.resps[1].id, "3") self.assertEqual(self.resps[2].id, "5") self.assertEqual(self.resps[3].id, "4") + self.assertEqual(self.resps[4].id, "6") + self.assertEqual(self.resps[5].id, "7") self.assertEqual( - "[Response(id='2', success=True, err=None, fallback=False), " - "Response(id='3', success=False, err='RuntimeError encountered!', fallback=False), " - "Response(id='5', success=False, err=None, fallback=True), " - "Response(id='4', success=True, err=None, fallback=False)]", + "[Response(id='2', success=True, err=None, fallback=False, " + "on_success=False, on_success_msg=None), " + "Response(id='3', success=False, err='RuntimeError encountered!', 
" + "fallback=False, on_success=False, on_success_msg=None), " + "Response(id='5', success=False, err=None, fallback=True, " + "on_success=False, on_success_msg=None), " + "Response(id='4', success=True, err=None, fallback=False, " + "on_success=False, on_success_msg=None), " + "Response(id='6', success=False, err=None, fallback=False, " + "on_success=True, on_success_msg=Message(_keys=['key'], _value=b'value', " + "_user_metadata=UserMetadata(_data={}))), " + "Response(id='7', success=False, err=None, fallback=False, " + "on_success=True, on_success_msg=None)]", repr(self.resps), ) diff --git a/packages/pynumaflow/tests/sink/test_server.py b/packages/pynumaflow/tests/sink/test_server.py index 8baf9c96..9e517b16 100644 --- a/packages/pynumaflow/tests/sink/test_server.py +++ b/packages/pynumaflow/tests/sink/test_server.py @@ -14,10 +14,13 @@ UD_CONTAINER_FALLBACK_SINK, FALLBACK_SINK_SOCK_PATH, FALLBACK_SINK_SERVER_INFO_FILE_PATH, + UD_CONTAINER_ON_SUCCESS_SINK, + ON_SUCCESS_SINK_SOCK_PATH, + ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH, ) from pynumaflow.proto.common import metadata_pb2 from pynumaflow.proto.sinker import sink_pb2 -from pynumaflow.sinker import Responses, Datum, Response, SinkServer +from pynumaflow.sinker import Responses, Datum, Response, SinkServer, Message, UserMetadata from tests.testing_utils import mock_terminate_on_stop @@ -32,6 +35,12 @@ def udsink_handler(datums: Iterator[Datum]) -> Responses: results.append(Response.as_failure(msg.id, "mock sink message error")) elif "fallback" in msg.value.decode("utf-8"): results.append(Response.as_fallback(msg.id)) + elif "on_success1" in msg.value.decode("utf-8"): + results.append(Response.as_on_success(msg.id, None)) + elif "on_success2" in msg.value.decode("utf-8"): + results.append( + Response.as_on_success(msg.id, Message(b"value", ["key"], UserMetadata())) + ) else: if msg.user_metadata.groups() != ["custom_info"]: raise ValueError("user metadata groups do not match") @@ -230,6 +239,24 @@ def 
test_forward_message(self): metadata=self.metadata, ) ), + sink_pb2.SinkRequest( + request=sink_pb2.SinkRequest.Request( + id="test_id_2", + value=b"test_mock_on_success1_message", + event_time=event_time_timestamp, + watermark=watermark_timestamp, + metadata=self.metadata, + ) + ), + sink_pb2.SinkRequest( + request=sink_pb2.SinkRequest.Request( + id="test_id_3", + value=b"test_mock_on_success2_message", + event_time=event_time_timestamp, + watermark=watermark_timestamp, + metadata=self.metadata, + ) + ), sink_pb2.SinkRequest(status=sink_pb2.TransmissionStatus(eot=True)), ] @@ -258,9 +285,13 @@ def test_forward_message(self): # first message should be handshake response self.assertTrue(responses[0].handshake.sot) + self.assertEqual(4, len(responses[1].results)) + # assert the values for the corresponding messages self.assertEqual("test_id_0", responses[1].results[0].id) self.assertEqual("test_id_1", responses[1].results[1].id) + self.assertEqual("test_id_2", responses[1].results[2].id) + self.assertEqual("test_id_3", responses[1].results[3].id) self.assertEqual(responses[1].results[0].status, sink_pb2.Status.SUCCESS) self.assertEqual(responses[1].results[1].status, sink_pb2.Status.FAILURE) self.assertEqual("", responses[1].results[0].err_msg) @@ -282,6 +313,12 @@ def test_start_fallback_sink(self): self.assertEqual(server.sock_path, f"unix://{FALLBACK_SINK_SOCK_PATH}") self.assertEqual(server.server_info_file, FALLBACK_SINK_SERVER_INFO_FILE_PATH) + @mockenv(NUMAFLOW_UD_CONTAINER_TYPE=UD_CONTAINER_ON_SUCCESS_SINK) + def test_start_on_success_sink(self): + server = SinkServer(sinker_instance=udsink_handler) + self.assertEqual(server.sock_path, f"unix://{ON_SUCCESS_SINK_SOCK_PATH}") + self.assertEqual(server.server_info_file, ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH) + def test_max_threads(self): # max cap at 16 server = SinkServer(sinker_instance=udsink_handler, max_threads=32)