
Commit 51652e3

Set up type hinting: add missing types to Python files and C extension stubs (#2041)
* update
* remove py.typed for now
* update
* fix cimply and add types to serde producer/consumer
* admin
* address feedback
* add warning to stub and c files; admin typing more
* add accidentally removed md files
* fix merge conflicts in md files, add types to admin and serialization entrypoint init files
* finish admin init
* add types for AIO module
* add types for SR rules
* linter fix
* address mypy complaints
* revert some accidental doc change
* fix some suggestions by copilot
* linter
* address comments
* bug fix
* more fix and add missing method
* linter fix
1 parent b207e06 commit 51652e3


46 files changed: +1390 / −476 lines

CHANGELOG.md

Lines changed: 4 additions & 5 deletions
@@ -134,7 +134,7 @@ confluent-kafka-python v2.8.2 is based on librdkafka v2.8.0, see the
 [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.8.0)
 for a complete list of changes, enhancements, fixes and upgrade considerations.

-Note: Versioning is skipped due to breaking change in v2.8.1.
+Note: Versioning is skipped due to breaking change in v2.8.1.
 Do not run software with v2.8.1 installed.

@@ -166,7 +166,7 @@ We apologize for the inconvenience and appreciate the feedback that we have gott

 v2.6.2 is a feature release with the following features, fixes and enhancements:

-Note: This release modifies the dependencies of the Schema Registry client.
+Note: This release modifies the dependencies of the Schema Registry client.
 If you are using the Schema Registry client, please ensure that you install the
 extra dependencies using the following syntax:

@@ -246,15 +246,15 @@ for a complete list of changes, enhancements, fixes and upgrade considerations.
 ## v2.5.0 - 2024-07-10

 > [!WARNING]
-This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
+This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
 >
 > You won't face any problem if:
 > * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability).
 > * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the broker side.
 > * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. This is enabled by default. Set configuration `enable.metrics.push` to `false`.
 > * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there.
 > * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client.
->
+>
 > Having said this, we strongly recommend using `v2.5.3` and above to not face this regression at all.

 v2.5.0 is a feature release with the following features, fixes and enhancements:

@@ -628,4 +628,3 @@ v1.5.0 is a maintenance release with the following fixes and enhancements:
 confluent-kafka-python is based on librdkafka v1.5.0, see the
 [librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.5.0)
 for a complete list of changes, enhancements, fixes and upgrade considerations.
-

README.md

Lines changed: 1 addition & 1 deletion
@@ -276,7 +276,7 @@ pip install confluent-kafka

 # With Schema Registry support
 pip install "confluent-kafka[avro,schemaregistry]"  # Avro
-pip install "confluent-kafka[json,schemaregistry]"  # JSON Schema
+pip install "confluent-kafka[json,schemaregistry]"  # JSON Schema
 pip install "confluent-kafka[protobuf,schemaregistry]"  # Protobuf

 # With Data Contract rules (includes CSFLE support)

src/confluent_kafka/__init__.py

Lines changed: 8 additions & 7 deletions
@@ -98,19 +98,20 @@ class ThrottleEvent(object):
     :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
     """

-    def __init__(self, broker_name, broker_id, throttle_time):
+    def __init__(self, broker_name: str,
+                 broker_id: int,
+                 throttle_time: float) -> None:
         self.broker_name = broker_name
         self.broker_id = broker_id
         self.throttle_time = throttle_time

-    def __str__(self):
-        return "{}/{} throttled for {} ms".format(
-            self.broker_name, self.broker_id, int(self.throttle_time * 1000)
-        )
+    def __str__(self) -> str:
+        return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
+                                                  int(self.throttle_time * 1000))


-def _resolve_plugins(plugins):
-    """Resolve embedded plugins from the wheel's library directory.
+def _resolve_plugins(plugins: str) -> str:
+    """ Resolve embedded plugins from the wheel's library directory.

     For internal module use only.

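The typed `ThrottleEvent` in this hunk is self-contained enough to run outside the package. A minimal sketch, re-declaring the class locally rather than importing `confluent_kafka` (which requires the C extension), confirms the annotations line up with runtime behaviour:

```python
# Standalone re-creation of ThrottleEvent as typed in this commit,
# so the annotated signature can be exercised without the C extension.
class ThrottleEvent:
    """A broker throttled (delayed) a request; times are in seconds."""

    def __init__(self, broker_name: str,
                 broker_id: int,
                 throttle_time: float) -> None:
        self.broker_name = broker_name
        self.broker_id = broker_id
        self.throttle_time = throttle_time

    def __str__(self) -> str:
        return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
                                                  int(self.throttle_time * 1000))


event = ThrottleEvent("broker1.example", 42, 0.25)
print(str(event))  # → broker1.example/42 throttled for 250 ms
```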
src/confluent_kafka/_model/__init__.py

Lines changed: 15 additions & 13 deletions
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from typing import List, Optional
 from enum import Enum
 from .. import cimpl
+from ..cimpl import TopicPartition


 class Node:
@@ -35,14 +37,14 @@ class Node:
         The rack for this node.
     """

-    def __init__(self, id, host, port, rack=None):
+    def __init__(self, id: int, host: str, port: int, rack: Optional[str] = None) -> None:
         self.id = id
         self.id_string = str(id)
         self.host = host
         self.port = port
         self.rack = rack

-    def __str__(self):
+    def __str__(self) -> str:
         return f"({self.id}) {self.host}:{self.port} {f'(Rack - {self.rack})' if self.rack else ''}"


@@ -60,7 +62,7 @@ class ConsumerGroupTopicPartitions:
         List of topic partitions information.
     """

-    def __init__(self, group_id, topic_partitions=None):
+    def __init__(self, group_id: str, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
         self.group_id = group_id
         self.topic_partitions = topic_partitions

@@ -89,8 +91,8 @@ class ConsumerGroupState(Enum):
     #: Consumer Group is empty.
     EMPTY = cimpl.CONSUMER_GROUP_STATE_EMPTY

-    def __lt__(self, other):
-        if self.__class__ != other.__class__:
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, ConsumerGroupState):
             return NotImplemented
         return self.value < other.value

@@ -109,8 +111,8 @@ class ConsumerGroupType(Enum):
     #: Classic Type
     CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC

-    def __lt__(self, other):
-        if self.__class__ != other.__class__:
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, ConsumerGroupType):
             return NotImplemented
         return self.value < other.value

@@ -126,7 +128,7 @@ class TopicCollection:
         List of topic names.
     """

-    def __init__(self, topic_names):
+    def __init__(self, topic_names: List[str]) -> None:
         self.topic_names = topic_names


@@ -147,7 +149,7 @@ class TopicPartitionInfo:
         In-Sync-Replica brokers for the partition.
     """

-    def __init__(self, id, leader, replicas, isr):
+    def __init__(self, id: int, leader: Node, replicas: List[Node], isr: List[Node]) -> None:
         self.id = id
         self.leader = leader
         self.replicas = replicas
@@ -165,8 +167,8 @@ class IsolationLevel(Enum):
     READ_UNCOMMITTED = cimpl.ISOLATION_LEVEL_READ_UNCOMMITTED  #: Receive all the offsets.
     READ_COMMITTED = cimpl.ISOLATION_LEVEL_READ_COMMITTED  #: Skip offsets belonging to an aborted transaction.

-    def __lt__(self, other):
-        if self.__class__ != other.__class__:
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, IsolationLevel):
             return NotImplemented
         return self.value < other.value

@@ -184,7 +186,7 @@ class ElectionType(Enum):
     #: Unclean election
     UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN

-    def __lt__(self, other):
-        if self.__class__ != other.__class__:
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, ElectionType):
             return NotImplemented
         return self.value < other.value

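The recurring `__lt__` change in this file swaps a `__class__` comparison for `isinstance`. That matters for typing: with `other` annotated as `object`, `isinstance` narrows the type so `other.value` type-checks under mypy, while `self.__class__ != other.__class__` performs no narrowing. A toy enum, with hypothetical integer values standing in for the `cimpl` constants, demonstrates the pattern:

```python
from enum import Enum


class GroupState(Enum):
    # Toy stand-in for ConsumerGroupState; plain ints replace the
    # cimpl.CONSUMER_GROUP_STATE_* constants, which need the C extension.
    UNKNOWN = 0
    STABLE = 3
    EMPTY = 5

    def __lt__(self, other: object) -> bool:
        # isinstance() narrows `other` from object to GroupState, so the
        # `other.value` access below is accepted by the type checker.
        if not isinstance(other, GroupState):
            return NotImplemented
        return self.value < other.value


print(GroupState.UNKNOWN < GroupState.EMPTY)  # → True
# sorted() only needs __lt__, so the enum members order by value:
print([s.name for s in sorted([GroupState.EMPTY, GroupState.UNKNOWN])])
```

Returning `NotImplemented` for foreign types lets Python fall back to the reflected operation (and ultimately raise `TypeError`), which mypy special-cases as valid even with a `bool` return annotation.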
src/confluent_kafka/_types.py

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2025 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Common type definitions for confluent_kafka package.
+
+This module provides centralized type aliases to maintain DRY principle
+and ensure consistency across the package.
+"""
+
+from typing import Any, Optional, Dict, Union, Callable, List, Tuple
+
+# Headers can be either dict format or list of tuples format
+HeadersType = Union[
+    Dict[str, Union[str, bytes, None]],
+    List[Tuple[str, Union[str, bytes, None]]]
+]
+
+# Serializer/Deserializer callback types (will need SerializationContext import where used)
+Serializer = Callable[[Any, Any], bytes]  # (obj, SerializationContext) -> bytes
+Deserializer = Callable[[Optional[bytes], Any], Any]  # (Optional[bytes], SerializationContext) -> obj
+
+# Forward declarations for callback types that reference classes from cimpl
+# These are defined here to avoid circular imports
+DeliveryCallback = Callable[[Optional[Any], Any], None]  # (KafkaError, Message) -> None
+RebalanceCallback = Callable[[Any, List[Any]], None]  # (Consumer, List[TopicPartition]) -> None

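The new `_types.py` centralizes aliases such as `HeadersType`. The sketch below re-declares that alias and adds a hypothetical `as_header_list` helper (not part of confluent-kafka) to show the two shapes the alias admits; note that only the list form can carry duplicate header keys:

```python
from typing import Dict, List, Tuple, Union

# Same alias shape as the new _types.py: headers may be a dict or a list
# of (key, value) tuples, with values being str, bytes, or None.
HeadersType = Union[
    Dict[str, Union[str, bytes, None]],
    List[Tuple[str, Union[str, bytes, None]]]
]


def as_header_list(headers: HeadersType) -> List[Tuple[str, Union[str, bytes, None]]]:
    # Hypothetical helper (illustration only): normalize both accepted
    # shapes into the list-of-tuples form.
    if isinstance(headers, dict):
        return list(headers.items())
    return headers


dict_form: HeadersType = {"trace-id": b"abc123", "source": "billing"}
list_form: HeadersType = [("trace-id", b"abc123"), ("trace-id", b"def456")]

print(as_header_list(dict_form))
print(as_header_list(list_form))  # duplicate keys survive only in list form
```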
src/confluent_kafka/_util/conversion_util.py

Lines changed: 2 additions & 1 deletion
@@ -12,12 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from typing import Union, Type
 from enum import Enum


 class ConversionUtil:
     @staticmethod
-    def convert_to_enum(val, enum_clazz):
+    def convert_to_enum(val: Union[str, int, Enum], enum_clazz: Type[Enum]) -> Enum:
         if type(enum_clazz) is not type(Enum):
             raise TypeError("'enum_clazz' must be of type Enum")

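Only the signature and the `enum_clazz` guard of `convert_to_enum` appear in this hunk. The sketch below reuses that guard verbatim and fills in a plausible conversion body (an assumption, not the library's actual code) to illustrate the `Union[str, int, Enum] -> Enum` contract:

```python
from enum import Enum
from typing import Type, Union


def convert_to_enum(val: Union[str, int, Enum], enum_clazz: Type[Enum]) -> Enum:
    # Guard taken verbatim from the diff: type(Enum) is EnumMeta, so this
    # rejects anything that is not an Enum subclass.
    if type(enum_clazz) is not type(Enum):
        raise TypeError("'enum_clazz' must be of type Enum")
    # Conversion body below is a sketch, not the library's implementation.
    if isinstance(val, enum_clazz):
        return val
    if isinstance(val, str):
        return enum_clazz[val.upper()]  # look up member by name
    return enum_clazz(val)              # look up member by value


class IsolationLevel(Enum):
    # Hypothetical values; the real members come from cimpl constants.
    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1


print(convert_to_enum("read_committed", IsolationLevel))
```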
src/confluent_kafka/_util/validation_util.py

Lines changed: 8 additions & 7 deletions
@@ -12,45 +12,46 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from typing import Any, List
 from ..cimpl import KafkaError

 try:
-    string_type = basestring
+    string_type = basestring  # type: ignore[name-defined]
 except NameError:
     string_type = str


 class ValidationUtil:
     @staticmethod
-    def check_multiple_not_none(obj, vars_to_check):
+    def check_multiple_not_none(obj: Any, vars_to_check: List[str]) -> None:
         for param in vars_to_check:
             ValidationUtil.check_not_none(obj, param)

     @staticmethod
-    def check_not_none(obj, param):
+    def check_not_none(obj: Any, param: str) -> None:
         if getattr(obj, param) is None:
             raise ValueError("Expected %s to be not None" % (param,))

     @staticmethod
-    def check_multiple_is_string(obj, vars_to_check):
+    def check_multiple_is_string(obj: Any, vars_to_check: List[str]) -> None:
         for param in vars_to_check:
             ValidationUtil.check_is_string(obj, param)

     @staticmethod
-    def check_is_string(obj, param):
+    def check_is_string(obj: Any, param: str) -> None:
         param_value = getattr(obj, param)
         if param_value is not None and not isinstance(param_value, string_type):
             raise TypeError("Expected %s to be a string" % (param,))

     @staticmethod
-    def check_kafka_errors(errors):
+    def check_kafka_errors(errors: List[KafkaError]) -> None:
         if not isinstance(errors, list):
             raise TypeError("errors should be None or a list")
         for error in errors:
             if not isinstance(error, KafkaError):
                 raise TypeError("Expected list of KafkaError")

     @staticmethod
-    def check_kafka_error(error):
+    def check_kafka_error(error: KafkaError) -> None:
         if not isinstance(error, KafkaError):
             raise TypeError("Expected error to be a KafkaError")

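The validators above take an object plus attribute names. A standalone re-creation of two of them (dropping the `cimpl.KafkaError`-dependent checks so no C extension is needed), exercised against a hypothetical `Request` object, shows the typed usage:

```python
from typing import Any, List


class ValidationUtil:
    # Subset of the validators from the diff, reproduced without the
    # KafkaError checks so this runs standalone.
    @staticmethod
    def check_multiple_not_none(obj: Any, vars_to_check: List[str]) -> None:
        for param in vars_to_check:
            ValidationUtil.check_not_none(obj, param)

    @staticmethod
    def check_not_none(obj: Any, param: str) -> None:
        if getattr(obj, param) is None:
            raise ValueError("Expected %s to be not None" % (param,))

    @staticmethod
    def check_is_string(obj: Any, param: str) -> None:
        param_value = getattr(obj, param)
        if param_value is not None and not isinstance(param_value, str):
            raise TypeError("Expected %s to be a string" % (param,))


class Request:
    # Hypothetical object under validation (illustration only).
    def __init__(self, group_id: Any) -> None:
        self.group_id = group_id


ValidationUtil.check_multiple_not_none(Request("g1"), ["group_id"])  # passes
try:
    ValidationUtil.check_is_string(Request(42), "group_id")
except TypeError as e:
    print(e)  # → Expected group_id to be a string
```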