Commit f21fcb7

refactor(parser): Improve Kafka models with examples and descriptions (#7293)
Enhances the Kafka parser models with field descriptions and examples using Pydantic's Field() functionality. This improvement provides better documentation and metadata for Kafka event parsing, following the pattern established in PR #7100. All field descriptions are based on official AWS MSK and Kafka documentation and include realistic examples from actual test events. Closes #7118
1 parent 218184f commit f21fcb7
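
As context for the change described above: the description and examples passed to Pydantic's Field() are carried into the models' generated JSON schema, which is what makes the metadata visible to documentation and tooling. A minimal sketch, using the dataFormat/schemaId fields from this diff (the printed output shown in the trailing comment is approximate):

    from pydantic import BaseModel, Field

    class KafkaRecordSchemaMetadata(BaseModel):
        # Field() metadata is emitted during schema generation
        dataFormat: str = Field(
            description="The data format of the schema (e.g., AVRO, JSON).",
            examples=["AVRO", "JSON", "PROTOBUF"],
        )
        schemaId: str = Field(
            description="The unique identifier of the schema.",
            examples=["1234", "5678", "schema-abc-123"],
        )

    # Inspect the per-field metadata produced by Pydantic v2
    print(KafkaRecordSchemaMetadata.model_json_schema()["properties"]["dataFormat"])
    # roughly: {'description': 'The data format of the schema (e.g., AVRO, JSON).',
    #           'examples': ['AVRO', 'JSON', 'PROTOBUF'], 'title': 'Dataformat', 'type': 'string'}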

File tree

  • aws_lambda_powertools/utilities/parser/models/kafka.py

1 file changed: +92 additions, -18 deletions


aws_lambda_powertools/utilities/parser/models/kafka.py

Lines changed: 92 additions & 18 deletions
@@ -1,29 +1,76 @@
 from datetime import datetime
 from typing import Dict, List, Literal, Optional, Type, Union

-from pydantic import BaseModel, field_validator
+from pydantic import BaseModel, Field, field_validator

 from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string, decode_header_bytes

 SERVERS_DELIMITER = ","


 class KafkaRecordSchemaMetadata(BaseModel):
-    dataFormat: str
-    schemaId: str
+    dataFormat: str = Field(
+        description="The data format of the schema (e.g., AVRO, JSON).",
+        examples=["AVRO", "JSON", "PROTOBUF"],
+    )
+    schemaId: str = Field(
+        description="The unique identifier of the schema.",
+        examples=["1234", "5678", "schema-abc-123"],
+    )


 class KafkaRecordModel(BaseModel):
-    topic: str
-    partition: int
-    offset: int
-    timestamp: datetime
-    timestampType: str
-    key: Optional[bytes] = None
-    value: Union[str, Type[BaseModel]]
-    headers: List[Dict[str, bytes]]
-    keySchemaMetadata: Optional[KafkaRecordSchemaMetadata] = None
-    valueSchemaMetadata: Optional[KafkaRecordSchemaMetadata] = None
+    topic: str = Field(
+        description="The Kafka topic name from which the record originated.",
+        examples=["mytopic", "user-events", "order-processing", "mymessage-with-unsigned"],
+    )
+    partition: int = Field(
+        description="The partition number within the topic from which the record was consumed.",
+        examples=[0, 1, 5, 10],
+    )
+    offset: int = Field(
+        description="The offset of the record within the partition.",
+        examples=[15, 123, 456789, 1000000],
+    )
+    timestamp: datetime = Field(
+        description="The timestamp of the record.",
+        examples=[1545084650987, 1640995200000, 1672531200000],
+    )
+    timestampType: str = Field(
+        description="The type of timestamp (CREATE_TIME or LOG_APPEND_TIME).",
+        examples=["CREATE_TIME", "LOG_APPEND_TIME"],
+    )
+    key: Optional[bytes] = Field(
+        default=None,
+        description="The message key, base64-encoded. Can be null for messages without keys.",
+        examples=["cmVjb3JkS2V5", "dXNlci0xMjM=", "b3JkZXItNDU2", None],
+    )
+    value: Union[str, Type[BaseModel]] = Field(
+        description="The message value, base64-encoded.",
+        examples=[
+            "eyJrZXkiOiJ2YWx1ZSJ9",
+            "eyJtZXNzYWdlIjogIkhlbGxvIEthZmthIn0=",
+            "eyJ1c2VyX2lkIjogMTIzLCAiYWN0aW9uIjogImxvZ2luIn0=",
+        ],
+    )
+    headers: List[Dict[str, bytes]] = Field(
+        description="A list of message headers as key-value pairs with byte array values.",
+        examples=[
+            [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
+            [{"contentType": [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]}],
+            [],
+        ],
+    )
+    keySchemaMetadata: Optional[KafkaRecordSchemaMetadata] = Field(
+        default=None,
+        description="Schema metadata for the message key when using schema registry.",
+        examples=[{"dataFormat": "AVRO", "schemaId": "1234"}, None],
+    )
+    valueSchemaMetadata: Optional[KafkaRecordSchemaMetadata] = Field(
+        default=None,
+        description="Schema metadata for the message value when using schema registry.",
+        examples=[{"dataFormat": "AVRO", "schemaId": "1234"}, None],
+    )

     # key is optional; only decode if not None
     @field_validator("key", mode="before")
@@ -44,8 +91,23 @@ def decode_headers_list(cls, value):


 class KafkaBaseEventModel(BaseModel):
-    bootstrapServers: List[str]
-    records: Dict[str, List[KafkaRecordModel]]
+    bootstrapServers: List[str] = Field(
+        description="A list of Kafka bootstrap servers (broker endpoints).",
+        examples=[
+            ["b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092"],
+            [
+                "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+                "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+            ],
+        ],
+    )
+    records: Dict[str, List[KafkaRecordModel]] = Field(
+        description="A dictionary mapping topic-partition combinations to lists of Kafka records.",
+        examples=[
+            {"mytopic-0": [{"topic": "mytopic", "partition": 0, "offset": 15}]},
+            {"user-events-1": [{"topic": "user-events", "partition": 1, "offset": 123}]},
+        ],
+    )

     @field_validator("bootstrapServers", mode="before")
     def split_servers(cls, value):
@@ -59,7 +121,10 @@ class KafkaSelfManagedEventModel(KafkaBaseEventModel):
     - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
     """

-    eventSource: Literal["SelfManagedKafka"]
+    eventSource: Literal["SelfManagedKafka"] = Field(
+        description="The event source identifier for self-managed Kafka.",
+        examples=["SelfManagedKafka"],
+    )


 class KafkaMskEventModel(KafkaBaseEventModel):
@@ -69,5 +134,14 @@ class KafkaMskEventModel(KafkaBaseEventModel):
     - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
     """

-    eventSource: Literal["aws:kafka"]
-    eventSourceArn: str
+    eventSource: Literal["aws:kafka"] = Field(
+        description="The AWS service that invoked the function.",
+        examples=["aws:kafka"],
+    )
+    eventSourceArn: str = Field(
+        description="The Amazon Resource Name (ARN) of the MSK cluster.",
+        examples=[
+            "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+            "arn:aws:kafka:eu-central-1:123456789012:cluster/MyCluster/xyz789-1234-5678-90ab-cdef12345678-2",
+        ],
+    )
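
For reference, a minimal sketch of how these models are typically exercised through the parser utility. The event payload below is stitched together from the example values in the fields above and is illustrative only, not an official test event:

    from aws_lambda_powertools.utilities.parser import parse
    from aws_lambda_powertools.utilities.parser.models import KafkaMskEventModel

    # Illustrative MSK event assembled from the example values in the models above
    event = {
        "eventSource": "aws:kafka",
        "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
        "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
        "records": {
            "mytopic-0": [
                {
                    "topic": "mytopic",
                    "partition": 0,
                    "offset": 15,
                    "timestamp": 1545084650987,
                    "timestampType": "CREATE_TIME",
                    "key": "cmVjb3JkS2V5",  # base64 of "recordKey"; decoded by the model's validator
                    "value": "eyJrZXkiOiJ2YWx1ZSJ9",  # base64 of '{"key":"value"}'
                    "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
                }
            ]
        },
    }

    parsed = parse(event=event, model=KafkaMskEventModel)
    record = parsed.records["mytopic-0"][0]
    print(record.topic, record.partition, record.offset)  # mytopic 0 15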
