diff --git a/examples/README.md b/examples/README.md index b8fb2ee67..c52e25920 100644 --- a/examples/README.md +++ b/examples/README.md @@ -8,6 +8,7 @@ The scripts in this directory provide various examples of using the Confluent Py - [producer.py](producer.py): Read lines from stdin and send them to a Kafka topic. - [consumer.py](consumer.py): Read messages from a Kafka topic. +- [context_manager_example.py](context_manager_example.py): **Demonstrates context manager (`with` statement) usage for Producer, Consumer, and AdminClient** - shows automatic resource cleanup when exiting the `with` block. ## AsyncIO Examples diff --git a/examples/avro_producer.py b/examples/avro_producer.py index e563d2e77..e4f5a46e8 100644 --- a/examples/avro_producer.py +++ b/examples/avro_producer.py @@ -114,43 +114,42 @@ def main(args): # If Confluent Cloud SR credentials are provided, add to config if args.sr_api_key and args.sr_api_secret: schema_registry_conf['basic.auth.user.info'] = f"{args.sr_api_key}:{args.sr_api_secret}" - schema_registry_client = SchemaRegistryClient(schema_registry_conf) - - avro_serializer = AvroSerializer(schema_registry_client, - schema_str, - user_to_dict) - - string_serializer = StringSerializer('utf_8') - - producer_conf = {'bootstrap.servers': args.bootstrap_servers} - - producer = Producer(producer_conf) - - print("Producing user records to topic {}. ^C to exit.".format(topic)) - while True: - # Serve on_delivery callbacks from previous calls to produce() - producer.poll(0.0) - try: - user_name = input("Enter name: ") - user_address = input("Enter address: ") - user_favorite_number = int(input("Enter favorite number: ")) - user_favorite_color = input("Enter favorite color: ") - user = User(name=user_name, - address=user_address, - favorite_color=user_favorite_color, - favorite_number=user_favorite_number) - producer.produce(topic=topic, - key=string_serializer(str(uuid4())), - value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)), - on_delivery=delivery_report) - except KeyboardInterrupt: - break - except ValueError: - print("Invalid input, discarding record...") - continue - - print("\nFlushing records...") - producer.flush() + + # Use context manager for SchemaRegistryClient to ensure proper cleanup + with SchemaRegistryClient(schema_registry_conf) as schema_registry_client: + avro_serializer = AvroSerializer(schema_registry_client, + schema_str, + user_to_dict) + + string_serializer = StringSerializer('utf_8') + + producer_conf = {'bootstrap.servers': args.bootstrap_servers} + + # Use context manager for Producer to ensure proper cleanup + with Producer(producer_conf) as producer: + print("Producing user records to topic {}. ^C to exit.".format(topic)) + while True: + # Serve on_delivery callbacks from previous calls to produce() + producer.poll(0.0) + try: + user_name = input("Enter name: ") + user_address = input("Enter address: ") + user_favorite_number = int(input("Enter favorite number: ")) + user_favorite_color = input("Enter favorite color: ") + user = User(name=user_name, + address=user_address, + favorite_color=user_favorite_color, + favorite_number=user_favorite_number) + producer.produce(topic=topic, + key=string_serializer(str(uuid4())), + value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)), + on_delivery=delivery_report) + except KeyboardInterrupt: + break + except ValueError: + print("Invalid input, discarding record...") + continue + # Producer will automatically flush on context exit if __name__ == '__main__': diff --git a/examples/context_manager_example.py b/examples/context_manager_example.py new file mode 100755 index 000000000..cbd23c044 --- /dev/null +++ b/examples/context_manager_example.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python +# +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Example demonstrating context manager usage for Producer, Consumer, and AdminClient. +# Context managers ensure proper cleanup of resources when exiting the 'with' block. +# + +from confluent_kafka import Producer, Consumer, KafkaError +from confluent_kafka.admin import AdminClient, NewTopic +import sys + + +def main(): + if len(sys.argv) < 2: + sys.stderr.write('Usage: %s \n' % sys.argv[0]) + sys.exit(1) + + broker = sys.argv[1] + topic = 'context-manager-example' + + # Example 1: AdminClient with context manager + # Automatically destroys the admin client when exiting the 'with' block + print("=== AdminClient Context Manager Example ===") + admin_conf = {'bootstrap.servers': broker} + + with AdminClient(admin_conf) as admin: + # Create a topic using AdminClient + topic_obj = NewTopic(topic, num_partitions=1, replication_factor=1) + futures = admin.create_topics([topic_obj]) + + # Wait for the operation to complete + for topic_name, future in futures.items(): + try: + future.result() # The result itself is None + print(f"Topic '{topic_name}' created successfully") + except Exception as e: + print(f"Failed to create topic '{topic_name}': {e}") + + # Poll to ensure callbacks are processed + admin.poll(timeout=1.0) + + # AdminClient is automatically destroyed here, no need for manual cleanup + + # Example 2: Producer with context manager + # Automatically flushes pending messages and destroys the producer + print("\n=== Producer Context Manager Example ===") + producer_conf = {'bootstrap.servers': broker} + + def delivery_callback(err, msg): + if err: + print(f'Message failed delivery: {err}') + else: + print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}') + + with Producer(producer_conf) as producer: + # Produce some messages + for i in range(5): + value = f'Message {i} from context manager example' + producer.produce( + topic, + key=f'key-{i}', + value=value.encode('utf-8'), + callback=delivery_callback + ) + # Poll for delivery callbacks + producer.poll(0) + + print(f"Produced 5 messages to topic '{topic}'") + + # Producer automatically flushes all pending messages and destroys here + # No need to call producer.flush() or manually clean up + + # Example 3: Consumer with context manager + # Automatically closes the consumer (leaves consumer group, commits offsets) + print("\n=== Consumer Context Manager Example ===") + consumer_conf = { + 'bootstrap.servers': broker, + 'group.id': 'context-manager-example-group', + 'auto.offset.reset': 'earliest' + } + + with Consumer(consumer_conf) as consumer: + # Subscribe to the topic + consumer.subscribe([topic]) + + # Consume messages + msg_count = 0 + try: + while msg_count < 5: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + # End of partition, try next message + continue + else: + print(f'Consumer error: {msg.error()}') + break + + print(f'Consumed message: key={msg.key().decode("utf-8")}, ' + f'value={msg.value().decode("utf-8")}, ' + f'partition={msg.partition()}, offset={msg.offset()}') + msg_count += 1 + except KeyboardInterrupt: + print('Consumer interrupted by user') + + # Consumer automatically calls close() here (leaves group, commits offsets) + # No need to manually call consumer.close() + + print("\n=== All examples completed successfully! ===") + + +if __name__ == '__main__': + main() diff --git a/src/confluent_kafka/cimpl.pyi b/src/confluent_kafka/cimpl.pyi index 99a888acc..d8ebc4583 100644 --- a/src/confluent_kafka/cimpl.pyi +++ b/src/confluent_kafka/cimpl.pyi @@ -157,6 +157,8 @@ class Producer: def set_sasl_credentials(self, username: str, password: str) -> None: ... def __len__(self) -> int: ... def __bool__(self) -> bool: ... + def __enter__(self) -> "Producer": ... + def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ... class Consumer: def __init__(self, config: Dict[str, Union[str, int, float, bool, None]]) -> None: ... @@ -208,6 +210,8 @@ class Consumer: timeout: float = -1 ) -> List[TopicPartition]: ... def close(self) -> None: ... + def __enter__(self) -> "Consumer": ... + def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ... def list_topics(self, topic: Optional[str] = None, timeout: float = -1) -> Any: ... def offsets_for_times( self, @@ -223,6 +227,8 @@ class Consumer: class _AdminClientImpl: def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ... + def __enter__(self) -> "_AdminClientImpl": ... + def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ... def create_topics( self, topics: List['NewTopic'], diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 4b8363006..7bbd0313c 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -137,6 +137,11 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api, rd_kafka_error_t *err_obj = NULL; char errstr[512]; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed"); + return NULL; + } + c_options = rd_kafka_AdminOptions_new(self->rk, for_api); if (!c_options) { PyErr_Format(PyExc_RuntimeError, @@ -3245,6 +3250,11 @@ static PyObject *Admin_poll (Handle *self, PyObject *args, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d", kws, &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed"); + return NULL; + } + r = Admin_poll0(self, (int)(tmout * 1000)); if (r == -1) return NULL; @@ -3253,6 +3263,36 @@ static PyObject *Admin_poll (Handle *self, PyObject *args, } +static PyObject *Admin_enter (Handle *self) { + Py_INCREF(self); + return (PyObject *)self; +} + +static PyObject *Admin_exit (Handle *self, PyObject *args) { + PyObject *exc_type, *exc_value, *exc_traceback; + CallState cs; + + /* __exit__ always receives 3 arguments: exception type, exception value, and traceback. + * All three will be None if the with block completed without an exception. + * We unpack them here but don't need to use them - we just clean up regardless. */ + if (!PyArg_UnpackTuple(args, "__exit__", 3, 3, + &exc_type, &exc_value, &exc_traceback)) + return NULL; + + /* Cleanup: destroy admin client */ + if (self->rk) { + CallState_begin(self, &cs); + + rd_kafka_destroy(self->rk); + self->rk = NULL; + + if (!CallState_end(self, &cs)) + return NULL; + } + + Py_RETURN_NONE; +} + static PyMethodDef Admin_methods[] = { { "create_topics", (PyCFunction)Admin_create_topics, @@ -3384,12 +3424,18 @@ static PyMethodDef Admin_methods[] = { { "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS, Admin_elect_leaders_doc }, + { "__enter__", (PyCFunction)Admin_enter, METH_NOARGS, + "Context manager entry." }, + { "__exit__", (PyCFunction)Admin_exit, METH_VARARGS, + "Context manager exit. Automatically destroys the admin client." }, { NULL } }; static Py_ssize_t Admin__len__ (Handle *self) { + if (!self->rk) + return 0; return rd_kafka_outq_len(self->rk); } diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 909d1ec3f..2b5fbfd09 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -1147,6 +1147,29 @@ static PyObject *Consumer_close (Handle *self, PyObject *ignore) { Py_RETURN_NONE; } +static PyObject *Consumer_enter (Handle *self) { + Py_INCREF(self); + return (PyObject *)self; +} + +static PyObject *Consumer_exit (Handle *self, PyObject *args) { + PyObject *exc_type, *exc_value, *exc_traceback; + + if (!PyArg_UnpackTuple(args, "__exit__", 3, 3, + &exc_type, &exc_value, &exc_traceback)) + return NULL; + + /* Cleanup: call close() */ + if (self->rk) { + PyObject *result = Consumer_close(self, NULL); + if (!result) + return NULL; + Py_DECREF(result); + } + + Py_RETURN_NONE; +} + static PyObject * Consumer_consumer_group_metadata (Handle *self, PyObject *ignore) { rd_kafka_consumer_group_metadata_t *cgmd; @@ -1527,8 +1550,10 @@ static PyMethodDef Consumer_methods[] = { { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, set_sasl_credentials_doc }, - - + { "__enter__", (PyCFunction)Consumer_enter, METH_NOARGS, + "Context manager entry." }, + { "__exit__", (PyCFunction)Consumer_exit, METH_VARARGS, + "Context manager exit. Automatically closes the consumer." }, { NULL } }; diff --git a/src/confluent_kafka/src/Metadata.c b/src/confluent_kafka/src/Metadata.c index cdbf23d2f..bcc49c002 100644 --- a/src/confluent_kafka/src/Metadata.c +++ b/src/confluent_kafka/src/Metadata.c @@ -370,6 +370,11 @@ list_topics (Handle *self, PyObject *args, PyObject *kwargs) { &topic, &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, "Handle has been closed"); + return NULL; + } + if (topic != NULL) { if (!(only_rkt = rd_kafka_topic_new(self->rk, topic, NULL))) { @@ -605,6 +610,11 @@ list_groups (Handle *self, PyObject *args, PyObject *kwargs) { &group, &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, "Handle has been closed"); + return NULL; + } + CallState_begin(self, &cs); err = rd_kafka_list_groups(self->rk, group, &group_list, diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index 7b9d3f39a..c4015b8d8 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -305,6 +305,11 @@ static PyObject *Producer_produce (Handle *self, PyObject *args, if (!dr_cb || dr_cb == Py_None) dr_cb = self->u.Producer.default_dr_cb; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, "Producer has been closed"); + return NULL; + } + /* Create msgstate if necessary, may return NULL if no callbacks * are wanted. */ msgstate = Producer_msgstate_new(self, dr_cb); @@ -375,6 +380,11 @@ static PyObject *Producer_poll (Handle *self, PyObject *args, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, "Producer has been closed"); + return NULL; + } + r = Producer_poll0(self, cfl_timeout_ms(tmout)); if (r == -1) return NULL; @@ -394,6 +404,11 @@ static PyObject *Producer_flush (Handle *self, PyObject *args, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout)) return NULL; + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, "Producer has been closed"); + return NULL; + } + CallState_begin(self, &cs); err = rd_kafka_flush(self->rk, cfl_timeout_ms(tmout)); if (!CallState_end(self, &cs)) @@ -838,7 +853,47 @@ static void *Producer_purge (Handle *self, PyObject *args, return NULL; } - Py_RETURN_NONE; + Py_RETURN_NONE; +} + +static PyObject *Producer_enter (Handle *self) { + Py_INCREF(self); + return (PyObject *)self; +} + +static PyObject *Producer_exit (Handle *self, PyObject *args) { + PyObject *exc_type, *exc_value, *exc_traceback; + rd_kafka_resp_err_t err; + CallState cs; + + if (!PyArg_UnpackTuple(args, "__exit__", 3, 3, + &exc_type, &exc_value, &exc_traceback)) + return NULL; + + /* Cleanup: flush pending messages and destroy producer */ + if (self->rk) { + CallState_begin(self, &cs); + + /* Flush any pending messages (wait indefinitely to ensure delivery) */ + err = rd_kafka_flush(self->rk, -1); + + /* Destroy the producer (even if flush had issues) */ + rd_kafka_destroy(self->rk); + self->rk = NULL; + + if (!CallState_end(self, &cs)) + return NULL; + + /* If flush failed, warn but don't suppress original exception */ + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + PyErr_WarnFormat(PyExc_RuntimeWarning, 1, + "Producer flush failed during context exit: %s", + rd_kafka_err2str(err)); + } + } + + /* Return None to propagate any exceptions from the with block */ + Py_RETURN_NONE; } @@ -1141,11 +1196,17 @@ static PyMethodDef Producer_methods[] = { { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, set_sasl_credentials_doc }, + { "__enter__", (PyCFunction)Producer_enter, METH_NOARGS, + "Context manager entry." }, + { "__exit__", (PyCFunction)Producer_exit, METH_VARARGS, + "Context manager exit. Automatically flushes and destroys the producer." }, { NULL } }; static Py_ssize_t Producer__len__ (Handle *self) { + if (!self->rk) + return 0; return rd_kafka_outq_len(self->rk); } diff --git a/tests/test_Admin.py b/tests/test_Admin.py index e6168cf9f..a3fe6bf67 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1307,3 +1307,216 @@ def throttle_cb_that_raises_kafka(throttle_event): # Verify that error_cb was called assert len(callbacks_called) > 0 assert 'error_cb_runtime' in callbacks_called + + +def test_admin_context_manager_basic(): + """Test basic AdminClient context manager usage and return value""" + config = { + 'socket.timeout.ms': 10 + } + + # Test __enter__ returns self + admin = AdminClient(config) + entered = admin.__enter__() + assert entered is admin + admin.__exit__(None, None, None) # Clean up + + # Test basic context manager usage + with AdminClient(config) as admin: + assert admin is not None + admin.poll(0.001) + + # AdminClient should be closed after exiting context + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.poll(0.001) + + +def test_admin_context_manager_exception_propagation(): + """Test exceptions propagate and admin client is cleaned up""" + config = { + 'socket.timeout.ms': 10 + } + + # Test exception propagation + exception_caught = False + try: + with AdminClient(config) as admin: + admin.poll(0.001) + raise ValueError("Test exception") + except ValueError as e: + assert str(e) == "Test exception" + exception_caught = True + + assert exception_caught, "Exception should have propagated" + + # AdminClient should be closed even after exception + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.poll(0.001) + + +def test_admin_context_manager_exit_with_exceptions(): + """Test __exit__ properly handles exception arguments""" + config = { + 'socket.timeout.ms': 10 + } + + admin = AdminClient(config) + admin.poll(0.001) + + # Simulate exception in with block + exc_type = ValueError + exc_value = ValueError("Test error") + exc_traceback = None + + # __exit__ should cleanup and return None (propagate exception) + result = admin.__exit__(exc_type, exc_value, exc_traceback) + assert result is None # None means propagate exception + + # AdminClient should be closed + with pytest.raises(RuntimeError): + admin.poll(0.001) + + +def test_admin_context_manager_after_exit(): + """Test AdminClient behavior after context manager exit""" + config = { + 'socket.timeout.ms': 10 + } + + # Normal exit + with AdminClient(config) as admin: + admin.poll(0.001) + + # All methods should fail after context exit + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.poll(0.001) + + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.create_topics([NewTopic("test", 1, 1)]) + + with pytest.raises(RuntimeError, match="Handle has been closed"): + admin.list_topics() + + # __len__ should return 0 for closed admin client + assert len(admin) == 0 + + # Test already-closed admin client edge case + # Using __enter__ and __exit__ directly on already-closed admin + entered = admin.__enter__() + assert entered is admin + + # Operations should still fail + with pytest.raises(RuntimeError): + admin.poll(0.001) + + # __exit__ should handle already-closed gracefully + result = admin.__exit__(None, None, None) + assert result is None + + +def test_admin_context_manager_multiple_instances(): + """Test AdminClient context manager with multiple instances""" + config = { + 'socket.timeout.ms': 10 + } + + # Test multiple sequential instances + with AdminClient(config) as admin1: + admin1.poll(0.001) + + with AdminClient(config) as admin2: + admin2.poll(0.001) + # Both should be independent + assert admin1 is not admin2 + + # Both should be closed + with pytest.raises(RuntimeError): + admin1.poll(0.001) + with pytest.raises(RuntimeError): + admin2.poll(0.001) + + # Test nested context managers + with AdminClient(config) as admin1: + with AdminClient(config) as admin2: + assert admin1 is not admin2 + admin1.poll(0.001) + admin2.poll(0.001) + # admin2 should be closed, admin1 still open + admin1.poll(0.001) + + # Both should be closed now + with pytest.raises(RuntimeError): + admin1.poll(0.001) + with pytest.raises(RuntimeError): + admin2.poll(0.001) + + +def test_admin_context_manager_with_admin_apis(): + """Test AdminClient context manager with various Admin APIs""" + config = { + 'socket.timeout.ms': 10 + } + + acl_binding = AclBinding( + ResourceType.TOPIC, "topic1", ResourcePatternType.LITERAL, + "User:u1", "*", AclOperation.WRITE, AclPermissionType.ALLOW + ) + + with AdminClient(config) as admin: + # Test 1: create_topics API + fs1 = admin.create_topics([NewTopic("test_topic", 1, 1)]) + assert fs1 is not None + assert "test_topic" in fs1 + + # Test 2: delete_topics API + fs2 = admin.delete_topics(["test_topic"]) + assert fs2 is not None + assert "test_topic" in fs2 + + # Test 3: describe_configs API + fs3 = admin.describe_configs([ConfigResource(ResourceType.TOPIC, "test_topic")]) + assert fs3 is not None + + # Test 4: list_topics API + try: + metadata = admin.list_topics(timeout=0.2) + assert metadata is not None + except KafkaException: + # Expected when broker is not available + pass + + # Test 5: list_groups API + try: + groups = admin.list_groups(timeout=0.2) + assert groups is not None + except KafkaException: + # Expected when broker is not available + pass + + # Test 6: create_acls API + fs4 = admin.create_acls([acl_binding]) + assert fs4 is not None + + # Test 7: describe_consumer_groups API + fs5 = admin.describe_consumer_groups(["test-group-1"]) + assert fs5 is not None + assert "test-group-1" in fs5 + + # Poll to process callbacks + admin.poll(0.001) + + # Test 8: All operations should fail after context exit + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.create_topics([NewTopic("another_topic", 1, 1)]) + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.delete_topics(["another_topic"]) + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.describe_configs([ConfigResource(ResourceType.TOPIC, "test_topic")]) + with pytest.raises(RuntimeError, match="Handle has been closed"): + admin.list_topics(timeout=0.1) + with pytest.raises(RuntimeError, match="Handle has been closed"): + admin.list_groups(timeout=0.1) + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.create_acls([acl_binding]) + with pytest.raises(RuntimeError, match="AdminClient has been closed"): + admin.describe_consumer_groups(["test-group-2"]) diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index cb3e832ff..c363ebbe8 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -464,3 +464,154 @@ def error_cb_runtime_error(error): consumer3.consume(timeout=0.1) assert "Runtime error:" in str(exc_info.value) consumer3.close() + + +def test_consumer_context_manager_basic(): + """Test basic Consumer context manager usage and return value""" + config = { + 'group.id': 'test', + 'socket.timeout.ms': 10, + 'session.timeout.ms': 100 + } + + # Test __enter__ returns self + consumer = Consumer(config) + entered = consumer.__enter__() + assert entered is consumer + consumer.__exit__(None, None, None) # Clean up + + # Test basic context manager usage + with Consumer(config) as consumer: + assert consumer is not None + consumer.subscribe(['mytopic']) + consumer.unsubscribe() + + # Consumer should be closed after exiting context + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.subscribe(['mytopic']) + + +def test_consumer_context_manager_exception_propagation(): + """Test exceptions propagate and consumer is cleaned up""" + config = { + 'group.id': 'test', + 'socket.timeout.ms': 10, + 'session.timeout.ms': 100 + } + + # Test exception propagation + exception_caught = False + try: + with Consumer(config) as consumer: + consumer.subscribe(['mytopic']) + raise ValueError("Test exception") + except ValueError as e: + assert str(e) == "Test exception" + exception_caught = True + + assert exception_caught, "Exception should have propagated" + + # Consumer should be closed even after exception + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.subscribe(['mytopic']) + + +def test_consumer_context_manager_exit_with_exceptions(): + """Test __exit__ properly handles exception arguments""" + config = { + 'group.id': 'test', + 'socket.timeout.ms': 10, + 'session.timeout.ms': 100 + } + + consumer = Consumer(config) + consumer.subscribe(['mytopic']) + + # Simulate exception in with block + exc_type = ValueError + exc_value = ValueError("Test error") + exc_traceback = None + + # __exit__ should cleanup and return None (propagate exception) + result = consumer.__exit__(exc_type, exc_value, exc_traceback) + assert result is None # None means propagate exception + + # Consumer should be closed + with pytest.raises(RuntimeError): + consumer.subscribe(['mytopic']) + + +def test_consumer_context_manager_after_exit(): + """Test Consumer behavior after context manager exit""" + config = { + 'group.id': 'test', + 'socket.timeout.ms': 10, + 'session.timeout.ms': 100 + } + + # Normal exit + with Consumer(config) as consumer: + consumer.subscribe(['mytopic']) + consumer.unsubscribe() + + # All methods should fail after context exit + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.subscribe(['mytopic']) + + with pytest.raises(RuntimeError, match="Consumer closed"): + consumer.poll() + + # close() is idempotent - calling it on already-closed consumer should not raise + consumer.close() # Should succeed silently + + # Test already-closed consumer edge case + # Using __enter__ and __exit__ directly on already-closed consumer + entered = consumer.__enter__() + assert entered is consumer + + # Operations should still fail + with pytest.raises(RuntimeError): + consumer.subscribe(['mytopic']) + + # __exit__ should handle already-closed gracefully + result = consumer.__exit__(None, None, None) + assert result is None + + +def test_consumer_context_manager_multiple_instances(): + """Test Consumer context manager with multiple instances""" + config = { + 'group.id': 'test', + 'socket.timeout.ms': 10, + 'session.timeout.ms': 100 + } + + # Test multiple sequential instances + with Consumer(config) as consumer1: + consumer1.subscribe(['mytopic']) + + with Consumer(config) as consumer2: + consumer2.subscribe(['mytopic']) + # Both should be independent + assert consumer1 is not consumer2 + + # Both should be closed + with pytest.raises(RuntimeError): + consumer1.subscribe(['mytopic']) + with pytest.raises(RuntimeError): + consumer2.subscribe(['mytopic']) + + # Test nested context managers + with Consumer(config) as consumer1: + with Consumer(config) as consumer2: + assert consumer1 is not consumer2 + consumer1.subscribe(['mytopic']) + consumer2.subscribe(['mytopic2']) + # consumer2 should be closed, consumer1 still open + consumer1.unsubscribe() + + # Both should be closed now + with pytest.raises(RuntimeError): + consumer1.subscribe(['mytopic']) + with pytest.raises(RuntimeError): + consumer2.subscribe(['mytopic2']) diff --git a/tests/test_Producer.py b/tests/test_Producer.py index afb784ebc..f535d0e44 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -1151,3 +1151,195 @@ def delivery_cb_that_raises(err, msg): producer.flush(timeout=2.0) assert "Test exception from delivery_cb" in str(exc_info.value) + + +def test_producer_context_manager_basic(): + """Test basic Producer context manager usage and return value""" + config = { + 'socket.timeout.ms': 10, + 'error_cb': error_cb, + 'message.timeout.ms': 10 + } + + # Test __enter__ returns self + producer = Producer(config) + entered = producer.__enter__() + assert entered is producer + producer.__exit__(None, None, None) # Clean up + + # Test basic context manager usage + with Producer(config) as producer: + assert producer is not None + producer.produce('mytopic', value=b'test message') + producer.poll(0) + + # Producer should be closed after exiting context + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.produce('mytopic', value=b'test message') + + +def test_producer_context_manager_exception_propagation(): + """Test exceptions propagate and producer is cleaned up""" + config = { + 'socket.timeout.ms': 10, + 'error_cb': error_cb, + 'message.timeout.ms': 10 + } + + # Test exception propagation + exception_caught = False + try: + with Producer(config) as producer: + producer.produce('mytopic', value=b'test') + raise ValueError("Test exception") + except ValueError as e: + assert str(e) == "Test exception" + exception_caught = True + + assert exception_caught, "Exception should have propagated" + + # Producer should be closed even after exception + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.produce('mytopic', value=b'test') + + +def test_producer_context_manager_exit_with_exceptions(): + """Test __exit__ properly handles exception arguments""" + config = { + 'socket.timeout.ms': 10, + 'error_cb': error_cb, + 'message.timeout.ms': 10 + } + + producer = Producer(config) + producer.produce('mytopic', value=b'test') + + # Simulate exception in with block + exc_type = ValueError + exc_value = ValueError("Test error") + exc_traceback = None + + # __exit__ should cleanup and return None (propagate exception) + result = producer.__exit__(exc_type, exc_value, exc_traceback) + assert result is None # None means propagate exception + + # Producer should be closed + with pytest.raises(RuntimeError): + producer.produce('mytopic', value=b'test') + + +def test_producer_context_manager_after_exit(): + """Test Producer behavior after context manager exit""" + config = { + 'socket.timeout.ms': 10, + 'error_cb': error_cb, + 'message.timeout.ms': 10 + } + + # Normal exit + with Producer(config) as producer: + producer.produce('mytopic', value=b'test') + + # All methods should fail after context exit + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.produce('mytopic', value=b'test') + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.flush() + + with pytest.raises(RuntimeError, match="Producer has been closed"): + producer.poll(0) + + # __len__ should return 0 for closed producer + assert len(producer) == 0 + + # Test already-closed producer edge case + # Using __enter__ and __exit__ directly on already-closed producer + entered = producer.__enter__() + assert entered is producer + + # Operations should still fail + with pytest.raises(RuntimeError): + producer.produce('mytopic', value=b'test') + + # __exit__ should handle already-closed gracefully + result = producer.__exit__(None, None, None) + assert result is None + + +def test_producer_context_manager_multiple_instances(): + """Test Producer context manager with multiple instances""" + config = { + 'socket.timeout.ms': 10, + 'error_cb': error_cb, + 'message.timeout.ms': 10 + } + + # Test multiple sequential instances + with Producer(config) as producer1: + producer1.produce('mytopic', value=b'message 1') + + with Producer(config) as producer2: + producer2.produce('mytopic', value=b'message 2') + # Both should be independent + assert producer1 is not producer2 + + # Both should be closed + with pytest.raises(RuntimeError): + producer1.produce('mytopic', value=b'test') + with pytest.raises(RuntimeError): + producer2.produce('mytopic', value=b'test') + + # Test nested context managers + with Producer(config) as producer1: + with Producer(config) as producer2: + assert producer1 is not producer2 + producer1.produce('mytopic', value=b'message 1') + producer2.produce('mytopic', value=b'message 2') + # producer2 should be closed, producer1 still open + producer1.produce('mytopic', value=b'message 3') + + # Both should be closed now + with pytest.raises(RuntimeError): + producer1.produce('mytopic', value=b'test') + with pytest.raises(RuntimeError): + producer2.produce('mytopic', value=b'test') + + +def test_producer_context_manager_with_callbacks(): + """Test Producer context manager properly handles delivery callbacks""" + config = { + 'bootstrap.servers': 'localhost:9092', + 'socket.timeout.ms': 10, + 'error_cb': error_cb, + 'message.timeout.ms': 10 + } + + delivered = [] + + def on_delivery(err, msg): + delivered.append((err, msg)) + + with Producer(config) as producer: + producer.produce('mytopic', + value=b'test message', + callback=on_delivery) + producer.poll(0) + # Context manager should flush, triggering callbacks + + # Callbacks should be invoked when context manager flushes + # if broker available, message may succeed (err=None) + # If broker unavailable, message will timeout (err with timeout/transport error) + assert len(delivered) > 0, "Delivery callback should have been called" + err, msg = delivered[0] + assert msg is not None + + # Handle both cases: broker available (err=None success) or unavailable (timeout/transport error) + if err is None: + # Success case - broker is available and message was delivered + pass # Message delivered successfully + else: + # Error case - broker unavailable or connection failed + assert err.code() in (KafkaError._MSG_TIMED_OUT, KafkaError._TRANSPORT, + KafkaError._TIMED_OUT), \ + f"Expected success (err=None) or timeout/transport error, got {err.code()}"