
Commit 243d973

Fix mypy stuff
1 parent 217f2a1 commit 243d973

File tree

12 files changed: +63 -58 lines changed

aws_lambda_powertools/utilities/data_classes/kafka_event.py

Lines changed: 16 additions & 14 deletions
@@ -10,7 +10,7 @@
 from collections.abc import Iterator


-class KafkaEventBase(DictWrapper):
+class KafkaEventRecordBase(DictWrapper):
     @property
     def topic(self) -> str:
         """The Kafka topic."""
@@ -37,7 +37,7 @@ def timestamp_type(self) -> str:
         return self["timestampType"]


-class KafkaEventRecord(KafkaEventBase):
+class KafkaEventRecord(KafkaEventRecordBase):
     @property
     def key(self) -> str | None:
         """
@@ -85,18 +85,7 @@ def decoded_headers(self) -> dict[str, bytes]:
         return CaseInsensitiveDict((k, bytes(v)) for chunk in self.headers for k, v in chunk.items())


-class KafkaEvent(DictWrapper):
-    """Self-managed or MSK Apache Kafka event trigger
-    Documentation:
-    --------------
-    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
-    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
-    """
-
-    def __init__(self, data: dict[str, Any]):
-        super().__init__(data)
-        self._records: Iterator[KafkaEventRecord] | None = None
-
+class KafkaEventBase(DictWrapper):
     @property
     def event_source(self) -> str:
         """The AWS service from which the Kafka event record originated."""
@@ -117,6 +106,19 @@ def decoded_bootstrap_servers(self) -> list[str]:
         """The decoded Kafka bootstrap URL."""
         return self.bootstrap_servers.split(",")

+
+class KafkaEvent(KafkaEventBase):
+    """Self-managed or MSK Apache Kafka event trigger
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
+    """
+
+    def __init__(self, data: dict[str, Any]):
+        super().__init__(data)
+        self._records: Iterator[KafkaEventRecord] | None = None
+
     @property
     def records(self) -> Iterator[KafkaEventRecord]:
         """The Kafka records."""

aws_lambda_powertools/utilities/kafka_consumer/consumer_records.py

Lines changed: 26 additions & 25 deletions
@@ -1,9 +1,10 @@
 from __future__ import annotations

+from functools import cached_property
 from typing import TYPE_CHECKING, Any

 from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict
-from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEvent, KafkaEventBase
+from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventBase, KafkaEventRecordBase
 from aws_lambda_powertools.utilities.kafka_consumer.deserializer.deserializer import get_deserializer
 from aws_lambda_powertools.utilities.kafka_consumer.serialization.serialization import serialize_to_output_type

@@ -13,49 +14,49 @@
     from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig


-class ConsumerRecordRecords(KafkaEventBase):
+class ConsumerRecordRecords(KafkaEventRecordBase):
     """
     A Kafka Consumer Record
     """

-    def __init__(self, data: dict[str, Any], deserialize: SchemaConfig | None = None):
+    def __init__(self, data: dict[str, Any], schema_config: SchemaConfig | None = None):
         super().__init__(data)
-        self.deserialize = deserialize
+        self.schema_config = schema_config

-    @property
+    @cached_property
     def key(self) -> Any:
         key = self.get("key")
-        if key and (self.deserialize and self.deserialize.key_schema_type):
+        if key and (self.schema_config and self.schema_config.key_schema_type):
             deserializer = get_deserializer(
-                self.deserialize.key_schema_type,
-                self.deserialize.key_schema_str,
+                self.schema_config.key_schema_type,
+                self.schema_config.key_schema_str,
             )
             deserialized_key = deserializer.deserialize(key)

-            if self.deserialize.key_output_serializer:
+            if self.schema_config.key_output_serializer:
                 return serialize_to_output_type(
                     deserialized_key,
-                    self.deserialize.key_output_serializer,
+                    self.schema_config.key_output_serializer,
                 )

             return deserialized_key

         return key

-    @property
+    @cached_property
     def value(self) -> Any:
         value = self["value"]
-        if value and (self.deserialize and self.deserialize.value_schema_type):
+        if value and (self.schema_config and self.schema_config.value_schema_type):
             deserializer = get_deserializer(
-                self.deserialize.value_schema_type,
-                self.deserialize.value_schema_str,
+                self.schema_config.value_schema_type,
+                self.schema_config.value_schema_str,
             )
             deserialized_value = deserializer.deserialize(value)

-            if self.deserialize.value_output_serializer:
+            if self.schema_config.value_output_serializer:
                 return serialize_to_output_type(
                     deserialized_value,
-                    self.deserialize.value_output_serializer,
+                    self.schema_config.value_output_serializer,
                 )

             return deserialized_value
@@ -80,35 +81,35 @@ def original_key(self) -> str | None:
         return self.get("key")

     @property
-    def headers(self) -> list[dict[str, list[int]]]:
+    def original_headers(self) -> list[dict[str, list[int]]]:
         """The raw Kafka record headers."""
-        return CaseInsensitiveDict((k, bytes(v)) for chunk in self.headers for k, v in chunk.items())
+        return self["headers"]

-    @property
-    def original_headers(self) -> dict[str, bytes]:
+    @cached_property
+    def headers(self) -> dict[str, bytes]:
         """Decodes the headers as a single dictionary."""
-        return self["headers"]
+        return CaseInsensitiveDict((k, bytes(v)) for chunk in self.original_headers for k, v in chunk.items())


-class ConsumerRecords(KafkaEvent):
+class ConsumerRecords(KafkaEventBase):
     """Self-managed or MSK Apache Kafka event trigger
     Documentation:
     --------------
     - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
     - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
     """

-    def __init__(self, data: dict[str, Any], deserialize: SchemaConfig | None = None):
+    def __init__(self, data: dict[str, Any], schema_config: SchemaConfig | None = None):
         super().__init__(data)
         self._records: Iterator[ConsumerRecordRecords] | None = None
-        self.deserialize = deserialize
+        self.schema_config = schema_config

     @property
     def records(self) -> Iterator[ConsumerRecordRecords]:
         """The Kafka records."""
         for chunk in self["records"].values():
             for record in chunk:
-                yield ConsumerRecordRecords(data=record, deserialize=self.deserialize)
+                yield ConsumerRecordRecords(data=record, schema_config=self.schema_config)

     @property
     def record(self) -> ConsumerRecordRecords:
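
Two behavioural changes fall out of this file: the schema configuration parameter is now named schema_config, and key, value, and headers are cached_property, so deserialization runs once per record instead of on every attribute access. A usage sketch; the parameter names come from the diff, while the JSON config without a schema string is an assumption:

from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords
from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig

config = SchemaConfig(value_schema_type="JSON")  # assumption: JSON needs no schema string

def lambda_handler(event: dict, context) -> None:
    records = ConsumerRecords(event, schema_config=config)
    for record in records.records:
        value = record.value      # deserialized on first access, cached afterwards
        headers = record.headers  # decoded dict; the raw list moved to original_headers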

aws_lambda_powertools/utilities/kafka_consumer/deserializer/avro.py

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ def __init__(self, schema_str: str):
                 f"Invalid Avro schema. Please ensure the provided avro schema is valid: {type(e).__name__}: {str(e)}",
             ) from e

-    def deserialize(self, data: bytes | str) -> dict[str, Any]:
+    def deserialize(self, data: bytes | str) -> object:
         """
         Deserialize Avro binary data to a Python dictionary.
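
Widening the return annotation from dict[str, Any] to object matches what Avro decoding can produce: a schema may describe a primitive, so the reader can return a str or int as readily as a dict. A standalone sketch with the avro package, which this hunk does not show and is assumed here:

import io

from avro.io import BinaryDecoder, DatumReader
from avro.schema import parse

schema = parse('"string"')  # a bare primitive is a valid Avro schema
reader = DatumReader(schema)

def decode(data: bytes) -> object:
    # Result may be str, int, dict, ... depending on the schema
    return reader.read(BinaryDecoder(io.BytesIO(data)))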

aws_lambda_powertools/utilities/kafka_consumer/deserializer/base.py

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@

 import base64
 from abc import ABC, abstractmethod
-from typing import Any
+from typing import Any, overload


 class DeserializerBase(ABC):
@@ -29,7 +29,7 @@ class DeserializerBase(ABC):
     """

     @abstractmethod
-    def deserialize(self, data: bytes | str) -> dict[str, Any]:
+    def deserialize(self, data: bytes | str) -> dict[str, Any] | str | object:
         """
         Deserialize input data to a Python dictionary.
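
The widened abstract signature now covers everything the concrete deserializers return: dict for JSON, str for the default base64 path, object for Avro. A hypothetical conforming subclass, only to show that narrowing the return type is still valid:

import base64

class PassthroughDeserializer(DeserializerBase):  # hypothetical, for illustration
    def deserialize(self, data: bytes | str) -> str:
        # str is a member of the abstract union, so this narrowing type-checks
        return base64.b64decode(data).decode("utf-8")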

aws_lambda_powertools/utilities/kafka_consumer/deserializer/default.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ class DefaultDeserializer(DeserializerBase):
     no customized deserialization is needed or when handling raw data formats.
     """

-    def deserialize(self, data: bytes | str) -> dict[str, Any]:
+    def deserialize(self, data: bytes | str) -> str:
         """
         Return the input data base64 decoded.

aws_lambda_powertools/utilities/kafka_consumer/deserializer/json.py

Lines changed: 1 addition & 2 deletions
@@ -46,8 +46,7 @@ def deserialize(self, data: bytes | str) -> dict:
             ... print(f"Failed to deserialize: {e}")
         """
         try:
-            value = self._decode_input(data)
-            return json.loads(base64.b64decode(value).decode("utf-8"))
+            return json.loads(base64.b64decode(data).decode("utf-8"))
         except Exception as e:
             raise KafkaConsumerDeserializationError(
                 f"JSON deserialization error: {type(e).__name__}: {str(e)}",

aws_lambda_powertools/utilities/kafka_consumer/exceptions.py

Lines changed: 6 additions & 0 deletions
@@ -14,3 +14,9 @@ class KafkaConsumerMissingSchemaError(Exception):
     """
     Error raised when a required schema is not provided.
     """
+
+
+class KafkaConsumerOutputSerializerError(Exception):
+    """
+    Error raised when output serializer fails.
+    """

aws_lambda_powertools/utilities/kafka_consumer/schema_config.py

Lines changed: 2 additions & 2 deletions
@@ -57,10 +57,10 @@ class SchemaConfig:

     def __init__(
         self,
-        value_schema_type: Literal["AVRO", "PROTOBUF", "JSON"] = None,
+        value_schema_type: Literal["AVRO", "PROTOBUF", "JSON"] | None = None,
         value_schema: str | None = None,
         value_output_serializer: Any | None = None,
-        key_schema_type: Literal["AVRO", "PROTOBUF", "JSON", None] = None,
+        key_schema_type: Literal["AVRO", "PROTOBUF", "JSON", None] | None = None,
         key_schema: str | None = None,
         key_output_serializer: Any | None = None,
     ):
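
This is the core mypy fix in the commit: None is not a member of a Literal[...] type, so a None default is rejected until the annotation says | None explicitly. In isolation:

from typing import Literal

# Before: mypy error, since None is not one of the Literal members
#   value_schema_type: Literal["AVRO", "PROTOBUF", "JSON"] = None
# After: the Optional is explicit and the default type-checks
value_schema_type: Literal["AVRO", "PROTOBUF", "JSON"] | None = None

Note the key_schema_type annotation keeps None inside the Literal as well; Literal[None] is legal shorthand for None, so the added | None is redundant there but harmless.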

aws_lambda_powertools/utilities/kafka_consumer/serialization/base.py

Lines changed: 1 addition & 1 deletion
@@ -51,4 +51,4 @@ def serialize(self, data: dict[str, Any], output_class: type[T] | None = None) -
         An instance of output_class if provided, otherwise a processed dictionary.
         The generic type T represents the type of the output_class.
         """
-        pass
+        raise NotImplementedError("Subclasses must implement this method")

aws_lambda_powertools/utilities/kafka_consumer/serialization/custom_dict.py

Lines changed: 4 additions & 6 deletions
@@ -2,6 +2,7 @@

 from typing import TYPE_CHECKING, Any

+from aws_lambda_powertools.utilities.kafka_consumer.exceptions import KafkaConsumerOutputSerializerError
 from aws_lambda_powertools.utilities.kafka_consumer.serialization.base import OutputSerializerBase

 if TYPE_CHECKING:
@@ -13,14 +14,11 @@ def serialize(self, data: dict[str, Any], output_class: type[T] | None = None) -
         if output_class is None:
             return data

-        if not hasattr(output_class, "to_dict") and not hasattr(output_class, "from_dict"):
-            raise ValueError("Output class must have to_dict or from_dict method")
-
-        if hasattr(output_class, "from_dict"):
-            return output_class.from_dict(data)
+        if not hasattr(output_class, "to_dict"):
+            raise KafkaConsumerOutputSerializerError("The output serialization class must have to_dict method")

         # Instantiate and then populate
-        instance = output_class()
+        instance = output_class
         for key, value in data.items():
             setattr(instance, key, value)
         return instance
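
The contract is stricter and simpler now: from_dict support is gone, the output class only needs a to_dict method, and a missing method raises the dedicated KafkaConsumerOutputSerializerError instead of ValueError. Note also that instance = output_class (no parentheses) binds the class object itself, so setattr populates class attributes rather than a fresh instance. A sketch of a conforming output class; the names are hypothetical:

from typing import Any

class Order:  # hypothetical; the serializer only requires a to_dict method
    order_id: int
    amount: float

    def to_dict(self) -> dict[str, Any]:
        return {"order_id": self.order_id, "amount": self.amount}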
