Skip to content

Commit 3720c95

Browse files
authored
chore: Refactor SDK to split into separate servers (#105)
Signed-off-by: Sidhant Kohli <sidhant_kohli@intuit.com>
1 parent b7c130f commit 3720c95

File tree

167 files changed

+5142
-3561
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

167 files changed

+5142
-3561
lines changed

.codecov.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ coverage:
1111

1212
ignore:
1313
- "examples/"
14-
- "pynumaflow/function/proto/*"
15-
- "pynumaflow/sink/proto/*"
16-
- "pynumaflow/function/_udfunction_pb2.pyi"
14+
- "pynumaflow/mapper/proto/*"
15+
- "pynumaflow/sinker/proto/*"
16+
- "pynumaflow/mapstreamer/proto/*"
17+
- "pynumaflow/reducer/proto/*"
18+
- "pynumaflow/sourcetransformer/proto/*"
19+
- "pynumaflow/map/_udfunction_pb2.pyi"
1720
- "pynumaflow/sink/_udsink_pb2.pyi"

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,9 @@ setup:
2929
proto:
3030
python3 -m grpc_tools.protoc -I=pynumaflow/function/proto --python_out=pynumaflow/function/proto --grpc_python_out=pynumaflow/function/proto pynumaflow/function/proto/*.proto
3131
python3 -m grpc_tools.protoc -I=pynumaflow/sink/proto --python_out=pynumaflow/sink/proto --grpc_python_out=pynumaflow/sink/proto pynumaflow/sink/proto/*.proto
32+
python3 -m grpc_tools.protoc -I=pynumaflow/map/proto --python_out=pynumaflow/map/proto --grpc_python_out=pynumaflow/map/proto pynumaflow/map/proto/*.proto
33+
python3 -m grpc_tools.protoc -I=pynumaflow/mapstream/proto --python_out=pynumaflow/mapstream/proto --grpc_python_out=pynumaflow/mapstream/proto pynumaflow/mapstream/proto/*.proto
34+
python3 -m grpc_tools.protoc -I=pynumaflow/reduce/proto --python_out=pynumaflow/reduce/proto --grpc_python_out=pynumaflow/reduce/proto pynumaflow/reduce/proto/*.proto
35+
python3 -m grpc_tools.protoc -I=pynumaflow/sourcetransform/proto --python_out=pynumaflow/sourcetransform/proto --grpc_python_out=pynumaflow/sourcetransform/proto pynumaflow/sourcetransform/proto/*.proto
36+
3237
sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/*/proto/*.py

README.md

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pre-commit install
4646
### Map
4747

4848
```python
49-
from pynumaflow.function import Messages, Message, Datum, Server
49+
from pynumaflow.mapper import Messages, Message, Datum, Mapper
5050
5151
5252
def my_handler(keys: list[str], datum: Datum) -> Messages:
@@ -57,28 +57,28 @@ def my_handler(keys: list[str], datum: Datum) -> Messages:
5757
5858
5959
if __name__ == "__main__":
60-
grpc_server = Server(map_handler=my_handler)
60+
grpc_server = Mapper(handler=my_handler)
6161
grpc_server.start()
6262
```
63-
### MapT - Map with event time assignment capability
64-
In addition to the regular Map function, MapT supports assigning a new event time to the message.
65-
MapT is only supported at source vertex to enable (a) early data filtering and (b) watermark assignment by extracting new event time from the message payload.
63+
### SourceTransformer - Map with event time assignment capability
64+
In addition to the regular Map function, SourceTransformer supports assigning a new event time to the message.
65+
SourceTransformer is only supported at source vertex to enable (a) early data filtering and (b) watermark assignment by extracting new event time from the message payload.
6666
6767
```python
6868
from datetime import datetime
69-
from pynumaflow.function import MessageTs, MessageT, Datum, Server
69+
from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformer
7070
7171
72-
def mapt_handler(keys: list[str], datum: Datum) -> MessageTs:
72+
def transform_handler(keys: list[str], datum: Datum) -> Messages:
7373
val = datum.value
7474
new_event_time = datetime.now()
7575
_ = datum.watermark
76-
message_t_s = MessageTs(MessageT(val, event_time=new_event_time, keys=keys))
76+
message_t_s = Messages(Message(val, event_time=new_event_time, keys=keys))
7777
return message_t_s
7878
7979
8080
if __name__ == "__main__":
81-
grpc_server = Server(mapt_handler=mapt_handler)
81+
grpc_server = SourceTransformer(handler=transform_handler)
8282
grpc_server.start()
8383
```
8484
@@ -87,11 +87,11 @@ if __name__ == "__main__":
8787
```python
8888
import aiorun
8989
from typing import Iterator, List
90-
from pynumaflow.function import Messages, Message, Datum, Metadata, AsyncServer
90+
from pynumaflow.reducer import Messages, Message, Datum, Metadata, AsyncReducer
9191
9292
9393
async def my_handler(
94-
keys: List[str], datums: Iterator[Datum], md: Metadata
94+
keys: List[str], datums: Iterator[Datum], md: Metadata
9595
) -> Messages:
9696
interval_window = md.interval_window
9797
counter = 0
@@ -105,19 +105,19 @@ async def my_handler(
105105
106106
107107
if __name__ == "__main__":
108-
grpc_server = AsyncServer(reduce_handler=my_handler)
108+
grpc_server = AsyncReducer(handler=my_handler)
109109
aiorun.run(grpc_server.start())
110110
```
111111
112112
### Sample Image
113-
A sample UDF [Dockerfile](examples/function/forward_message/Dockerfile) is provided
114-
under [examples](examples/function/forward_message).
113+
A sample UDF [Dockerfile](examples/map/forward_message/Dockerfile) is provided
114+
under [examples](examples/map/forward_message).
115115
116116
## Implement a User Defined Sink (UDSink)
117117
118118
```python
119119
from typing import Iterator
120-
from pynumaflow.sink import Datum, Responses, Response, Sink
120+
from pynumaflow.sinker import Datum, Responses, Response, Sinker
121121
122122
123123
def my_handler(datums: Iterator[Datum]) -> Responses:
@@ -129,26 +129,11 @@ def my_handler(datums: Iterator[Datum]) -> Responses:
129129
130130
131131
if __name__ == "__main__":
132-
grpc_server = Sink(my_handler)
132+
grpc_server = Sinker(my_handler)
133133
grpc_server.start()
134134
```
135135
136136
### Sample Image
137137
138138
A sample UDSink [Dockerfile](examples/sink/log/Dockerfile) is provided
139-
under [examples](examples/sink/log).
140-
141-
### Datum Metadata
142-
The Datum object contains the message payload and metadata. Currently, there are two fields
143-
in metadata: the message ID, the message delivery count to indicate how many times the message
144-
has been delivered. You can use these metadata to implement customized logic. For example,
145-
```python
146-
...
147-
148-
149-
def my_handler(keys: list[str], datum: Datum) -> Messages:
150-
num_delivered = datum.metadata.num_delivered
151-
# Choose to do specific actions, if the message delivery count reaches a certain threshold.
152-
if num_delivered > 3:
153-
...
154-
```
139+
under [examples](examples/sink/log).
Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,15 @@
11
import aiorun
22
from collections.abc import Iterator
33

4-
from pynumaflow.function import (
4+
from pynumaflow.reducer import (
55
Messages,
66
Message,
77
Datum,
88
Metadata,
9-
AsyncServer,
9+
AsyncReducer,
1010
)
1111

1212

13-
async def map_handler(keys: list[str], datum: Datum) -> Messages:
14-
# forward a message
15-
val = datum.value
16-
_ = datum.event_time
17-
_ = datum.watermark
18-
messages = Messages()
19-
messages.append(Message.to_vtx(keys, val))
20-
return messages
21-
22-
2313
async def my_handler(keys: list[str], datums: Iterator[Datum], md: Metadata) -> Messages:
2414
# count the number of events
2515
interval_window = md.interval_window
@@ -31,9 +21,9 @@ async def my_handler(keys: list[str], datums: Iterator[Datum], md: Metadata) ->
3121
f"counter:{counter} interval_window_start:{interval_window.start} "
3222
f"interval_window_end:{interval_window.end}"
3323
)
34-
return Messages(Message.to_vtx(keys, str.encode(msg)))
24+
return Messages(Message(keys=keys, value=str.encode(msg)))
3525

3626

3727
if __name__ == "__main__":
38-
grpc_server = AsyncServer(map_handler=map_handler, reduce_handler=my_handler)
28+
grpc_server = AsyncReducer(handler=my_handler)
3929
aiorun.run(grpc_server.start())

examples/function/flatmap/Dockerfile

Lines changed: 0 additions & 54 deletions
This file was deleted.

examples/function/flatmap_stream/Dockerfile

Lines changed: 0 additions & 54 deletions
This file was deleted.

examples/function/forward_message/Dockerfile

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)