Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The scripts in this directory provide various examples of using the Confluent Py

- [producer.py](producer.py): Read lines from stdin and send them to a Kafka topic.
- [consumer.py](consumer.py): Read messages from a Kafka topic.
- [context_manager_example.py](context_manager_example.py): **Demonstrates context manager (`with` statement) usage for Producer, Consumer, and AdminClient** - shows automatic resource cleanup when exiting the `with` block.

## AsyncIO Examples

Expand Down
73 changes: 36 additions & 37 deletions examples/avro_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,43 +114,42 @@ def main(args):
# If Confluent Cloud SR credentials are provided, add to config
if args.sr_api_key and args.sr_api_secret:
schema_registry_conf['basic.auth.user.info'] = f"{args.sr_api_key}:{args.sr_api_secret}"
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

avro_serializer = AvroSerializer(schema_registry_client,
schema_str,
user_to_dict)

string_serializer = StringSerializer('utf_8')

producer_conf = {'bootstrap.servers': args.bootstrap_servers}

producer = Producer(producer_conf)

print("Producing user records to topic {}. ^C to exit.".format(topic))
while True:
# Serve on_delivery callbacks from previous calls to produce()
producer.poll(0.0)
try:
user_name = input("Enter name: ")
user_address = input("Enter address: ")
user_favorite_number = int(input("Enter favorite number: "))
user_favorite_color = input("Enter favorite color: ")
user = User(name=user_name,
address=user_address,
favorite_color=user_favorite_color,
favorite_number=user_favorite_number)
producer.produce(topic=topic,
key=string_serializer(str(uuid4())),
value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
on_delivery=delivery_report)
except KeyboardInterrupt:
break
except ValueError:
print("Invalid input, discarding record...")
continue

print("\nFlushing records...")
producer.flush()

# Use context manager for SchemaRegistryClient to ensure proper cleanup
with SchemaRegistryClient(schema_registry_conf) as schema_registry_client:
avro_serializer = AvroSerializer(schema_registry_client,
schema_str,
user_to_dict)
Comment on lines +119 to +122
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The avro_serializer is created inside the SchemaRegistryClient context but used in the Producer context. This creates a dependency where the serializer references a potentially closed schema registry client. Consider restructuring to ensure the schema registry client remains open for the lifetime of the serializer.

Copilot uses AI. Check for mistakes.

string_serializer = StringSerializer('utf_8')

producer_conf = {'bootstrap.servers': args.bootstrap_servers}

# Use context manager for Producer to ensure proper cleanup
with Producer(producer_conf) as producer:
print("Producing user records to topic {}. ^C to exit.".format(topic))
while True:
# Serve on_delivery callbacks from previous calls to produce()
producer.poll(0.0)
try:
user_name = input("Enter name: ")
user_address = input("Enter address: ")
user_favorite_number = int(input("Enter favorite number: "))
user_favorite_color = input("Enter favorite color: ")
user = User(name=user_name,
address=user_address,
favorite_color=user_favorite_color,
favorite_number=user_favorite_number)
producer.produce(topic=topic,
key=string_serializer(str(uuid4())),
value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
on_delivery=delivery_report)
except KeyboardInterrupt:
break
except ValueError:
print("Invalid input, discarding record...")
continue
# Producer will automatically flush on context exit


if __name__ == '__main__':
Expand Down
131 changes: 131 additions & 0 deletions examples/context_manager_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Example demonstrating context manager usage for Producer, Consumer, and AdminClient.
# Context managers ensure proper cleanup of resources when exiting the 'with' block.
#

from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
import sys


def main():
if len(sys.argv) < 2:
sys.stderr.write('Usage: %s <bootstrap-brokers>\n' % sys.argv[0])
sys.exit(1)

broker = sys.argv[1]
topic = 'context-manager-example'

# Example 1: AdminClient with context manager
# Automatically destroys the admin client when exiting the 'with' block
print("=== AdminClient Context Manager Example ===")
admin_conf = {'bootstrap.servers': broker}

with AdminClient(admin_conf) as admin:
# Create a topic using AdminClient
topic_obj = NewTopic(topic, num_partitions=1, replication_factor=1)
futures = admin.create_topics([topic_obj])

# Wait for the operation to complete
for topic_name, future in futures.items():
try:
future.result() # The result itself is None
print(f"Topic '{topic_name}' created successfully")
except Exception as e:
print(f"Failed to create topic '{topic_name}': {e}")

# Poll to ensure callbacks are processed
admin.poll(timeout=1.0)

# AdminClient is automatically destroyed here, no need for manual cleanup

# Example 2: Producer with context manager
# Automatically flushes pending messages and destroys the producer
print("\n=== Producer Context Manager Example ===")
producer_conf = {'bootstrap.servers': broker}

def delivery_callback(err, msg):
if err:
print(f'Message failed delivery: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')

with Producer(producer_conf) as producer:
# Produce some messages
for i in range(5):
value = f'Message {i} from context manager example'
producer.produce(
topic,
key=f'key-{i}',
value=value.encode('utf-8'),
callback=delivery_callback
)
# Poll for delivery callbacks
producer.poll(0)

print(f"Produced 5 messages to topic '{topic}'")

# Producer automatically flushes all pending messages and destroys here
# No need to call producer.flush() or manually clean up

# Example 3: Consumer with context manager
# Automatically closes the consumer (leaves consumer group, commits offsets)
print("\n=== Consumer Context Manager Example ===")
consumer_conf = {
'bootstrap.servers': broker,
'group.id': 'context-manager-example-group',
'auto.offset.reset': 'earliest'
}

with Consumer(consumer_conf) as consumer:
# Subscribe to the topic
consumer.subscribe([topic])

# Consume messages
msg_count = 0
try:
while msg_count < 5:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition, try next message
continue
else:
print(f'Consumer error: {msg.error()}')
break

print(f'Consumed message: key={msg.key().decode("utf-8")}, '
f'value={msg.value().decode("utf-8")}, '
f'partition={msg.partition()}, offset={msg.offset()}')
msg_count += 1
except KeyboardInterrupt:
print('Consumer interrupted by user')

# Consumer automatically calls close() here (leaves group, commits offsets)
# No need to manually call consumer.close()

print("\n=== All examples completed successfully! ===")


if __name__ == '__main__':
main()
6 changes: 6 additions & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class Producer:
def set_sasl_credentials(self, username: str, password: str) -> None: ...
def __len__(self) -> int: ...
def __bool__(self) -> bool: ...
def __enter__(self) -> "Producer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...

class Consumer:
def __init__(self, config: Dict[str, Union[str, int, float, bool, None]]) -> None: ...
Expand Down Expand Up @@ -208,6 +210,8 @@ class Consumer:
timeout: float = -1
) -> List[TopicPartition]: ...
def close(self) -> None: ...
def __enter__(self) -> "Consumer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
def list_topics(self, topic: Optional[str] = None, timeout: float = -1) -> Any: ...
def offsets_for_times(
self,
Expand All @@ -223,6 +227,8 @@ class Consumer:

class _AdminClientImpl:
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
def __enter__(self) -> "_AdminClientImpl": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
def create_topics(
self,
topics: List['NewTopic'],
Expand Down
46 changes: 46 additions & 0 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
rd_kafka_error_t *err_obj = NULL;
char errstr[512];

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed");
return NULL;
}

c_options = rd_kafka_AdminOptions_new(self->rk, for_api);
if (!c_options) {
PyErr_Format(PyExc_RuntimeError,
Expand Down Expand Up @@ -3245,6 +3250,11 @@ static PyObject *Admin_poll (Handle *self, PyObject *args,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d", kws, &tmout))
return NULL;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed");
return NULL;
}

r = Admin_poll0(self, (int)(tmout * 1000));
if (r == -1)
return NULL;
Expand All @@ -3253,6 +3263,36 @@ static PyObject *Admin_poll (Handle *self, PyObject *args,
}


static PyObject *Admin_enter (Handle *self) {
Py_INCREF(self);
return (PyObject *)self;
}

static PyObject *Admin_exit (Handle *self, PyObject *args) {
PyObject *exc_type, *exc_value, *exc_traceback;
CallState cs;

/* __exit__ always receives 3 arguments: exception type, exception value, and traceback.
* All three will be None if the with block completed without an exception.
* We unpack them here but don't need to use them - we just clean up regardless. */
if (!PyArg_UnpackTuple(args, "__exit__", 3, 3,
&exc_type, &exc_value, &exc_traceback))
return NULL;

/* Cleanup: destroy admin client */
if (self->rk) {
CallState_begin(self, &cs);

rd_kafka_destroy(self->rk);
self->rk = NULL;

if (!CallState_end(self, &cs))
return NULL;
}

Py_RETURN_NONE;
}


static PyMethodDef Admin_methods[] = {
{ "create_topics", (PyCFunction)Admin_create_topics,
Expand Down Expand Up @@ -3384,12 +3424,18 @@ static PyMethodDef Admin_methods[] = {
{ "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS,
Admin_elect_leaders_doc
},
{ "__enter__", (PyCFunction)Admin_enter, METH_NOARGS,
"Context manager entry." },
{ "__exit__", (PyCFunction)Admin_exit, METH_VARARGS,
"Context manager exit. Automatically destroys the admin client." },

{ NULL }
};


static Py_ssize_t Admin__len__ (Handle *self) {
if (!self->rk)
return 0;
return rd_kafka_outq_len(self->rk);
}

Expand Down
29 changes: 27 additions & 2 deletions src/confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,29 @@ static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
Py_RETURN_NONE;
}

static PyObject *Consumer_enter (Handle *self) {
Py_INCREF(self);
return (PyObject *)self;
}

static PyObject *Consumer_exit (Handle *self, PyObject *args) {
PyObject *exc_type, *exc_value, *exc_traceback;

if (!PyArg_UnpackTuple(args, "__exit__", 3, 3,
&exc_type, &exc_value, &exc_traceback))
return NULL;

/* Cleanup: call close() */
if (self->rk) {
PyObject *result = Consumer_close(self, NULL);
if (!result)
return NULL;
Py_DECREF(result);
}

Py_RETURN_NONE;
}

static PyObject *
Consumer_consumer_group_metadata (Handle *self, PyObject *ignore) {
rd_kafka_consumer_group_metadata_t *cgmd;
Expand Down Expand Up @@ -1527,8 +1550,10 @@ static PyMethodDef Consumer_methods[] = {
{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
set_sasl_credentials_doc
},


{ "__enter__", (PyCFunction)Consumer_enter, METH_NOARGS,
"Context manager entry." },
{ "__exit__", (PyCFunction)Consumer_exit, METH_VARARGS,
"Context manager exit. Automatically closes the consumer." },
{ NULL }
};

Expand Down
10 changes: 10 additions & 0 deletions src/confluent_kafka/src/Metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ list_topics (Handle *self, PyObject *args, PyObject *kwargs) {
&topic, &tmout))
return NULL;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "Handle has been closed");
return NULL;
}

if (topic != NULL) {
if (!(only_rkt = rd_kafka_topic_new(self->rk,
topic, NULL))) {
Expand Down Expand Up @@ -605,6 +610,11 @@ list_groups (Handle *self, PyObject *args, PyObject *kwargs) {
&group, &tmout))
return NULL;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "Handle has been closed");
return NULL;
}

CallState_begin(self, &cs);

err = rd_kafka_list_groups(self->rk, group, &group_list,
Expand Down
Loading