- 
                Notifications
    You must be signed in to change notification settings 
- Fork 934
Add context manager support for Producer, Consumer, and AdminClient #2114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| 🎉 All Contributor License Agreements have been signed. Ready to merge.  | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds context manager support to Producer, Consumer, and AdminClient classes in the confluent-kafka Python library, enabling automatic resource cleanup when exiting with blocks.
Key changes include:
- Context manager implementation for all three client types with automatic cleanup
- Comprehensive test coverage for context manager functionality
- Updated examples demonstrating context manager usage
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description | 
|---|---|
| tests/test_Producer.py | Adds comprehensive unit tests for Producer context manager functionality | 
| tests/test_Consumer.py | Adds unit tests covering Consumer context manager behavior and cleanup | 
| tests/test_Admin.py | Adds unit tests for AdminClient context manager with various Admin APIs | 
| src/confluent_kafka/src/Producer.c | Implements enter and exit methods with automatic flush and NULL checks | 
| src/confluent_kafka/src/Metadata.c | Adds NULL checks to prevent operations on closed handles | 
| src/confluent_kafka/src/Consumer.c | Implements context manager methods that call existing close() functionality | 
| src/confluent_kafka/src/Admin.c | Implements context manager methods with automatic cleanup and NULL checks | 
| src/confluent_kafka/cimpl.pyi | Updates type stubs with context manager method signatures | 
| examples/context_manager_example.py | New comprehensive example demonstrating all three context managers | 
| examples/avro_producer.py | Updates existing example to use context managers | 
| examples/README.md | Documents the new context manager example | 
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /* Flush any pending messages (wait indefinitely to ensure delivery) */ | ||
| err = rd_kafka_flush(self->rk, -1); | 
    
      
    
      Copilot
AI
    
    
    
      Oct 29, 2025 
    
  
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using an infinite timeout (-1) for flush during context exit could cause the application to hang indefinitely if there are network issues. Consider using a reasonable timeout with a warning message if flush times out.
| with SchemaRegistryClient(schema_registry_conf) as schema_registry_client: | ||
| avro_serializer = AvroSerializer(schema_registry_client, | ||
| schema_str, | ||
| user_to_dict) | 
    
      
    
      Copilot
AI
    
    
    
      Oct 29, 2025 
    
  
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The avro_serializer is created inside the SchemaRegistryClient context but used in the Producer context. This creates a dependency where the serializer references a potentially closed schema registry client. Consider restructuring to ensure the schema registry client remains open for the lifetime of the serializer.
| 
 | 




Summary
This PR adds context manager (
withstatement) support toProducer,Consumer, andAdminClientclassesMotivation
This addresses most requested community issues Issue #2061 and Issue #1244.
Changes
Implementation
Producer Context Manager (
src/confluent_kafka/src/Producer.c):__enter__method that returns the producer instance__exit__method that automatically flushes pending messages and destroys the producerConsumer Context Manager (
src/confluent_kafka/src/Consumer.c):__enter__method that returns the consumer instance__exit__method that calls the existingclose()methodAdminClient Context Manager (
src/confluent_kafka/src/Admin.c):__enter__method that returns the admin client instance__exit__method that destroys the admin client and cleans up resourcesType Stubs
src/confluent_kafka/cimpl.pyiwith context manager method signatures for all three classesTesting
tests/test_Producer.py), (tests/test_Consumer.py), (tests/test_Admin.py) covering scenariosExamples and Documentation
examples/avro_producer.pyto use context managers for bothProducerandSchemaRegistryClientexamples/context_manager_example.pydemonstrating all three context managersexamples/README.mdto document the new context manager