Skip to content

Commit c3c3f19

Browse files
authored
feat: metadata implementation for source, map and sink (#274)
Signed-off-by: Sreekanth <[email protected]>
1 parent 83eeb23 commit c3c3f19

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1430
-246
lines changed

packages/pynumaflow/Makefile

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ setup:
2626
poetry install --with dev --no-root
2727

2828
proto:
29-
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sinker=pynumaflow/proto/sinker --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sinker/*.proto
30-
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/mapper=pynumaflow/proto/mapper --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/mapper/*.proto
31-
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/reducer=pynumaflow/proto/reducer --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/reducer/*.proto
32-
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sourcetransformer=pynumaflow/proto/sourcetransformer --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sourcetransformer/*.proto
33-
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sideinput=pynumaflow/proto/sideinput --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sideinput/*.proto
34-
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sourcer=pynumaflow/proto/sourcer --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sourcer/*.proto
35-
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/accumulator=pynumaflow/proto/accumulator --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/accumulator/*.proto
29+
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/common/*.proto
30+
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sinker=pynumaflow/proto/sinker -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sinker/*.proto
31+
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/mapper=pynumaflow/proto/mapper -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/mapper/*.proto
32+
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/reducer=pynumaflow/proto/reducer -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/reducer/*.proto
33+
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sourcetransformer=pynumaflow/proto/sourcetransformer -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sourcetransformer/*.proto
34+
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sideinput=pynumaflow/proto/sideinput -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sideinput/*.proto
35+
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sourcer=pynumaflow/proto/sourcer -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sourcer/*.proto
36+
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/accumulator=pynumaflow/proto/accumulator -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/accumulator/*.proto

packages/pynumaflow/poetry.lock

Lines changed: 102 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
from dataclasses import dataclass, field
2+
from typing import Optional
3+
from pynumaflow.proto.common import metadata_pb2
4+
5+
"""
6+
Metadata provides per-message metadata passed between vertices.
7+
8+
A vertex could create one or more sets of key-value pairs per group-name.
9+
This is required because a vertex could forward a message to a
10+
Kafka sink with Kafka headers, and a metrics Sink with some key/value points.
11+
12+
There will be two kinds of metadata,
13+
14+
system - generated by the system, exposed as read-only to UDFs
15+
user - user generated with read-write access
16+
|
17+
| +-> [group-m] -> {k1:v1, ... }
18+
| |
19+
+-> [user] +-> [group-n] -> {k1:v1, ... }
20+
| |
21+
| +-> [group-o] -> {k1:v1, ... }
22+
|
23+
| +-> [group-h] -> {k1:v1, ... }
24+
| |
25+
+-> [sys] +-> [group-i] -> {k1:v1, ... }
26+
|
27+
"""
28+
29+
30+
@dataclass
class SystemMetadata:
    """
    Per-message metadata generated by the system, organised as group name -> key-value pairs.
    UDFs are only given read accessors (groups, keys, value) on this type.
    """

    _data: dict[str, dict[str, bytes]] = field(default_factory=dict)

    def groups(self) -> list[str]:
        """
        Lists every group name present in the system metadata.
        """
        return list(self._data)

    def keys(self, group: str) -> list[str]:
        """
        Lists the keys stored under the given group.
        An unknown group yields an empty list.
        """
        entries = self._data.get(group, {})
        return list(entries.keys())

    def value(self, group: str, key: str) -> Optional[bytes]:
        """
        Looks up the value stored under the given group and key.
        Yields None when either the group or the key is absent.
        """
        entries = self._data.get(group, {})
        return entries.get(key)
56+
57+
58+
@dataclass
class UserMetadata:
    """
    UserMetadata wraps the user-generated metadata groups per message. It is read-write to UDFs.

    The underlying data maps a group name to that group's key-value pairs
    (str keys, bytes values). Groups can be accessed with the mapping-style
    operators (``in``, ``[]``, ``del``, ``len``) or with the explicit helper methods.
    """

    _data: dict[str, dict[str, bytes]] = field(default_factory=dict)

    def groups(self) -> list[str]:
        """
        Returns the list of group names for the user metadata.
        """
        return list(self._data.keys())

    def keys(self, group: str) -> list[str]:
        """
        Returns the list of keys for a given group.
        Returns an empty list if the group does not exist.
        """
        keys = self._data.get(group) or {}
        return list(keys.keys())

    def __contains__(self, group: str) -> bool:
        """
        Returns True if the group exists.
        """
        return group in self._data

    def __getitem__(self, group: str) -> dict[str, bytes]:
        """
        Returns the data for a given group.
        Raises KeyError if the group does not exist.
        """
        return self._data[group]

    def __setitem__(self, group: str, data: dict[str, bytes]):
        """
        Sets the data for a given group, replacing any existing data for that group.
        The given dict is stored as-is (not copied).
        """
        self._data[group] = data

    def __delitem__(self, group: str):
        """
        Removes the group and all its keys and values.
        Raises KeyError if the group does not exist.
        """
        del self._data[group]

    def __len__(self) -> int:
        """
        Returns the number of groups.
        """
        return len(self._data)

    def value(self, group: str, key: str) -> Optional[bytes]:
        """
        Returns the value for a given group and key.
        If the group or key does not exist, returns None.
        """
        value = self._data.get(group)
        if value is None:
            return None
        return value.get(key)

    def add_key(self, group: str, key: str, value: bytes):
        """
        Adds the value for a given group and key.
        The group is created if it does not exist; an existing key is overwritten.
        """
        self._data.setdefault(group, {})[key] = value

    def remove_key(self, group: str, key: str) -> Optional[bytes]:
        """
        Removes the key and its value for a given group and returns the value.
        If the group has no keys left afterwards, the group will be removed.
        Returns None if the group or key does not exist.
        """
        group_data = self._data.get(group)
        if group_data is None:
            return None
        # Mutate the group in place instead of popping and re-inserting it, so
        # the relative order of the remaining groups is not disturbed.
        value = group_data.pop(key, None)
        if not group_data:
            del self._data[group]
        return value

    def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
        """
        Removes the group and all its keys and values and returns the data.
        Returns None if the group does not exist.
        """
        return self._data.pop(group, None)

    def clear(self):
        """
        Clears all the groups and all their keys and values.
        """
        self._data.clear()

    def _to_proto(self) -> "metadata_pb2.Metadata":
        """
        Converts the user metadata to a protobuf Metadata message.
        Each group becomes a KeyValueGroup entry in the message's user_metadata map.
        """
        return metadata_pb2.Metadata(
            user_metadata={
                group: metadata_pb2.KeyValueGroup(key_value=value)
                for group, value in self._data.items()
            },
        )
161+
162+
163+
def _user_and_system_metadata_from_proto(
    proto: metadata_pb2.Metadata,
) -> tuple[UserMetadata, SystemMetadata]:
    """
    Builds the UserMetadata and SystemMetadata objects from a protobuf Metadata message.

    Every group in proto.user_metadata and proto.sys_metadata has its key_value
    map copied into a plain dict, keyed by the group name.
    Returns a (UserMetadata, SystemMetadata) tuple.
    """
    user_groups = {}
    for name, entry in proto.user_metadata.items():
        user_groups[name] = dict(entry.key_value)

    system_groups = {}
    for name, entry in proto.sys_metadata.items():
        system_groups[name] = dict(entry.key_value)

    return UserMetadata(user_groups), SystemMetadata(system_groups)

packages/pynumaflow/pynumaflow/mapper/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from pynumaflow.mapper.sync_server import MapServer
44

55
from pynumaflow.mapper._dtypes import Message, Messages, Datum, DROP, Mapper
6+
from pynumaflow._metadata import UserMetadata, SystemMetadata
67

78
__all__ = [
89
"Message",
@@ -13,4 +14,6 @@
1314
"MapServer",
1415
"MapAsyncServer",
1516
"MapMultiprocServer",
17+
"UserMetadata",
18+
"SystemMetadata",
1619
]

0 commit comments

Comments
 (0)