Changes from 22 commits (30 commits in total)

Commits:
c50656f: update (fangnx, Sep 4, 2025)
343dbfe: remove py.typed for now (fangnx, Sep 4, 2025)
1e7f2ac: update (fangnx, Sep 5, 2025)
69ca6b8: fix cimply and add types to serde producer/consumer (fangnx, Sep 5, 2025)
d789244: admin (fangnx, Sep 5, 2025)
0653fe0: address feedback (fangnx, Sep 9, 2025)
77c4965: add warning to stub and c files; admin typing more (fangnx, Sep 9, 2025)
cfa723f: add accidentally removed md files (fangnx, Sep 9, 2025)
1abe216: fix merge conflicts in md files, add types to admin and serialization… (fangnx, Oct 15, 2025)
8f789a0: finish admin init (fangnx, Oct 15, 2025)
9c99020: add types for AIO module (fangnx, Oct 15, 2025)
a387ea7: linter fix (fangnx, Oct 15, 2025)
c1e2f91: address mypy complaints (fangnx, Oct 16, 2025)
8bdb0af: revert some accidental doc change (fangnx, Oct 16, 2025)
26694e6: fix some suggestions by copilot (fangnx, Oct 21, 2025)
c7865d8: linter (fangnx, Oct 21, 2025)
791c4ad: fix (fangnx, Oct 17, 2025)
ffb118e: resolve conflict (fangnx, Oct 21, 2025)
d536a71: encryption clients (fangnx, Oct 20, 2025)
cdbd203: fix (fangnx, Oct 23, 2025)
6fa8730: revert incorrect merge conflict changes (fangnx, Oct 23, 2025)
485532f: fix many things (fangnx, Oct 23, 2025)
6262a73: more fixes in non sr modules (fangnx, Oct 23, 2025)
5ffe301: type encrypt_executor.py (fangnx, Oct 23, 2025)
0f51247: more typeignore removals (fangnx, Oct 24, 2025)
b4bf42c: update (fangnx, Oct 24, 2025)
1451647: handle union types in schemas (fangnx, Oct 24, 2025)
5e718d6: a bit more (fangnx, Oct 24, 2025)
0ee5103: revert some bad changes during merge, address copilot comments (fangnx, Oct 24, 2025)
4026889: minor (fangnx, Oct 30, 2025)

15 changes: 15 additions & 0 deletions pyproject.toml
@@ -27,6 +27,21 @@ Homepage = "https://github.com/confluentinc/confluent-kafka-python"
[tool.mypy]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = [
"confluent_kafka.schema_registry.avro",
"confluent_kafka.schema_registry.json_schema",
"confluent_kafka.schema_registry.protobuf",
]
disable_error_code = ["assignment", "no-redef"]

[[tool.mypy.overrides]]
module = [
"confluent_kafka.schema_registry.confluent.meta_pb2",
"confluent_kafka.schema_registry.confluent.types.decimal_pb2",
]
ignore_errors = true

[tool.setuptools]
include-package-data = false

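For context, a minimal sketch (not from this PR) of the pattern these per-module overrides tolerate: the Avro/JSON Schema/Protobuf helpers conditionally re-bind names depending on optional dependencies, which mypy reports as "assignment" and "no-redef" errors unless disabled. The names below are illustrative only:

try:
    from fastavro import schemaless_reader  # optional dependency
except ImportError:
    schemaless_reader = None  # mypy: incompatible types in assignment

if schemaless_reader is None:
    def schemaless_reader(*args, **kwargs):  # mypy: name already defined [no-redef]
        raise RuntimeError("fastavro is required for Avro support")
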
2 changes: 2 additions & 0 deletions requirements/requirements-tests.txt
@@ -1,6 +1,8 @@
# core test requirements
urllib3<3
flake8
mypy
types-cachetools
orjson
pytest
pytest-timeout
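
With mypy and the stub dependencies added to the test requirements, the checks can presumably be run locally along these lines (assumed invocation; the CI wiring is not shown in this diff):

pip install -r requirements/requirements-tests.txt
mypy src/confluent_kafka
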
2 changes: 1 addition & 1 deletion src/confluent_kafka/_model/__init__.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional
from typing import List, Optional, Any
from enum import Enum
from .. import cimpl
from ..cimpl import TopicPartition
1 change: 0 additions & 1 deletion src/confluent_kafka/admin/__init__.py
@@ -653,7 +653,6 @@ def list_topics(self, *args: Any, **kwargs: Any) -> ClusterMetadata:
return super(AdminClient, self).list_topics(*args, **kwargs)

def list_groups(self, *args: Any, **kwargs: Any) -> List[GroupMetadata]:

return super(AdminClient, self).list_groups(*args, **kwargs)

def create_partitions( # type: ignore[override]
2 changes: 1 addition & 1 deletion src/confluent_kafka/admin/_group.py
@@ -80,7 +80,7 @@ class MemberAssignment:
The topic partitions assigned to a group member.
"""

def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
self.topic_partitions = topic_partitions or []
Copilot AI (Oct 24, 2025):
Using a mutable default argument (empty list) is dangerous as it will be shared across all instances. Use None as default and initialize inside the function instead.

Suggested change:
- def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
-     self.topic_partitions = topic_partitions or []
+ def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
+     self.topic_partitions = topic_partitions if topic_partitions is not None else []
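
The pitfall Copilot describes is easy to demonstrate in isolation (standalone sketch, unrelated to this PR):

def append_to(item, bucket: list = []):  # the default list is created once, at def time
    bucket.append(item)
    return bucket

print(append_to(1))  # [1]
print(append_to(2))  # [1, 2] -- the same list object is reused across calls

That said, the `or []` fallback in the PR's code happens to allocate a fresh list whenever the (falsy) shared default is passed, so the hazard here is latent rather than live; the `Optional[...] = None` idiom avoids it outright.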

2 changes: 1 addition & 1 deletion src/confluent_kafka/admin/_listoffsets.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Optional
from typing import Dict, Any, Optional
from abc import ABC, abstractmethod

from .. import cimpl
2 changes: 1 addition & 1 deletion src/confluent_kafka/admin/_metadata.py
@@ -79,7 +79,7 @@ class TopicMetadata(object):
# on other classes which raises a warning/error.

def __init__(self) -> None:
self.topic: Optional[str] = None
self.topic = None
"""Topic name"""
self.partitions: Dict[int, 'PartitionMetadata'] = {}
"""Map of partitions indexed by partition id. Value is a PartitionMetadata object."""
4 changes: 2 additions & 2 deletions src/confluent_kafka/admin/_resource.py
@@ -28,7 +28,7 @@ class ResourceType(Enum):
BROKER = _cimpl.RESOURCE_BROKER #: Broker resource. Resource name is broker id.
TRANSACTIONAL_ID = _cimpl.RESOURCE_TRANSACTIONAL_ID #: Transactional ID resource.

def __lt__(self, other: object) -> Any:
def __lt__(self, other: object) -> bool:
if not isinstance(other, ResourceType):
return NotImplemented
return self.value < other.value
@@ -44,7 +44,7 @@ class ResourcePatternType(Enum):
LITERAL = _cimpl.RESOURCE_PATTERN_LITERAL #: Literal: A literal resource name
PREFIXED = _cimpl.RESOURCE_PATTERN_PREFIXED #: Prefixed: A prefixed resource name

def __lt__(self, other: object) -> Any:
def __lt__(self, other: object) -> bool:
if not isinstance(other, ResourcePatternType):
return NotImplemented
return self.value < other.value
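
Tightening these annotations from Any to bool still type-checks despite the `return NotImplemented` branch: typeshed types NotImplemented so that it is assignable to any declared return type. A standalone sketch of the same pattern (not from this PR):

from enum import Enum

class Color(Enum):
    RED = 1
    BLUE = 2

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, Color):
            return NotImplemented  # accepted by mypy even with a bool annotation
        return self.value < other.value

print(Color.RED < Color.BLUE)  # True
# Color.RED < "x" raises TypeError: both operands return NotImplemented.
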
2 changes: 1 addition & 1 deletion src/confluent_kafka/admin/_scram.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional
from typing import List, Optional, Any
from enum import Enum

from .. import cimpl
7 changes: 3 additions & 4 deletions src/confluent_kafka/cimpl.pyi
@@ -34,14 +34,13 @@ TODO: Consider migrating to Cython in the future to eliminate this dual
maintenance burden and get type hints directly from the implementation.
"""

from typing import Any, Optional, Callable, List, Tuple, Dict, Union, overload, TYPE_CHECKING
from typing import Any, Optional, Callable, List, Tuple, Dict, Union, overload
from typing_extensions import Self, Literal
import builtins

from ._types import HeadersType
from confluent_kafka.admin._metadata import ClusterMetadata, GroupMetadata

if TYPE_CHECKING:
from confluent_kafka.admin._metadata import ClusterMetadata, GroupMetadata
from ._types import HeadersType

# Callback types with proper class references (defined locally to avoid circular imports)
DeliveryCallback = Callable[[Optional['KafkaError'], 'Message'], None]
@@ -110,22 +110,22 @@ def _produce_batch_and_poll() -> int:

async def flush_librdkafka_queue(self, timeout=-1):
"""Flush the librdkafka queue and wait for all messages to be delivered

This method awaits until all outstanding produce requests are completed
or the timeout is reached, unless the timeout is set to 0 (non-blocking).

Args:
timeout: Maximum time to wait in seconds:
- -1 = wait indefinitely (default)
- 0 = non-blocking, return immediately
- >0 = wait up to timeout seconds

Returns:
Number of messages still in queue after flush attempt
"""
return await _common.async_call(self._executor, self._producer.flush, timeout)

def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None:
def _handle_partial_failures(
self,
batch_messages: List[Dict[str, Any]]
) -> None:
Comment on lines +125 to +128
Copilot AI (Oct 30, 2025):
[nitpick] This appears to be a pure formatting change (moving parameters to separate lines) unrelated to type hinting. Such formatting changes should typically be in a separate commit to keep type hinting changes focused.

Suggested change:
- def _handle_partial_failures(
-     self,
-     batch_messages: List[Dict[str, Any]]
- ) -> None:
+ def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None:

"""Handle messages that failed during produce_batch

When produce_batch encounters messages that fail immediately (e.g.,
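
A usage sketch for the timeout semantics documented above (hypothetical `producer` of the async producer type this method belongs to):

import asyncio

async def drain(producer) -> None:
    # Wait up to 5 seconds; the return value is the number of messages
    # still in the librdkafka queue after the flush attempt.
    remaining = await producer.flush_librdkafka_queue(timeout=5.0)
    if remaining:
        print(f"{remaining} message(s) not yet delivered")

# asyncio.run(drain(producer))  # once a concrete producer instance exists
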
42 changes: 33 additions & 9 deletions src/confluent_kafka/schema_registry/__init__.py
@@ -72,58 +72,82 @@
]


def topic_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]:
def topic_subject_name_strategy(ctx: Optional[SerializationContext], record_name: Optional[str]) -> Optional[str]:
"""
Constructs a subject name in the form of {topic}-key|value.

Args:
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
operation. **Required** - will raise ValueError if None.

record_name (Optional[str]): Record name.

Raises:
ValueError: If ctx is None.

"""
if ctx is None:
fangnx (Member, Author):
I refactored the name_strategy functions a bit: they have to follow the same function signature but SerializationContext is only required for some

raise ValueError(
"SerializationContext is required for topic_subject_name_strategy. "
"Either provide a SerializationContext or use record_subject_name_strategy."
)
Comment on lines +89 to +93
Copilot AI (Oct 30, 2025):
Adding a ValueError when ctx is None changes the API behavior. Previously, these functions accepted Optional[SerializationContext], and None would result in an AttributeError at line 94. The new explicit check is clearer, but this is a breaking change that could affect existing code that catches AttributeError. Consider documenting this as a breaking change.

return ctx.topic + "-" + ctx.field


def topic_record_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]:
def topic_record_subject_name_strategy(ctx: Optional[SerializationContext], record_name: Optional[str]) -> Optional[str]:
"""
Constructs a subject name in the form of {topic}-{record_name}.

Args:
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
operation. **Required** - will raise ValueError if None.

record_name (Optional[str]): Record name.

Raises:
ValueError: If ctx is None.

"""
if ctx is None:
raise ValueError(
"SerializationContext is required for topic_record_subject_name_strategy. "
"Either provide a SerializationContext or use record_subject_name_strategy."
)
return ctx.topic + "-" + record_name if record_name is not None else None


def record_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]:
def record_subject_name_strategy(ctx: Optional[SerializationContext], record_name: Optional[str]) -> Optional[str]:
"""
Constructs a subject name in the form of {record_name}.

Args:
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
operation. **Not used** by this strategy.

record_name (Optional[str]): Record name.

Note:
This strategy does not require SerializationContext and can be used
when ctx is None.

"""
return record_name if record_name is not None else None


def reference_subject_name_strategy(ctx, schema_ref: SchemaReference) -> Optional[str]:
def reference_subject_name_strategy(ctx: Optional[SerializationContext], schema_ref: SchemaReference) -> Optional[str]:
"""
Constructs a subject reference name in the form of {reference name}.

Args:
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
operation. **Not used** by this strategy.

schema_ref (SchemaReference): SchemaReference instance.

Note:
This strategy does not require SerializationContext and can be used
when ctx is None.

"""
return schema_ref.name if schema_ref is not None else None

Expand Down Expand Up @@ -205,7 +229,7 @@ def dual_schema_id_deserializer(payload: bytes, ctx: Optional[SerializationConte

# Parse schema ID from determined source and return appropriate payload
if header_value is not None:
schema_id.from_bytes(io.BytesIO(header_value))
schema_id.from_bytes(io.BytesIO(header_value)) # type: ignore[arg-type]
return io.BytesIO(payload) # Return full payload when schema ID is in header
else:
return schema_id.from_bytes(io.BytesIO(payload)) # Parse from payload, return remainder
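
To illustrate the refactored strategy contract, a short sketch (assumed usage; the imports reflect where these names live in confluent-kafka-python):

from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import (
    record_subject_name_strategy,
    topic_subject_name_strategy,
)

ctx = SerializationContext("orders", MessageField.VALUE)
print(topic_subject_name_strategy(ctx, None))       # "orders-value"
print(record_subject_name_strategy(None, "Order"))  # "Order" (ctx not needed)

# After this change, omitting the context for a topic-based strategy fails
# fast with ValueError (previously an AttributeError, per the review comment):
try:
    topic_subject_name_strategy(None, "Order")
except ValueError as exc:
    print(exc)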