Commit 921b7c0

feat: batch-map implementation (#177)
Signed-off-by: Sidhant Kohli <[email protected]>
1 parent 32a9d26 commit 921b7c0

Some content is hidden: large commits have some content hidden by default.

51 files changed: +1770 -90 lines changed

.github/workflows/build-push.yaml

Lines changed: 4 additions & 3 deletions
@@ -20,9 +20,10 @@ jobs:
 example_directories: [
 "examples/map/even_odd", "examples/map/flatmap", "examples/map/forward_message",
 "examples/map/multiproc_map", "examples/mapstream/flatmap_stream", "examples/reduce/counter",
-"examples/reducestream/counter", "examples/reducestream/sum", "examples/sideinput/simple_sideinput",
-"examples/sideinput/simple_sideinput/udf", "examples/sink/async_log", "examples/sink/log",
-"examples/source/async_source", "examples/source/simple_source", "examples/sourcetransform/event_time_filter"
+"examples/reducestream/counter", "examples/reducestream/sum", "examples/sideinput/simple-sideinput",
+"examples/sideinput/simple-sideinput/udf", "examples/sink/async_log", "examples/sink/log",
+"examples/source/async-source", "examples/source/simple-source", "examples/sourcetransform/event_time_filter",
+"examples/batchmap/flatmap"
 ]
 
 steps:

Makefile

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ proto:
 	python3 -m grpc_tools.protoc -I=pynumaflow/proto/sourcetransformer --python_out=pynumaflow/proto/sourcetransformer --grpc_python_out=pynumaflow/proto/sourcetransformer pynumaflow/proto/sourcetransformer/*.proto
 	python3 -m grpc_tools.protoc -I=pynumaflow/proto/sideinput --python_out=pynumaflow/proto/sideinput --grpc_python_out=pynumaflow/proto/sideinput pynumaflow/proto/sideinput/*.proto
 	python3 -m grpc_tools.protoc -I=pynumaflow/proto/sourcer --python_out=pynumaflow/proto/sourcer --grpc_python_out=pynumaflow/proto/sourcer pynumaflow/proto/sourcer/*.proto
+	python3 -m grpc_tools.protoc -I=pynumaflow/proto/batchmapper --python_out=pynumaflow/proto/batchmapper --grpc_python_out=pynumaflow/proto/batchmapper pynumaflow/proto/batchmapper/*.proto
 
 
 	sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/proto/*/*.py
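
The `sed` step that closes the `proto` target prepends `from . ` to every generated line that starts with `import` and contains `_pb2`, turning absolute imports into package-relative ones. A minimal Python sketch of the same substitution follows; the module name `batchmap_pb2` and the helper `make_imports_relative` are illustrative assumptions, not names taken from the commit.

```python
import re

# Mirrors the sed expression: capture a line that starts with "import" and
# contains "_pb2" (greedy match up to the last "_pb2"), then prepend "from . ".
PATTERN = re.compile(r"^(import.*_pb2)", flags=re.MULTILINE)


def make_imports_relative(source: str) -> str:
    """Rewrite `import x_pb2 ...` lines into `from . import x_pb2 ...` (hypothetical helper)."""
    return PATTERN.sub(r"from . \1", source)


# Hypothetical generated code: one unrelated import and one protobuf import.
generated = "import grpc\nimport batchmap_pb2 as batchmap__pb2\n"
rewritten = make_imports_relative(generated)
print(rewritten)
```

Lines without `_pb2`, such as `import grpc`, are left untouched, which is why the pattern anchors on both the `import` prefix and the `_pb2` suffix.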

README.md

Lines changed: 6 additions & 1 deletion
@@ -47,6 +47,7 @@ pre-commit install
 - [Map](https://github.com/numaproj/numaflow-python/tree/main/examples/map)
 - [Reduce](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce)
 - [Map Stream](https://github.com/numaproj/numaflow-python/tree/main/examples/mapstream)
+- [Batch Map](https://github.com/numaproj/numaflow-python/tree/main/examples/batchmap)
 - [Implement User Defined Sinks](https://github.com/numaproj/numaflow-python/tree/main/examples/sink)
 - [Implement User Defined SideInputs](https://github.com/numaproj/numaflow-python/tree/main/examples/sideinput)
 
@@ -95,7 +96,7 @@ This could be an alternative to creating multiple replicas of the same UDF conta
 
 Thus this server type is useful for UDFs which are CPU intensive.
 ```
-grpc_server = MapMultiProcServer(handler)
+grpc_server = MapMultiProcServer(mapper_instance=handler, server_count=2)
 ```
 
 #### Currently Supported Server Types for each functionality
@@ -111,6 +112,8 @@ These are the class names for the server types supported by each of the function
     - ReduceAsyncServer
 - MapStream
     - MapStreamAsyncServer
+- BatchMap
+    - BatchMapAsyncServer
 - Source Transform
     - SourceTransformServer
     - SourceTransformMultiProcServer
@@ -147,6 +150,8 @@ The list of base handler classes for each of the functionalities is given below
     - MapStreamer
 - Source Transform
     - SourceTransformer
+- Batch Map
+    - BatchMapper
 - UDSource
     - Sourcer
 - UDSink

examples/batchmap/README.md

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
## BatchMap Interface
The BatchMap interface allows developers to
process multiple data items together in a single UDF handler.


### What is BatchMap?
BatchMap is an interface that allows developers to process multiple data items
in a single UDF call, rather than handling each item in a separate call.


The BatchMap interface can be helpful in scenarios
where operating on a group of data items is more efficient than operating on them one at a time.


### Understanding the User Interface
The BatchMap interface requires developers to implement a handler with a specific signature.
Here is the signature of the BatchMap handler:

```python
async def handler(datums: AsyncIterable[Datum]) -> BatchResponses:
```
The handler takes an async iterable of `Datum` objects and returns
`BatchResponses`.
The `BatchResponses` object is a list of the *same length* as the input
datums, with each item corresponding to the response for one request datum.

To clarify, let's say we have three data items:

```
data_1 = {"name": "John", "age": 25}
data_2 = {"name": "Jane", "age": 30}
data_3 = {"name": "Bob", "age": 45}
```

These data items will be grouped together by Numaflow and
passed to the handler as an async iterable. Conceptually (the list below stands in for that iterable):

```python
result = await handler([data_1, data_2, data_3])
```

The result will be a `BatchResponses` object, which is a list with one response for each input data item.
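
Because the handler consumes its input with `async for`, a plain list has to be wrapped in an async generator before a handler can be exercised locally. The sketch below shows one way to do that without pynumaflow installed; `FakeDatum`, `as_async_iterable`, and the toy `handler` are hypothetical stand-ins, not the library's classes.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from dataclasses import dataclass


@dataclass
class FakeDatum:
    """Hypothetical stand-in for pynumaflow's Datum; only `id` and `value` are modelled."""
    id: str
    value: bytes


async def as_async_iterable(items: list[FakeDatum]) -> AsyncIterator[FakeDatum]:
    """Wrap a plain list so it can be consumed with `async for`."""
    for item in items:
        yield item


async def handler(datums: AsyncIterable[FakeDatum]) -> list[tuple[str, bytes]]:
    """Toy handler: returns one (id, payload) pair per input datum."""
    responses = []
    async for datum in datums:
        responses.append((datum.id, datum.value.upper()))
    return responses


batch = [FakeDatum("0", b"john"), FakeDatum("1", b"jane"), FakeDatum("2", b"bob")]
result = asyncio.run(handler(as_async_iterable(batch)))
print(result)
```

The returned list has exactly one entry per input datum, each carrying the ID of the datum it answers, which is the shape the README describes for `BatchResponses`.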

### Important Considerations
When using BatchMap, there are a few important considerations to keep in mind:

- Ensure that each `BatchResponse` in the `BatchResponses` object is tagged with the *correct request ID*.
Each Datum has a unique ID tag, which will be used by Numaflow to ensure correctness.

```python
async for datum in datums:
    batch_response = BatchResponse.from_id(datum.id)
```


- Ensure that the length of the `BatchResponses`
list is equal to the number of requests received.
**This means that for every input data item**, there should be a corresponding
response in the `BatchResponses` list.

- Use batch processing only when it makes sense. In some
scenarios, batch processing may not be the most
efficient approach, and processing data items one by one
could be a better option.
With BatchMap, the burden of processing the data concurrently falls on the
UDF implementation.
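
The first two considerations (matching IDs and matching lengths) can be checked mechanically in a unit test before a handler is deployed. The following is a minimal sketch under stated assumptions: `FakeResponse` and `check_batch` are hypothetical helpers written for illustration, and this is not how Numaflow itself validates responses.

```python
from dataclasses import dataclass, field


@dataclass
class FakeResponse:
    """Hypothetical stand-in for a BatchResponse: a request id plus its output messages."""
    id: str
    messages: list[bytes] = field(default_factory=list)


def check_batch(request_ids: list[str], responses: list[FakeResponse]) -> list[str]:
    """Return human-readable contract violations; an empty list means the batch looks valid."""
    problems = []
    if len(responses) != len(request_ids):
        problems.append(f"expected {len(request_ids)} responses, got {len(responses)}")
    expected = set(request_ids)
    seen = {response.id for response in responses}
    for missing in sorted(expected - seen):
        problems.append(f"no response for request id {missing!r}")
    for unknown in sorted(seen - expected):
        problems.append(f"response tagged with unknown id {unknown!r}")
    return problems


ids = ["a", "b", "c"]
good = [FakeResponse(i, [b"x"]) for i in ids]
bad = [FakeResponse("a"), FakeResponse("z")]
print(check_batch(ids, good))
print(check_batch(ids, bad))
```

Returning a list of problems rather than raising on the first one makes it easier to see every mismatch in a failing batch at once.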

examples/batchmap/flatmap/Dockerfile

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONHASHSEED=random \
    PIP_NO_CACHE_DIR=on \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100 \
    POETRY_VERSION=1.2.2 \
    POETRY_HOME="/opt/poetry" \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_NO_INTERACTION=1 \
    PYSETUP_PATH="/opt/pysetup"

ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/batchmap/flatmap"
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        wget \
        # deps for building python deps
        build-essential \
    && apt-get install -y git \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    \
    # install dumb-init
    && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
    && chmod +x /dumb-init \
    && curl -sSL https://install.python-poetry.org | python3 -

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./ ./

WORKDIR $EXAMPLE_PATH
RUN poetry install --no-cache --no-root && \
    rm -rf ~/.cache/pypoetry/

RUN chmod +x entry.sh

ENTRYPOINT ["/dumb-init", "--"]
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]

EXPOSE 5000

examples/batchmap/flatmap/Makefile

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/batch-map-flatmap:${TAG}
4+
DOCKER_FILE_PATH = examples/batchmap/flatmap/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
poetry update -vv
9+
10+
.PHONY: image-push
11+
image-push: update
12+
cd ../../../ && docker buildx build \
13+
-f ${DOCKER_FILE_PATH} \
14+
-t ${IMAGE_REGISTRY} \
15+
--platform linux/amd64,linux/arm64 . --push
16+
17+
.PHONY: image
18+
image: update
19+
cd ../../../ && docker build \
20+
-f ${DOCKER_FILE_PATH} \
21+
-t ${IMAGE_REGISTRY} .
22+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

examples/batchmap/flatmap/entry.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py

examples/batchmap/flatmap/example.py

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
from collections.abc import AsyncIterable

from pynumaflow.batchmapper import (
    Message,
    Datum,
    BatchMapper,
    BatchMapAsyncServer,
    BatchResponses,
    BatchResponse,
)


class Flatmap(BatchMapper):
    """
    This is a class that inherits from the BatchMapper class.
    It implements a flatmap operation over a batch of input messages
    """

    async def handler(
        self,
        datums: AsyncIterable[Datum],
    ) -> BatchResponses:
        batch_responses = BatchResponses()
        async for datum in datums:
            val = datum.value
            _ = datum.event_time
            _ = datum.watermark
            strs = val.decode("utf-8").split(",")
            batch_response = BatchResponse.from_id(datum.id)
            if len(strs) == 0:
                batch_response.append(Message.to_drop())
            else:
                for s in strs:
                    batch_response.append(Message(str.encode(s)))
            batch_responses.append(batch_response)

        return batch_responses


if __name__ == "__main__":
    """
    This example shows how to use the Batch Map Flatmap.
    We use a class as handler, but a function can be used as well.
    """
    grpc_server = BatchMapAsyncServer(Flatmap())
    grpc_server.start()
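
One detail of the handler is worth checking: in Python, `str.split(",")` never returns an empty list (an empty string yields `['']`), so the `len(strs) == 0` branch that would drop the message is not reached as written. The sketch below demonstrates that behavior and one possible way to filter out empty parts; `split_payload` is a hypothetical helper and is not part of the commit.

```python
def split_payload(payload: bytes) -> list[str]:
    """Split a comma-separated payload, discarding empty parts (hypothetical helper)."""
    return [part for part in payload.decode("utf-8").split(",") if part]


# str.split with an explicit separator always returns at least one element.
print("".split(","))             # → ['']
print(split_payload(b""))        # → []
print(split_payload(b"a,b,,c"))  # → ['a', 'b', 'c']
```

With a filter like this, an empty result could be used as the signal to append a drop message for that datum, if dropping empty payloads is the intended behavior.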
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: flatmap
spec:
  vertices:
    - name: in
      source:
        # A self data generating source
        generator:
          rpu: 500
          duration: 1s
    - name: batch-flatmap
      partitions: 2
      metadata:
        annotations:
          numaflow.numaproj.io/batch-map: "true"
      scale:
        min: 1
      udf:
        container:
          image: quay.io/numaio/numaflow-python/batch-map-flatmap:stable
          imagePullPolicy: Always
    - name: sink
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: batch-flatmap
    - from: batch-flatmap
      to: sink

examples/batchmap/flatmap/pyproject.toml

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
[tool.poetry]
name = "batch-map-flatmap"
version = "0.1.0"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = { path = "../../../"}

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
