Skip to content

Commit e53834f

Browse files
committed
feat: example of integration with dataclasses-avroschema added
1 parent d928b39 commit e53834f

File tree

5 files changed

+40
-6
lines changed

5 files changed

+40
-6
lines changed

faust-project/Dockerfile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,3 @@ RUN pip3 install -e .
2121
ENTRYPOINT ["./wait_for_services.sh"]
2222

2323
CMD ["./run.sh", "${WORKER}", "${WORKER_PORT}", "${CONFIG_CLASS}"]
24-
25-

faust-project/example/codecs/avro.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from simple_settings import settings
55

6+
from example.users.models import AdvanceUserModel
67

78
# Initialize Schema Registry Client
89
client = SchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL)
@@ -19,6 +20,14 @@
1920

2021
avro_user_serializer = FaustSerializer(client, "users", avro_user_schema)
2122

23+
# example of how to use it with dataclasses-avroschema
24+
avro_advance_user_serializer = FaustSerializer(
25+
client, "advance_users", AdvanceUserModel.avro_schema())
26+
2227

2328
def avro_user_codec():
2429
return avro_user_serializer
30+
31+
32+
def avro_advance_user_codec():
33+
return avro_advance_user_serializer

faust-project/example/users/agents.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import logging
22

33
from example.app import app
4-
from example.codecs.avro import avro_user_serializer
5-
from example.users.models import UserModel
4+
from example.codecs.avro import avro_user_serializer, avro_advance_user_serializer
5+
from example.users.models import UserModel, AdvanceUserModel
66

77
users_topic = app.topic('avro_users', partitions=1, value_type=UserModel)
8+
advance_users_topic = app.topic('advance_avro_users', partitions=1, value_type=AdvanceUserModel)
89

910
logger = logging.getLogger(__name__)
1011

@@ -20,4 +21,18 @@ async def users(users):
2021
async def publish_users():
2122
logger.info('PUBLISHING ON LEADER FOR USERS APP!')
2223
user = {"first_name": "foo", "last_name": "bar"}
23-
await users.send(value=user, value_serializer=avro_user_serializer)
24+
await users_topic.send(value=user, value_serializer=avro_user_serializer)
25+
26+
27+
@app.agent(advance_users_topic)
28+
async def advance_users(users):
29+
async for user in users:
30+
logger.info("Event received in topic avro_users")
31+
logger.info(f"First Name: {user.first_name}, last name {user.last_name} age {user.age}")
32+
33+
34+
@app.timer(5.0, on_leader=True)
35+
async def advance_publish_users():
36+
logger.info('PUBLISHING ON LEADER FOR USERS APP!')
37+
user = {"first_name": "foo", "last_name": "bar", "age": 20}
38+
await advance_users_topic.send(value=user, value_serializer=avro_advance_user_serializer)
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
import faust
22

3+
from dataclasses_avroschema import AvroModel
4+
35

46
class UserModel(faust.Record, serializer='avro_users'):
57
first_name: str
68
last_name: str
9+
10+
11+
class AdvanceUserModel(faust.Record, AvroModel, serializer='avro_advance_users'):
12+
first_name: str
13+
last_name: str
14+
age: int

faust-project/setup.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
requires = [
44
"colorlog>=3.1.4",
5-
"simple-settings",
5+
"dataclasses-avroschema",
66
"faust",
77
"python-schema-registry-client",
8+
"yarl<1.6.0,>=1.0",
9+
"multidict<5.0,>=4.5",
10+
"simple-settings",
811
"typing-extensions",
912
]
1013

@@ -34,6 +37,7 @@
3437
],
3538
'faust.codecs': [
3639
'avro_users = example.codecs.avro:avro_user_codec',
40+
'avro_advance_users = example.codecs.avro:avro_advance_user_codec',
3741
],
3842
},
3943
)

0 commit comments

Comments
 (0)