1 change: 1 addition & 0 deletions examples/README.md
@@ -8,6 +8,7 @@ The scripts in this directory provide various examples of using the Confluent Py

- [producer.py](producer.py): Read lines from stdin and send them to a Kafka topic.
- [consumer.py](consumer.py): Read messages from a Kafka topic.
+- [context_manager_example.py](context_manager_example.py): **Demonstrates context manager (`with` statement) usage for Producer, Consumer, and AdminClient**, showing automatic resource cleanup when exiting the `with` block.

## AsyncIO Examples

73 changes: 36 additions & 37 deletions examples/avro_producer.py
@@ -114,43 +114,42 @@ def main(args):
# If Confluent Cloud SR credentials are provided, add to config
if args.sr_api_key and args.sr_api_secret:
schema_registry_conf['basic.auth.user.info'] = f"{args.sr_api_key}:{args.sr_api_secret}"
-    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
-
-    avro_serializer = AvroSerializer(schema_registry_client,
-                                     schema_str,
-                                     user_to_dict)
-
-    string_serializer = StringSerializer('utf_8')
-
-    producer_conf = {'bootstrap.servers': args.bootstrap_servers}
-
-    producer = Producer(producer_conf)
-
-    print("Producing user records to topic {}. ^C to exit.".format(topic))
-    while True:
-        # Serve on_delivery callbacks from previous calls to produce()
-        producer.poll(0.0)
-        try:
-            user_name = input("Enter name: ")
-            user_address = input("Enter address: ")
-            user_favorite_number = int(input("Enter favorite number: "))
-            user_favorite_color = input("Enter favorite color: ")
-            user = User(name=user_name,
-                        address=user_address,
-                        favorite_color=user_favorite_color,
-                        favorite_number=user_favorite_number)
-            producer.produce(topic=topic,
-                             key=string_serializer(str(uuid4())),
-                             value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
-                             on_delivery=delivery_report)
-        except KeyboardInterrupt:
-            break
-        except ValueError:
-            print("Invalid input, discarding record...")
-            continue
-
-    print("\nFlushing records...")
-    producer.flush()

+    # Use context manager for SchemaRegistryClient to ensure proper cleanup
+    with SchemaRegistryClient(schema_registry_conf) as schema_registry_client:
+        avro_serializer = AvroSerializer(schema_registry_client,
+                                         schema_str,
+                                         user_to_dict)
Comment on lines +119 to +122
Copilot AI (Oct 29, 2025):
The avro_serializer is created inside the SchemaRegistryClient context but used in the Producer context. This creates a dependency where the serializer references a potentially closed schema registry client. Consider restructuring to ensure the schema registry client remains open for the lifetime of the serializer.
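A minimal sketch of the restructuring the reviewer suggests, reusing the names from the diff above: nesting the Producer block inside the SchemaRegistryClient block keeps the registry client open for every serializer call.

```python
# Sketch only; setup (schema_registry_conf, schema_str, user_to_dict,
# producer_conf) is as in the surrounding example.
with SchemaRegistryClient(schema_registry_conf) as schema_registry_client:
    avro_serializer = AvroSerializer(schema_registry_client,
                                     schema_str,
                                     user_to_dict)
    with Producer(producer_conf) as producer:
        # ... produce loop as below; every avro_serializer(...) call runs
        # while schema_registry_client is still open ...
        pass
    # Producer flushes and is destroyed here.
# SchemaRegistryClient is closed only after all serialization is done.
```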

+    string_serializer = StringSerializer('utf_8')
+
+    producer_conf = {'bootstrap.servers': args.bootstrap_servers}
+
+    # Use context manager for Producer to ensure proper cleanup
+    with Producer(producer_conf) as producer:
+        print("Producing user records to topic {}. ^C to exit.".format(topic))
+        while True:
+            # Serve on_delivery callbacks from previous calls to produce()
+            producer.poll(0.0)
+            try:
+                user_name = input("Enter name: ")
+                user_address = input("Enter address: ")
+                user_favorite_number = int(input("Enter favorite number: "))
+                user_favorite_color = input("Enter favorite color: ")
+                user = User(name=user_name,
+                            address=user_address,
+                            favorite_color=user_favorite_color,
+                            favorite_number=user_favorite_number)
+                producer.produce(topic=topic,
+                                 key=string_serializer(str(uuid4())),
+                                 value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
+                                 on_delivery=delivery_report)
+            except KeyboardInterrupt:
+                break
+            except ValueError:
+                print("Invalid input, discarding record...")
+                continue
+        # Producer will automatically flush on context exit


if __name__ == '__main__':
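The removed explicit `producer.flush()` is now implied by the context manager; roughly, the `with Producer(...)` form corresponds to this try/finally sketch (an illustration of the documented behavior, not the binding's literal implementation):

```python
producer = Producer(producer_conf)
try:
    pass  # produce() / poll() loop as above
finally:
    producer.flush()  # wait for outstanding deliveries before releasing the handle
```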
132 changes: 132 additions & 0 deletions examples/context_manager_example.py
@@ -0,0 +1,132 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Example demonstrating context manager usage for Producer, Consumer, and AdminClient.
# Context managers ensure proper cleanup of resources when exiting the 'with' block.
#

from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
import sys


def main():
if len(sys.argv) < 2:
sys.stderr.write('Usage: %s <bootstrap-brokers>\n' % sys.argv[0])
sys.exit(1)

broker = sys.argv[1]
topic = 'context-manager-example'

# Example 1: AdminClient with context manager
# Automatically destroys the admin client when exiting the 'with' block
print("=== AdminClient Context Manager Example ===")
admin_conf = {'bootstrap.servers': broker}

with AdminClient(admin_conf) as admin:
# Create a topic using AdminClient
topic_obj = NewTopic(topic, num_partitions=1, replication_factor=1)
futures = admin.create_topics([topic_obj])

# Wait for the operation to complete
for topic_name, future in futures.items():
try:
future.result() # The result itself is None
print(f"Topic '{topic_name}' created successfully")
except Exception as e:
print(f"Failed to create topic '{topic_name}': {e}")

# Poll to ensure callbacks are processed
admin.poll(timeout=1.0)

# AdminClient is automatically destroyed here, no need for manual cleanup

# Example 2: Producer with context manager
# Automatically flushes pending messages and destroys the producer
print("\n=== Producer Context Manager Example ===")
producer_conf = {'bootstrap.servers': broker}

def delivery_callback(err, msg):
if err:
print(f'Message failed delivery: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')

with Producer(producer_conf) as producer:
# Produce some messages
for i in range(5):
value = f'Message {i} from context manager example'
producer.produce(
topic,
key=f'key-{i}',
value=value.encode('utf-8'),
callback=delivery_callback
)
# Poll for delivery callbacks
producer.poll(0)

print(f"Produced 5 messages to topic '{topic}'")

# Producer automatically flushes all pending messages and destroys here
# No need to call producer.flush() or manually clean up

# Example 3: Consumer with context manager
# Automatically closes the consumer (leaves consumer group, commits offsets)
print("\n=== Consumer Context Manager Example ===")
consumer_conf = {
'bootstrap.servers': broker,
'group.id': 'context-manager-example-group',
'auto.offset.reset': 'earliest'
}

with Consumer(consumer_conf) as consumer:
# Subscribe to the topic
consumer.subscribe([topic])

# Consume messages
msg_count = 0
try:
while msg_count < 5:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition, try next message
continue
else:
print(f'Consumer error: {msg.error()}')
break

print(f'Consumed message: key={msg.key().decode("utf-8")}, '
f'value={msg.value().decode("utf-8")}, '
f'partition={msg.partition()}, offset={msg.offset()}')
msg_count += 1
except KeyboardInterrupt:
print('Consumer interrupted by user')

# Consumer automatically calls close() here (leaves group, commits offsets)
# No need to manually call consumer.close()

print("\n=== All examples completed successfully! ===")


if __name__ == '__main__':
main()
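For contrast, the manual cleanup this example replaces looks roughly like the sketch below; the `with` blocks above automate exactly these `finally` steps, per the comments in the example.

```python
# Manual-cleanup sketch (illustrative; configs and topic as defined in main() above)
producer = Producer(producer_conf)
try:
    producer.produce(topic, value=b'...')
finally:
    producer.flush()   # Producer.__exit__ flushes pending messages

consumer = Consumer(consumer_conf)
try:
    consumer.subscribe([topic])
    msg = consumer.poll(timeout=1.0)
finally:
    consumer.close()   # Consumer.__exit__ leaves the group and commits offsets
```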

6 changes: 6 additions & 0 deletions src/confluent_kafka/cimpl.pyi
@@ -157,6 +157,8 @@ class Producer:
def set_sasl_credentials(self, username: str, password: str) -> None: ...
def __len__(self) -> int: ...
def __bool__(self) -> bool: ...
def __enter__(self) -> "Producer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...

class Consumer:
def __init__(self, config: Dict[str, Union[str, int, float, bool, None]]) -> None: ...
@@ -208,6 +210,8 @@ class Consumer:
timeout: float = -1
) -> List[TopicPartition]: ...
def close(self) -> None: ...
def __enter__(self) -> "Consumer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
def list_topics(self, topic: Optional[str] = None, timeout: float = -1) -> Any: ...
def offsets_for_times(
self,
@@ -223,6 +227,8 @@

class _AdminClientImpl:
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
def __enter__(self) -> "_AdminClientImpl": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
def create_topics(
self,
topics: List['NewTopic'],
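Since `__exit__` is annotated as returning `Optional[bool]` and the C implementations below return `None`, an exception raised inside a `with` block propagates after cleanup rather than being suppressed. A small sketch of what these stubs let a type checker accept (the broker address is a placeholder):

```python
from confluent_kafka import Producer

with Producer({'bootstrap.servers': 'localhost:9092'}) as producer:
    producer.produce('example-topic', value=b'hello')  # __enter__ is typed to return Producer
# __exit__ returns None (falsy), so any exception raised above would propagate
```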
46 changes: 46 additions & 0 deletions src/confluent_kafka/src/Admin.c
@@ -137,6 +137,11 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
rd_kafka_error_t *err_obj = NULL;
char errstr[512];

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed");
return NULL;
}

c_options = rd_kafka_AdminOptions_new(self->rk, for_api);
if (!c_options) {
PyErr_Format(PyExc_RuntimeError,
@@ -3245,6 +3250,11 @@ static PyObject *Admin_poll (Handle *self, PyObject *args,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d", kws, &tmout))
return NULL;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed");
return NULL;
}

r = Admin_poll0(self, (int)(tmout * 1000));
if (r == -1)
return NULL;
@@ -3253,6 +3263,36 @@
}


static PyObject *Admin_enter (Handle *self) {
Py_INCREF(self);
return (PyObject *)self;
}

static PyObject *Admin_exit (Handle *self, PyObject *args) {
PyObject *exc_type, *exc_value, *exc_traceback;
CallState cs;

/* __exit__ always receives 3 arguments: exception type, exception value, and traceback.
* All three will be None if the with block completed without an exception.
* We unpack them here but don't need to use them - we just clean up regardless. */
if (!PyArg_UnpackTuple(args, "__exit__", 3, 3,
&exc_type, &exc_value, &exc_traceback))
return NULL;

/* Cleanup: destroy admin client */
if (self->rk) {
CallState_begin(self, &cs);

rd_kafka_destroy(self->rk);
self->rk = NULL;

if (!CallState_end(self, &cs))
return NULL;
}

Py_RETURN_NONE;
}


static PyMethodDef Admin_methods[] = {
{ "create_topics", (PyCFunction)Admin_create_topics,
@@ -3384,12 +3424,18 @@ static PyMethodDef Admin_methods[] = {
{ "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS,
Admin_elect_leaders_doc
},
{ "__enter__", (PyCFunction)Admin_enter, METH_NOARGS,
"Context manager entry." },
{ "__exit__", (PyCFunction)Admin_exit, METH_VARARGS,
"Context manager exit. Automatically destroys the admin client." },

{ NULL }
};


static Py_ssize_t Admin__len__ (Handle *self) {
if (!self->rk)
return 0;
return rd_kafka_outq_len(self->rk);
}

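Together with `Admin_exit`, the new `!self->rk` guards turn use-after-close into a Python exception rather than a crash; a hypothetical session (broker address is a placeholder):

```python
from confluent_kafka.admin import AdminClient

with AdminClient({'bootstrap.servers': 'localhost:9092'}) as admin:
    admin.poll(timeout=0.5)   # fine: the handle is open inside the block

admin.poll(timeout=0.5)       # raises RuntimeError: AdminClient has been closed
```

The `Admin__len__` guard works the same way, so `len(admin)` on a closed client reports 0 instead of dereferencing a NULL handle.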
29 changes: 27 additions & 2 deletions src/confluent_kafka/src/Consumer.c
@@ -1147,6 +1147,29 @@ static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
Py_RETURN_NONE;
}

static PyObject *Consumer_enter (Handle *self) {
Py_INCREF(self);
return (PyObject *)self;
}

static PyObject *Consumer_exit (Handle *self, PyObject *args) {
PyObject *exc_type, *exc_value, *exc_traceback;

if (!PyArg_UnpackTuple(args, "__exit__", 3, 3,
&exc_type, &exc_value, &exc_traceback))
return NULL;

/* Cleanup: call close() */
if (self->rk) {
PyObject *result = Consumer_close(self, NULL);
if (!result)
return NULL;
Py_DECREF(result);
}

Py_RETURN_NONE;
}

static PyObject *
Consumer_consumer_group_metadata (Handle *self, PyObject *ignore) {
rd_kafka_consumer_group_metadata_t *cgmd;
@@ -1527,8 +1550,10 @@ static PyMethodDef Consumer_methods[] = {
{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
set_sasl_credentials_doc
},


{ "__enter__", (PyCFunction)Consumer_enter, METH_NOARGS,
"Context manager entry." },
{ "__exit__", (PyCFunction)Consumer_exit, METH_VARARGS,
"Context manager exit. Automatically closes the consumer." },
{ NULL }
};

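`Consumer_exit` delegates to the existing `Consumer_close`, so leaving the block has the same semantics as an explicit `close()`, and because it returns `None`, exceptions raised in the body still propagate after cleanup. An illustrative sketch (config values are placeholders):

```python
from confluent_kafka import Consumer

try:
    with Consumer({'bootstrap.servers': 'localhost:9092',
                   'group.id': 'example-group'}) as consumer:
        consumer.subscribe(['example-topic'])
        raise ValueError('boom')   # __exit__ still runs close() on the way out
except ValueError:
    pass  # by now the consumer has left the group and committed offsets
```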
10 changes: 10 additions & 0 deletions src/confluent_kafka/src/Metadata.c
@@ -370,6 +370,11 @@ list_topics (Handle *self, PyObject *args, PyObject *kwargs) {
&topic, &tmout))
return NULL;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "Handle has been closed");
return NULL;
}

if (topic != NULL) {
if (!(only_rkt = rd_kafka_topic_new(self->rk,
topic, NULL))) {
@@ -605,6 +610,11 @@ list_groups (Handle *self, PyObject *args, PyObject *kwargs) {
&group, &tmout))
return NULL;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "Handle has been closed");
return NULL;
}

CallState_begin(self, &cs);

err = rd_kafka_list_groups(self->rk, group, &group_list,
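These guards give `list_topics()` and `list_groups()` the same closed-handle behavior as the admin and consumer methods above; a hypothetical example, assuming the handle has already been released by `close()` or a context-manager exit:

```python
from confluent_kafka import Consumer

consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                     'group.id': 'example-group'})
consumer.close()
consumer.list_topics(timeout=5.0)  # raises RuntimeError: Handle has been closed
```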