- 
                Notifications
    You must be signed in to change notification settings 
- Fork 934
Set up type hinting: add correct types and apply type fixes in schema-registry module #2107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 26 commits
c50656f
              343dbfe
              1e7f2ac
              69ca6b8
              d789244
              0653fe0
              77c4965
              cfa723f
              1abe216
              8f789a0
              9c99020
              a387ea7
              c1e2f91
              8bdb0af
              26694e6
              c7865d8
              791c4ad
              ffb118e
              d536a71
              cdbd203
              6fa8730
              485532f
              6262a73
              5ffe301
              0f51247
              b4bf42c
              1451647
              5e718d6
              0ee5103
              4026889
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,6 +1,8 @@ | ||
| # core test requirements | ||
| urllib3<3 | ||
| flake8 | ||
| mypy | ||
| types-cachetools | ||
| orjson | ||
| pytest | ||
| pytest-timeout | ||
|  | ||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|  | @@ -110,22 +110,22 @@ def _produce_batch_and_poll() -> int: | |||||||||||
|  | ||||||||||||
| async def flush_librdkafka_queue(self, timeout=-1): | ||||||||||||
| """Flush the librdkafka queue and wait for all messages to be delivered | ||||||||||||
|  | ||||||||||||
| This method awaits until all outstanding produce requests are completed | ||||||||||||
| or the timeout is reached, unless the timeout is set to 0 (non-blocking). | ||||||||||||
|  | ||||||||||||
| Args: | ||||||||||||
| timeout: Maximum time to wait in seconds: | ||||||||||||
| - -1 = wait indefinitely (default) | ||||||||||||
| - 0 = non-blocking, return immediately | ||||||||||||
| - >0 = wait up to timeout seconds | ||||||||||||
|  | ||||||||||||
| Returns: | ||||||||||||
| Number of messages still in queue after flush attempt | ||||||||||||
| """ | ||||||||||||
| return await _common.async_call(self._executor, self._producer.flush, timeout) | ||||||||||||
|  | ||||||||||||
| def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None: | ||||||||||||
| def _handle_partial_failures( | ||||||||||||
| self, | ||||||||||||
| batch_messages: List[Dict[str, Any]] | ||||||||||||
| ) -> None: | ||||||||||||
| 
      Comment on lines
    
      +125
     to 
      +128
    
   
     | ||||||||||||
| def _handle_partial_failures( | |
| self, | |
| batch_messages: List[Dict[str, Any]] | |
| ) -> None: | |
| def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None: | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -72,58 +72,82 @@ | |
| ] | ||
|  | ||
|  | ||
| def topic_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]: | ||
| def topic_subject_name_strategy(ctx: Optional[SerializationContext], record_name: Optional[str]) -> Optional[str]: | ||
| """ | ||
| Constructs a subject name in the form of {topic}-key|value. | ||
|  | ||
| Args: | ||
| ctx (SerializationContext): Metadata pertaining to the serialization | ||
| operation. | ||
| operation. **Required** - will raise ValueError if None. | ||
|  | ||
| record_name (Optional[str]): Record name. | ||
|  | ||
| Raises: | ||
| ValueError: If ctx is None. | ||
|  | ||
| """ | ||
| if ctx is None: | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I refactored the name_strategy functions a bit: they have to follow the same function signature but SerializationContext is only required for some | ||
| raise ValueError( | ||
| "SerializationContext is required for topic_subject_name_strategy. " | ||
| "Either provide a SerializationContext or use record_subject_name_strategy." | ||
| ) | ||
| 
      Comment on lines
    
      +89
     to 
      +93
    
   
     | ||
| return ctx.topic + "-" + ctx.field | ||
|  | ||
|  | ||
| def topic_record_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]: | ||
| def topic_record_subject_name_strategy(ctx: Optional[SerializationContext], record_name: Optional[str]) -> Optional[str]: | ||
| """ | ||
| Constructs a subject name in the form of {topic}-{record_name}. | ||
|  | ||
| Args: | ||
| ctx (SerializationContext): Metadata pertaining to the serialization | ||
| operation. | ||
| operation. **Required** - will raise ValueError if None. | ||
|  | ||
| record_name (Optional[str]): Record name. | ||
|  | ||
| Raises: | ||
| ValueError: If ctx is None. | ||
|  | ||
| """ | ||
| if ctx is None: | ||
| raise ValueError( | ||
| "SerializationContext is required for topic_record_subject_name_strategy. " | ||
| "Either provide a SerializationContext or use record_subject_name_strategy." | ||
| ) | ||
| return ctx.topic + "-" + record_name if record_name is not None else None | ||
|  | ||
|  | ||
| def record_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]: | ||
| def record_subject_name_strategy(ctx: Optional[SerializationContext], record_name: Optional[str]) -> Optional[str]: | ||
| """ | ||
| Constructs a subject name in the form of {record_name}. | ||
|  | ||
| Args: | ||
| ctx (SerializationContext): Metadata pertaining to the serialization | ||
| operation. | ||
| operation. **Not used** by this strategy. | ||
|  | ||
| record_name (Optional[str]): Record name. | ||
|  | ||
| Note: | ||
| This strategy does not require SerializationContext and can be used | ||
| when ctx is None. | ||
|  | ||
| """ | ||
| return record_name if record_name is not None else None | ||
|  | ||
|  | ||
| def reference_subject_name_strategy(ctx, schema_ref: SchemaReference) -> Optional[str]: | ||
| def reference_subject_name_strategy(ctx: Optional[SerializationContext], schema_ref: SchemaReference) -> Optional[str]: | ||
| """ | ||
| Constructs a subject reference name in the form of {reference name}. | ||
|  | ||
| Args: | ||
| ctx (SerializationContext): Metadata pertaining to the serialization | ||
| operation. | ||
| operation. **Not used** by this strategy. | ||
|  | ||
| schema_ref (SchemaReference): SchemaReference instance. | ||
|  | ||
| Note: | ||
| This strategy does not require SerializationContext and can be used | ||
| when ctx is None. | ||
|  | ||
| """ | ||
| return schema_ref.name if schema_ref is not None else None | ||
|  | ||
|  | @@ -205,7 +229,7 @@ def dual_schema_id_deserializer(payload: bytes, ctx: Optional[SerializationConte | |
|  | ||
| # Parse schema ID from determined source and return appropriate payload | ||
| if header_value is not None: | ||
| schema_id.from_bytes(io.BytesIO(header_value)) | ||
| schema_id.from_bytes(io.BytesIO(header_value)) # type: ignore[arg-type] | ||
| return io.BytesIO(payload) # Return full payload when schema ID is in header | ||
| else: | ||
| return schema_id.from_bytes(io.BytesIO(payload)) # Parse from payload, return remainder | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a mutable default argument (empty list) is dangerous as it will be shared across all instances. Use None as default and initialize inside the function instead.