Skip to content

Commit de595ad

Browse files
authored
feat: source using rust sdk (#283)
Signed-off-by: Vigith Maurice <[email protected]>
1 parent d9c9192 commit de595ad

File tree

12 files changed

+1236
-4
lines changed

12 files changed

+1236
-4
lines changed

packages/pynumaflow-lite/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,7 @@ path = "tests/bin/accumulator.rs"
5151
[[bin]]
5252
name = "test_sink"
5353
path = "tests/bin/sink.rs"
54+
55+
[[bin]]
56+
name = "test_source"
57+
path = "tests/bin/source.rs"

packages/pynumaflow-lite/pynumaflow_lite/__init__.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,25 @@
3333
except Exception: # pragma: no cover
3434
accumulator = None
3535

36-
# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, and Accumulator classes under the extension submodules for convenient access
36+
try:
37+
sinker = _import_module(__name__ + ".sinker")
38+
except Exception: # pragma: no cover
39+
sinker = None
40+
41+
try:
42+
sourcer = _import_module(__name__ + ".sourcer")
43+
except Exception: # pragma: no cover
44+
sourcer = None
45+
46+
# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, Accumulator, Sinker, and Sourcer classes under the extension submodules for convenient access
3747
from ._map_dtypes import Mapper
3848
from ._batchmapper_dtypes import BatchMapper
3949
from ._mapstream_dtypes import MapStreamer
4050
from ._reduce_dtypes import Reducer
4151
from ._session_reduce_dtypes import SessionReducer
4252
from ._accumulator_dtypes import Accumulator
53+
from ._sink_dtypes import Sinker
54+
from ._source_dtypes import Sourcer
4355

4456
if mapper is not None:
4557
try:
@@ -77,8 +89,20 @@
7789
except Exception:
7890
pass
7991

92+
if sinker is not None:
93+
try:
94+
setattr(sinker, "Sinker", Sinker)
95+
except Exception:
96+
pass
97+
98+
if sourcer is not None:
99+
try:
100+
setattr(sourcer, "Sourcer", Sourcer)
101+
except Exception:
102+
pass
103+
80104
# Public API
81-
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "accumulator"]
105+
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "accumulator", "sinker", "sourcer"]
82106

83107
__doc__ = pynumaflow_lite.__doc__
84108
if hasattr(pynumaflow_lite, "__all__"):

packages/pynumaflow-lite/pynumaflow_lite/__init__.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ from . import reducer as reducer
99
from . import session_reducer as session_reducer
1010
from . import accumulator as accumulator
1111
from . import sinker as sinker
12+
from . import sourcer as sourcer
1213

13-
__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker']
14+
__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker', 'sourcer']
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
from abc import ABCMeta, abstractmethod
2+
from collections.abc import AsyncIterator
3+
from pynumaflow_lite.sourcer import (
4+
Message,
5+
Offset,
6+
ReadRequest,
7+
AckRequest,
8+
NackRequest,
9+
PendingResponse,
10+
PartitionsResponse,
11+
)
12+
13+
14+
class Sourcer(metaclass=ABCMeta):
15+
"""
16+
Provides an interface to write a User Defined Source.
17+
18+
A Sourcer must implement the following handlers:
19+
- read_handler: Read messages from the source
20+
- ack_handler: Acknowledge processed messages
21+
- pending_handler: Return the number of pending messages
22+
- partitions_handler: Return the partitions this source handles
23+
24+
Optionally, you can implement:
25+
- nack_handler: Negatively acknowledge messages (default: no-op)
26+
"""
27+
28+
def __call__(self, *args, **kwargs):
29+
"""
30+
This allows the handler function to be executed directly if
31+
the class instance is passed as a callable.
32+
"""
33+
return self.read_handler(*args, **kwargs)
34+
35+
@abstractmethod
36+
async def read_handler(self, request: ReadRequest) -> AsyncIterator[Message]:
37+
"""
38+
Read messages from the source.
39+
40+
Args:
41+
request: ReadRequest containing num_records and timeout_ms
42+
43+
Yields:
44+
Message: Messages to be sent to the next vertex
45+
46+
Example:
47+
async def read_handler(self, request: ReadRequest) -> AsyncIterator[Message]:
48+
for i in range(request.num_records):
49+
yield Message(
50+
payload=f"message-{i}".encode(),
51+
offset=Offset(str(i).encode(), partition_id=0),
52+
event_time=datetime.now(),
53+
keys=["key1"],
54+
headers={"x-txn-id": str(uuid.uuid4())}
55+
)
56+
"""
57+
pass
58+
59+
@abstractmethod
60+
async def ack_handler(self, request: AckRequest) -> None:
61+
"""
62+
Acknowledge that messages have been processed.
63+
64+
Args:
65+
request: AckRequest containing the list of offsets to acknowledge
66+
67+
Example:
68+
async def ack_handler(self, request: AckRequest) -> None:
69+
for offset in request.offsets:
70+
# Remove from pending set, mark as processed, etc.
71+
self.pending_offsets.remove(offset.offset)
72+
"""
73+
pass
74+
75+
@abstractmethod
76+
async def pending_handler(self) -> PendingResponse:
77+
"""
78+
Return the number of pending messages yet to be processed.
79+
80+
Returns:
81+
PendingResponse: Response containing the count of pending messages.
82+
Return count=-1 if the source doesn't support detecting backlog.
83+
84+
Example:
85+
async def pending_handler(self) -> PendingResponse:
86+
return PendingResponse(count=len(self.pending_offsets))
87+
"""
88+
pass
89+
90+
@abstractmethod
91+
async def partitions_handler(self) -> PartitionsResponse:
92+
"""
93+
Return the partitions associated with this source.
94+
95+
This is used by the platform to determine the partitions to which
96+
the watermark should be published. If your source doesn't have the
97+
concept of partitions, return the replica ID.
98+
99+
Returns:
100+
PartitionsResponse: Response containing the list of partition IDs
101+
102+
Example:
103+
async def partitions_handler(self) -> PartitionsResponse:
104+
return PartitionsResponse(partitions=[self.partition_id])
105+
"""
106+
pass
107+
108+
async def nack_handler(self, request: NackRequest) -> None:
109+
"""
110+
Negatively acknowledge messages (optional).
111+
112+
This is called when messages could not be processed and should be
113+
retried or handled differently. Default implementation is a no-op.
114+
115+
Args:
116+
request: NackRequest containing the list of offsets to nack
117+
118+
Example:
119+
async def nack_handler(self, request: NackRequest) -> None:
120+
for offset in request.offsets:
121+
# Add back to pending, mark for retry, etc.
122+
self.nacked_offsets.add(offset.offset)
123+
"""
124+
pass
125+
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
from __future__ import annotations
2+
3+
from typing import Optional, List, Dict, Callable, Awaitable, Any
4+
import datetime as _dt
5+
6+
# Re-export the Python ABC for user convenience and typing
7+
from ._source_dtypes import Sourcer as Sourcer
8+
9+
10+
class Message:
11+
"""A message to be sent from the source."""
12+
payload: bytes
13+
offset: Offset
14+
event_time: _dt.datetime
15+
keys: List[str]
16+
headers: Dict[str, str]
17+
18+
def __init__(
19+
self,
20+
payload: bytes,
21+
offset: Offset,
22+
event_time: _dt.datetime,
23+
keys: Optional[List[str]] = ...,
24+
headers: Optional[Dict[str, str]] = ...,
25+
) -> None: ...
26+
27+
def __repr__(self) -> str: ...
28+
29+
def __str__(self) -> str: ...
30+
31+
32+
class Offset:
33+
"""The offset of a message."""
34+
offset: bytes
35+
partition_id: int
36+
37+
def __init__(
38+
self,
39+
offset: bytes,
40+
partition_id: int = ...,
41+
) -> None: ...
42+
43+
def __repr__(self) -> str: ...
44+
45+
def __str__(self) -> str: ...
46+
47+
48+
class ReadRequest:
49+
"""A request to read messages from the source."""
50+
num_records: int
51+
timeout_ms: int
52+
53+
def __init__(
54+
self,
55+
num_records: int,
56+
timeout_ms: int = ...,
57+
) -> None: ...
58+
59+
def __repr__(self) -> str: ...
60+
61+
62+
class AckRequest:
63+
"""A request to acknowledge messages."""
64+
offsets: List[Offset]
65+
66+
def __init__(
67+
self,
68+
offsets: List[Offset],
69+
) -> None: ...
70+
71+
def __repr__(self) -> str: ...
72+
73+
74+
class NackRequest:
75+
"""A request to negatively acknowledge messages."""
76+
offsets: List[Offset]
77+
78+
def __init__(
79+
self,
80+
offsets: List[Offset],
81+
) -> None: ...
82+
83+
def __repr__(self) -> str: ...
84+
85+
86+
class PendingResponse:
87+
"""Response for pending messages count."""
88+
count: int
89+
90+
def __init__(
91+
self,
92+
count: int = ...,
93+
) -> None: ...
94+
95+
def __repr__(self) -> str: ...
96+
97+
98+
class PartitionsResponse:
99+
"""Response for partitions."""
100+
partitions: List[int]
101+
102+
def __init__(
103+
self,
104+
partitions: List[int],
105+
) -> None: ...
106+
107+
def __repr__(self) -> str: ...
108+
109+
110+
class SourceAsyncServer:
111+
"""Async Source Server that can be started from Python code."""
112+
113+
def __init__(
114+
self,
115+
sock_file: str | None = ...,
116+
info_file: str | None = ...,
117+
) -> None: ...
118+
119+
def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ...
120+
121+
def stop(self) -> None: ...
122+
123+
124+
__all__ = [
125+
"Message",
126+
"Offset",
127+
"ReadRequest",
128+
"AckRequest",
129+
"NackRequest",
130+
"PendingResponse",
131+
"PartitionsResponse",
132+
"SourceAsyncServer",
133+
"Sourcer",
134+
]
135+

packages/pynumaflow-lite/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod pyrs;
77
pub mod reduce;
88
pub mod session_reduce;
99
pub mod sink;
10+
pub mod source;
1011

1112
use pyo3::prelude::*;
1213

@@ -59,6 +60,13 @@ fn sinker(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
5960
Ok(())
6061
}
6162

63+
/// Submodule: pynumaflow_lite.sourcer
64+
#[pymodule]
65+
fn sourcer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
66+
crate::source::populate_py_module(m)?;
67+
Ok(())
68+
}
69+
6270
/// Top-level Python module `pynumaflow_lite` with submodules like `mapper`, `batchmapper`, and `mapstreamer`.
6371
#[pymodule]
6472
fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
@@ -70,6 +78,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
7078
m.add_wrapped(pyo3::wrap_pymodule!(session_reducer))?;
7179
m.add_wrapped(pyo3::wrap_pymodule!(accumulator))?;
7280
m.add_wrapped(pyo3::wrap_pymodule!(sinker))?;
81+
m.add_wrapped(pyo3::wrap_pymodule!(sourcer))?;
7382

7483
// Ensure it's importable as `pynumaflow_lite.mapper` as well as attribute access
7584
let binding = m.getattr("mapper")?;
@@ -134,5 +143,14 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
134143
.getattr("modules")?
135144
.set_item(fullname, &sub)?;
136145

146+
// Register the submodule in `sys.modules` so `import pynumaflow_lite.sourcer` works as well as attribute access
147+
let binding = m.getattr("sourcer")?;
148+
let sub = binding.downcast::<PyModule>()?;
149+
let fullname = "pynumaflow_lite.sourcer";
150+
sub.setattr("__name__", fullname)?;
151+
py.import("sys")?
152+
.getattr("modules")?
153+
.set_item(fullname, &sub)?;
154+
137155
Ok(())
138156
}

packages/pynumaflow-lite/src/sink/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,4 +398,3 @@ pub(crate) fn populate_py_module(m: &Bound<PyModule>) -> PyResult<()> {
398398

399399
Ok(())
400400
}
401-

0 commit comments

Comments
 (0)