Skip to content

Commit 3fe79fe

Browse files
Adding docstring
1 parent e9a4d21 commit 3fe79fe

File tree

14 files changed

+766
-139
lines changed

14 files changed

+766
-139
lines changed

aws_lambda_powertools/utilities/kafka_consumer/consumer_records.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ def key(self) -> Any:
2727
key = self.get("key")
2828
if key and (self.deserialize and self.deserialize.key_schema_type):
2929
deserializer = get_deserializer(
30-
self.deserialize.value_schema_type,
31-
self.deserialize.value_schema_str,
30+
self.deserialize.key_schema_type,
31+
self.deserialize.key_schema_str,
3232
)
3333
deserialized_key = deserializer.deserialize(key)
3434

aws_lambda_powertools/utilities/kafka_consumer/deserializer/avro.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,65 @@
88

99
from aws_lambda_powertools.utilities.kafka_consumer.deserializer.base import DeserializerBase
1010
from aws_lambda_powertools.utilities.kafka_consumer.exceptions import (
11-
KafkaConsumerAvroMissingSchemaError,
11+
KafkaConsumerAvroSchemaParserError,
1212
KafkaConsumerDeserializationError,
1313
)
1414

1515

1616
class AvroDeserializer(DeserializerBase):
    """
    Deserializer for Apache Avro formatted data.

    This class provides functionality to deserialize Avro binary data using
    a provided Avro schema definition.
    """

    def __init__(self, schema_str: str):
        """
        Parse the Avro schema and prepare a reader for deserialization.

        Parameters
        ----------
        schema_str : str
            The Avro schema definition used to decode incoming records.

        Raises
        ------
        KafkaConsumerAvroSchemaParserError
            When the provided schema cannot be parsed as a valid Avro schema.
        """
        try:
            self.parsed_schema = parse_schema(schema_str)
            self.reader = DatumReader(self.parsed_schema)
        except Exception as e:
            # Any parser failure (syntax, unsupported type, wrong input type) is
            # surfaced as a single schema-parser error so callers have one type to catch.
            raise KafkaConsumerAvroSchemaParserError(
                f"Invalid Avro schema. Please ensure the provided avro schema is valid: {type(e).__name__}: {str(e)}",
            ) from e

    def deserialize(self, data: bytes | str) -> dict[str, Any]:
        """
        Deserialize Avro binary data to a Python dictionary.

        Parameters
        ----------
        data : bytes or str
            The Avro binary data to deserialize. If provided as a string,
            it will be decoded to bytes first.

        Returns
        -------
        dict[str, Any]
            Deserialized data as a dictionary.

        Raises
        ------
        KafkaConsumerDeserializationError
            When the data cannot be deserialized according to the schema,
            typically due to data format incompatibility.

        Examples
        --------
        >>> deserializer = AvroDeserializer(schema_str)
        >>> avro_data = b'...'  # binary Avro data
        >>> try:
        ...     result = deserializer.deserialize(avro_data)
        ...     # Process the deserialized data
        ... except KafkaConsumerDeserializationError as e:
        ...     print(f"Failed to deserialize: {e}")
        """
        try:
            value = self._decode_input(data)
            bytes_reader = io.BytesIO(value)
            decoder = BinaryDecoder(bytes_reader)
            return self.reader.read(decoder)
        except Exception as e:
            # Fix: original message read "Error trying to deserializer avro data".
            raise KafkaConsumerDeserializationError(
                f"Error trying to deserialize avro data - {type(e).__name__}: {str(e)}",
            ) from e

aws_lambda_powertools/utilities/kafka_consumer/deserializer/base.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,47 @@
66

77

88
class DeserializerBase(ABC):
9+
"""
10+
Abstract base class for deserializers.
11+
12+
This class defines the interface for all deserializers in the Kafka consumer utility
13+
and provides a common method for decoding input data.
14+
15+
Methods
16+
-------
17+
deserialize(data)
18+
Abstract method that must be implemented by subclasses to deserialize data.
19+
_decode_input(data)
20+
Helper method to decode input data to bytes.
21+
22+
Examples
23+
--------
24+
>>> class MyDeserializer(DeserializerBase):
25+
... def deserialize(self, data: bytes | str) -> dict[str, Any]:
26+
... value = self._decode_input(data)
27+
... # Custom deserialization logic here
28+
... return {"key": "value"}
29+
"""
30+
931
@abstractmethod
1032
def deserialize(self, data: bytes | str) -> dict[str, Any]:
11-
pass
33+
"""
34+
Deserialize input data to a Python dictionary.
35+
36+
This abstract method must be implemented by subclasses to provide
37+
specific deserialization logic.
38+
39+
Parameters
40+
----------
41+
data : bytes or str
42+
The data to deserialize, either as bytes or as a string.
43+
44+
Returns
45+
-------
46+
dict[str, Any]
47+
The deserialized data as a dictionary.
48+
"""
49+
raise NotImplementedError("Subclasses must implement the deserialize method")
1250

1351
def _decode_input(self, data: bytes | str) -> bytes:
1452
if isinstance(data, str):

aws_lambda_powertools/utilities/kafka_consumer/deserializer/deserializer.py

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,59 @@
22

33
from typing import TYPE_CHECKING, Any
44

5+
from aws_lambda_powertools.utilities.kafka_consumer.deserializer.json import JsonDeserializer
6+
from aws_lambda_powertools.utilities.kafka_consumer.deserializer.no_op import NoOpDeserializer
7+
58
if TYPE_CHECKING:
69
from aws_lambda_powertools.utilities.kafka_consumer.deserializer.base import DeserializerBase
710

811

9-
def get_deserializer(schema_type: str, schema_value: Any) -> DeserializerBase:
12+
def get_deserializer(schema_type: str | object, schema_value: Any) -> DeserializerBase:
13+
"""
14+
Factory function to get the appropriate deserializer based on schema type.
15+
16+
This function creates and returns a deserializer instance that corresponds to the
17+
specified schema type. It handles lazy imports for optional dependencies.
18+
19+
Parameters
20+
----------
21+
schema_type : str
22+
The type of schema to use for deserialization.
23+
Supported values are: "AVRO", "PROTOBUF", "JSON", or any other value for no-op.
24+
schema_value : Any
25+
The schema definition to use for deserialization. The format depends on the
26+
schema_type:
27+
- For "AVRO": A string containing the Avro schema definition
28+
- For "PROTOBUF": A object containing the Protobuf schema definition
29+
- For "JSON": Not used (can be None)
30+
- For other types: Not used (can be None)
31+
32+
Returns
33+
-------
34+
DeserializerBase
35+
An instance of a deserializer that implements the DeserializerBase interface.
36+
37+
Examples
38+
--------
39+
>>> # Get an Avro deserializer
40+
>>> avro_schema = '''
41+
... {
42+
... "type": "record",
43+
... "name": "User",
44+
... "fields": [
45+
... {"name": "name", "type": "string"},
46+
... {"name": "age", "type": "int"}
47+
... ]
48+
... }
49+
... '''
50+
>>> deserializer = get_deserializer("AVRO", avro_schema)
51+
>>>
52+
>>> # Get a JSON deserializer
53+
>>> json_deserializer = get_deserializer("JSON", None)
54+
>>>
55+
>>> # Get a no-op deserializer for raw data
56+
>>> no_op_deserializer = get_deserializer("RAW", None)
57+
"""
1058
if schema_type == "AVRO":
1159
# Import here to avoid dependency if not used
1260
from aws_lambda_powertools.utilities.kafka_consumer.deserializer.avro import AvroDeserializer
@@ -18,9 +66,6 @@ def get_deserializer(schema_type: str, schema_value: Any) -> DeserializerBase:
1866

1967
return ProtobufDeserializer(schema_value)
2068
elif schema_type == "JSON":
21-
# Import here to avoid dependency if not used
22-
from aws_lambda_powertools.utilities.kafka_consumer.deserializer.json import JsonDeserializer
23-
2469
return JsonDeserializer()
25-
else:
26-
raise ValueError(f"Invalid schema_type: {schema_type}")
70+
71+
return NoOpDeserializer()

aws_lambda_powertools/utilities/kafka_consumer/deserializer/json.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,43 @@
77

88

99
class JsonDeserializer(DeserializerBase):
10+
"""
11+
Deserializer for JSON formatted data.
12+
13+
This class provides functionality to deserialize JSON data from bytes or string
14+
into Python dictionaries.
15+
"""
16+
1017
def deserialize(self, data: bytes | str) -> dict:
18+
"""
19+
Deserialize JSON data to a Python dictionary.
20+
21+
Parameters
22+
----------
23+
data : bytes or str
24+
The JSON data to deserialize. If provided as bytes, it will be decoded as UTF-8.
25+
If provided as a string, it's assumed to be base64-encoded and will be decoded first.
26+
27+
Returns
28+
-------
29+
dict
30+
Deserialized data as a dictionary.
31+
32+
Raises
33+
------
34+
KafkaConsumerDeserializationError
35+
When the data cannot be deserialized as valid JSON.
36+
37+
Examples
38+
--------
39+
>>> deserializer = JsonDeserializer()
40+
>>> json_data = '{"key": "value", "number": 123}'
41+
>>> try:
42+
... result = deserializer.deserialize(json_data)
43+
... print(result["key"]) # Output: value
44+
... except KafkaConsumerDeserializationError as e:
45+
... print(f"Failed to deserialize: {e}")
46+
"""
1147
try:
1248
value = self._decode_input(data)
1349
return json.loads(value.decode("utf-8"))
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
from aws_lambda_powertools.utilities.kafka_consumer.deserializer.base import DeserializerBase
6+
7+
8+
class NoOpDeserializer(DeserializerBase):
    """
    A pass-through deserializer that leaves input data untouched.

    Useful when records carry raw payloads that need no decoding, or when the
    caller wants to handle deserialization itself.
    """

    def deserialize(self, data: bytes | str) -> dict[str, Any]:
        """
        Return the input data unchanged.

        Implements the deserializer interface without performing any
        transformation: the exact object passed in is handed back.

        Parameters
        ----------
        data : bytes or str
            The input data to "deserialize".

        Returns
        -------
        dict[str, Any]
            The very same object that was passed in. Despite the annotation
            (kept for interface compatibility with DeserializerBase), the
            original type of ``data`` is preserved.

        Example
        --------
        >>> deserializer = NoOpDeserializer()
        >>>
        >>> deserializer.deserialize("Hello, world!")
        'Hello, world!'
        >>> deserializer.deserialize(b"Binary data")
        b'Binary data'
        """
        # Intentional no-op: no decoding, parsing, or copying happens here.
        return data

aws_lambda_powertools/utilities/kafka_consumer/deserializer/protobuf.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,49 @@
1111

1212

1313
class ProtobufDeserializer(DeserializerBase):
14+
"""
15+
Deserializer for Protocol Buffer formatted data.
16+
17+
This class provides functionality to deserialize Protocol Buffer binary data
18+
into Python dictionaries using the provided Protocol Buffer message class.
19+
"""
20+
1421
def __init__(self, message_class: Any):
1522
self.message_class = message_class
1623

1724
def deserialize(self, data: bytes | str) -> dict:
25+
"""
26+
Deserialize Protocol Buffer binary data to a Python dictionary.
27+
28+
Parameters
29+
----------
30+
data : bytes or str
31+
The Protocol Buffer binary data to deserialize. If provided as a string,
32+
it's assumed to be base64-encoded and will be decoded first.
33+
34+
Returns
35+
-------
36+
dict
37+
Deserialized data as a dictionary with field names preserved from the
38+
Protocol Buffer definition.
39+
40+
Raises
41+
------
42+
KafkaConsumerDeserializationError
43+
When the data cannot be deserialized according to the message class,
44+
typically due to data format incompatibility or incorrect message class.
45+
46+
Example
47+
--------
48+
>>> # Assuming proper protobuf setup
49+
>>> deserializer = ProtobufDeserializer(my_proto_module.MyMessage)
50+
>>> proto_data = b'...' # binary protobuf data
51+
>>> try:
52+
... result = deserializer.deserialize(proto_data)
53+
... # Process the deserialized dictionary
54+
... except KafkaConsumerDeserializationError as e:
55+
... print(f"Failed to deserialize: {e}")
56+
"""
1857
try:
1958
value = self._decode_input(data)
2059
message = self.message_class()
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
class KafkaConsumerAvroSchemaParserError(Exception):
    """Raised when an Avro schema definition cannot be parsed."""


class KafkaConsumerDeserializationError(Exception):
    """Raised when a Kafka message payload fails to deserialize."""


class KafkaConsumerMissingSchemaError(Exception):
    """Raised when a required schema was not provided."""

0 commit comments

Comments
 (0)