-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathkafka_resolver.py
More file actions
36 lines (34 loc) · 907 Bytes
/
kafka_resolver.py
File metadata and controls
36 lines (34 loc) · 907 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pydantic import BaseModel
from datasources import kafka
from chalk.streams import stream
# Pydantic models define the schema of the messages on the stream.
class TransactionMessage(BaseModel):
id: int
user_id: int
timestamp: datetime
vendor: str
description: str
amount: float
country: str
is_overdraft: bool
@stream(source=kafka)
def stream_resolver(message: TransactionMessage) -> Features[
Transaction.id,
Transaction.user_id,
Transaction.timestamp,
Transaction.vendor,
Transaction.description,
Transaction.amount,
Transaction.country,
Transaction.is_overdraft
]:
return Transaction(
id=message.id,
user_id=message.user_id
ts=message.timestamp,
vendor=message.vendor,
description=message.description,
amount=message.amount,
country=message.country,
is_overdraft=message.is_overdraft
)