Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c50656f
update
fangnx Sep 4, 2025
343dbfe
remove py.typed for now
fangnx Sep 4, 2025
1e7f2ac
update
fangnx Sep 5, 2025
69ca6b8
fix cimply and add types to serde producer/consumer
fangnx Sep 5, 2025
d789244
admin
fangnx Sep 5, 2025
0653fe0
address feedback
fangnx Sep 9, 2025
77c4965
add warning to stub and c files; admin typing more
fangnx Sep 9, 2025
cfa723f
add accidentally removed md files
fangnx Sep 9, 2025
1abe216
fix merge conflicts in md files, add types to admin and serialization…
fangnx Oct 15, 2025
8f789a0
finish admin init
fangnx Oct 15, 2025
9c99020
add types for AIO module
fangnx Oct 15, 2025
a387ea7
linter fix
fangnx Oct 15, 2025
c1e2f91
address mypy complaints
fangnx Oct 16, 2025
8bdb0af
revert some accidental doc change
fangnx Oct 16, 2025
26694e6
fix some suggestions by copilot
fangnx Oct 21, 2025
c7865d8
linter
fangnx Oct 21, 2025
791c4ad
fix
fangnx Oct 17, 2025
ffb118e
resolve conflict
fangnx Oct 21, 2025
d536a71
encryption clients
fangnx Oct 20, 2025
cdbd203
fix
fangnx Oct 23, 2025
6fa8730
revert incorrect merge conflict changes
fangnx Oct 23, 2025
485532f
fix many things
fangnx Oct 23, 2025
6262a73
more fixes in non sr modules
fangnx Oct 23, 2025
5ffe301
type encrypt_executor.py
fangnx Oct 23, 2025
0f51247
more typeignore removals
fangnx Oct 24, 2025
b4bf42c
update
fangnx Oct 24, 2025
1451647
handle union types in schemas
fangnx Oct 24, 2025
5e718d6
a bit more
fangnx Oct 24, 2025
0ee5103
revert some bad changes during merge, address copilot comments
fangnx Oct 24, 2025
4026889
minor
fangnx Oct 30, 2025
4f0a609
support type hint substitution for unasync
fangnx Oct 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ Homepage = "https://github.com/confluentinc/confluent-kafka-python"
[tool.mypy]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = [
"confluent_kafka.schema_registry.avro",
"confluent_kafka.schema_registry.json_schema",
"confluent_kafka.schema_registry.protobuf",
]
disable_error_code = ["assignment", "no-redef"]

[[tool.mypy.overrides]]
module = [
"confluent_kafka.schema_registry.confluent.meta_pb2",
"confluent_kafka.schema_registry.confluent.types.decimal_pb2",
]
ignore_errors = true

[tool.setuptools]
include-package-data = false

Expand Down
2 changes: 2 additions & 0 deletions requirements/requirements-tests.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# core test requirements
urllib3<3
flake8
mypy
types-cachetools
orjson
pytest
pytest-timeout
Expand Down
18 changes: 9 additions & 9 deletions src/confluent_kafka/_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional
from typing import List, Optional, Any
from enum import Enum
from .. import cimpl
from ..cimpl import TopicPartition
Expand Down Expand Up @@ -91,8 +91,8 @@ class ConsumerGroupState(Enum):
#: Consumer Group is empty.
EMPTY = cimpl.CONSUMER_GROUP_STATE_EMPTY

def __lt__(self, other: object) -> bool:
if not isinstance(other, ConsumerGroupState):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value

Expand All @@ -111,8 +111,8 @@ class ConsumerGroupType(Enum):
#: Classic Type
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC

def __lt__(self, other: object) -> bool:
if not isinstance(other, ConsumerGroupType):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value

Expand Down Expand Up @@ -167,8 +167,8 @@ class IsolationLevel(Enum):
READ_UNCOMMITTED = cimpl.ISOLATION_LEVEL_READ_UNCOMMITTED #: Receive all the offsets.
READ_COMMITTED = cimpl.ISOLATION_LEVEL_READ_COMMITTED #: Skip offsets belonging to an aborted transaction.

def __lt__(self, other: object) -> bool:
if not isinstance(other, IsolationLevel):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value

Expand All @@ -186,7 +186,7 @@ class ElectionType(Enum):
#: Unclean election
UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN

def __lt__(self, other: object) -> bool:
if not isinstance(other, ElectionType):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
1 change: 0 additions & 1 deletion src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,6 @@ def list_topics(self, *args: Any, **kwargs: Any) -> ClusterMetadata:
return super(AdminClient, self).list_topics(*args, **kwargs)

def list_groups(self, *args: Any, **kwargs: Any) -> List[GroupMetadata]:

return super(AdminClient, self).list_groups(*args, **kwargs)

def create_partitions( # type: ignore[override]
Expand Down
12 changes: 6 additions & 6 deletions src/confluent_kafka/admin/_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class AclOperation(Enum):
ALTER_CONFIGS = _cimpl.ACL_OPERATION_ALTER_CONFIGS #: ALTER_CONFIGS operation
IDEMPOTENT_WRITE = _cimpl.ACL_OPERATION_IDEMPOTENT_WRITE #: IDEMPOTENT_WRITE operation

def __lt__(self, other: object) -> bool:
if not isinstance(other, AclOperation):
def __lt__(self, other: 'AclOperation') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value

Expand All @@ -59,8 +59,8 @@ class AclPermissionType(Enum):
DENY = _cimpl.ACL_PERMISSION_TYPE_DENY #: Disallows access
ALLOW = _cimpl.ACL_PERMISSION_TYPE_ALLOW #: Grants access

def __lt__(self, other: object) -> bool:
if not isinstance(other, AclPermissionType):
def __lt__(self, other: 'AclPermissionType') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value

Expand Down Expand Up @@ -161,8 +161,8 @@ def _to_tuple(self) -> Tuple[ResourceType, str, ResourcePatternType, str, str, A
def __hash__(self) -> int:
return hash(self._to_tuple())

def __lt__(self, other: object) -> bool:
if not isinstance(other, AclBinding):
def __lt__(self, other: 'AclBinding') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self._to_tuple() < other._to_tuple()

Expand Down
4 changes: 1 addition & 3 deletions src/confluent_kafka/admin/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ def __repr__(self) -> str:
def __hash__(self) -> int:
return hash((self.restype, self.name))

def __lt__(self, other: object) -> bool:
if not isinstance(other, ConfigResource):
return NotImplemented
def __lt__(self, other: 'ConfigResource') -> bool:
if self.restype < other.restype:
return True
return self.name.__lt__(other.name)
Expand Down
2 changes: 1 addition & 1 deletion src/confluent_kafka/admin/_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class MemberAssignment:
The topic partitions assigned to a group member.
"""

def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
self.topic_partitions = topic_partitions or []
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a mutable default argument (empty list) is dangerous as it will be shared across all instances. Use None as default and initialize inside the function instead.

Suggested change
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
self.topic_partitions = topic_partitions or []
def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
self.topic_partitions = topic_partitions if topic_partitions is not None else []

Copilot uses AI. Check for mistakes.


Expand Down
4 changes: 2 additions & 2 deletions src/confluent_kafka/admin/_listoffsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Optional
from typing import Dict, Any, Optional
from abc import ABC, abstractmethod

from .. import cimpl
Expand Down Expand Up @@ -68,7 +68,7 @@ def __new__(cls, index: int):
else:
return cls.for_timestamp(index)

def __lt__(self, other: object) -> bool:
def __lt__(self, other) -> Any:
if not isinstance(other, OffsetSpec):
return NotImplemented
return self._value < other._value
Expand Down
2 changes: 1 addition & 1 deletion src/confluent_kafka/admin/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class TopicMetadata(object):
# on other classes which raises a warning/error.

def __init__(self) -> None:
self.topic: Optional[str] = None
self.topic = None
"""Topic name"""
self.partitions: Dict[int, 'PartitionMetadata'] = {}
"""Map of partitions indexed by partition id. Value is a PartitionMetadata object."""
Expand Down
8 changes: 4 additions & 4 deletions src/confluent_kafka/admin/_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ class ResourceType(Enum):
BROKER = _cimpl.RESOURCE_BROKER #: Broker resource. Resource name is broker id.
TRANSACTIONAL_ID = _cimpl.RESOURCE_TRANSACTIONAL_ID #: Transactional ID resource.

def __lt__(self, other: object) -> Any:
if not isinstance(other, ResourceType):
def __lt__(self, other: 'ResourceType') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value

Expand All @@ -44,7 +44,7 @@ class ResourcePatternType(Enum):
LITERAL = _cimpl.RESOURCE_PATTERN_LITERAL #: Literal: A literal resource name
PREFIXED = _cimpl.RESOURCE_PATTERN_PREFIXED #: Prefixed: A prefixed resource name

def __lt__(self, other: object) -> Any:
if not isinstance(other, ResourcePatternType):
def __lt__(self, other: 'ResourcePatternType') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
6 changes: 3 additions & 3 deletions src/confluent_kafka/admin/_scram.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional
from typing import List, Optional, Any
from enum import Enum

from .. import cimpl
Expand All @@ -26,8 +26,8 @@ class ScramMechanism(Enum):
SCRAM_SHA_256 = cimpl.SCRAM_MECHANISM_SHA_256 #: SCRAM-SHA-256 mechanism
SCRAM_SHA_512 = cimpl.SCRAM_MECHANISM_SHA_512 #: SCRAM-SHA-512 mechanism

def __lt__(self, other: object) -> bool:
if not isinstance(other, ScramMechanism):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value

Expand Down
7 changes: 3 additions & 4 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ TODO: Consider migrating to Cython in the future to eliminate this dual
maintenance burden and get type hints directly from the implementation.
"""

from typing import Any, Optional, Callable, List, Tuple, Dict, Union, overload, TYPE_CHECKING
from typing import Any, Optional, Callable, List, Tuple, Dict, Union, overload
from typing_extensions import Self, Literal
import builtins

from ._types import HeadersType
from confluent_kafka.admin._metadata import ClusterMetadata, GroupMetadata

if TYPE_CHECKING:
from confluent_kafka.admin._metadata import ClusterMetadata, GroupMetadata
from ._types import HeadersType

# Callback types with proper class references (defined locally to avoid circular imports)
DeliveryCallback = Callable[[Optional['KafkaError'], 'Message'], None]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async def flush(self, *args: Any, **kwargs: Any) -> Any:
# Update buffer activity since we just flushed
self._buffer_timeout_manager.mark_activity()

# Then flush the underlying producer and wait for delivery confirmation
# Then flush underlying producer and wait for delivery confirmation
return await self._call(self._producer.flush, *args, **kwargs)

async def purge(self, *args: Any, **kwargs: Any) -> Any:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import confluent_kafka

from .. import _common

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -108,24 +106,10 @@ def _produce_batch_and_poll() -> int:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(self._executor, _produce_batch_and_poll)

async def flush_librdkafka_queue(self, timeout=-1):
"""Flush the librdkafka queue and wait for all messages to be delivered

This method awaits until all outstanding produce requests are completed
or the timeout is reached, unless the timeout is set to 0 (non-blocking).

Args:
timeout: Maximum time to wait in seconds:
- -1 = wait indefinitely (default)
- 0 = non-blocking, return immediately
- >0 = wait up to timeout seconds

Returns:
Number of messages still in queue after flush attempt
"""
return await _common.async_call(self._executor, self._producer.flush, timeout)

def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None:
def _handle_partial_failures(
self,
batch_messages: List[Dict[str, Any]]
) -> None:
Comment on lines +125 to +128
Copy link

Copilot AI Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] This appears to be a pure formatting change (moving parameters to separate lines) unrelated to type hinting. Such formatting changes should typically be in a separate commit to keep type hinting changes focused.

Suggested change
def _handle_partial_failures(
self,
batch_messages: List[Dict[str, Any]]
) -> None:
def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None:

Copilot uses AI. Check for mistakes.
"""Handle messages that failed during produce_batch

When produce_batch encounters messages that fail immediately (e.g.,
Expand Down
2 changes: 1 addition & 1 deletion src/confluent_kafka/schema_registry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def dual_schema_id_deserializer(payload: bytes, ctx: Optional[SerializationConte

# Parse schema ID from determined source and return appropriate payload
if header_value is not None:
schema_id.from_bytes(io.BytesIO(header_value))
schema_id.from_bytes(io.BytesIO(header_value)) # type: ignore[arg-type]
return io.BytesIO(payload) # Return full payload when schema ID is in header
else:
return schema_id.from_bytes(io.BytesIO(payload)) # Parse from payload, return remainder
Expand Down
Loading