diff --git a/DEVELOPER.md b/DEVELOPER.md
index 8f241865e..faaa7324f 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -35,6 +35,12 @@ If librdkafka is installed in a non-standard location provide the include and li
 C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
 ```
+On macOS, if you installed librdkafka with Homebrew, you can use the following:
+```bash
+export C_INCLUDE_PATH=$(brew --prefix librdkafka)/include
+export LIBRARY_PATH=$(brew --prefix librdkafka)/lib
+```
+
 
 4. **Install confluent-kafka-python with optional dependencies**
 ```bash
 pip3 install -e .[dev,tests,docs]
@@ -87,6 +93,26 @@ python3 tools/unasync.py --check
 If you make any changes to the async code (in `src/confluent_kafka/schema_registry/_async` and `tests/integration/schema_registry/_async`), you **must** run this script to generate the sync counterparts (in `src/confluent_kafka/schema_registry/_sync` and `tests/integration/schema_registry/_sync`). Otherwise, this script will be run in CI with the --check flag and fail the build.
 
+## Local Setup with uv
+
+Tested with Python 3.11.
+
+```bash
+# Modify pyproject.toml to require python version >=3.11;
+# this fixes the cel-python dependency conflict.
+uv venv --python 3.11
+source .venv/bin/activate
+
+uv sync --extra dev --extra tests
+uv pip install trivup setuptools
+pytest tests/
+
+# When making changes, bump project.version in pyproject.toml before re-running:
+uv sync --extra dev --extra tests
+
+```
+
+
 ## Tests
diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c
index b6a51f510..fd1ae1bb3 100644
--- a/src/confluent_kafka/src/Producer.c
+++ b/src/confluent_kafka/src/Producer.c
@@ -393,6 +393,42 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
 	return cfl_PyInt_FromInt(qlen);
 }
 
+
+static PyObject *Producer_close (Handle *self, PyObject *args,
+                                 PyObject *kwargs) {
+        CallState cs;
+        int outq_len;
+
+        /* Already closed: succeed as a no-op. */
+        if (!self->rk)
+                Py_RETURN_TRUE;
+
+        CallState_begin(self, &cs);
+
+        /* Warn if there are still messages in the producer queue. */
+        outq_len = rd_kafka_outq_len(self->rk);
+        if (outq_len > 0) {
+                char msg[150];
+                snprintf(msg, sizeof(msg),
+                         "There are %d message(s) still in the producer "
+                         "queue! Use flush() or wait for delivery.",
+                         outq_len);
+                rd_kafka_log_print(self->rk, CK_LOG_WARNING,
+                                   "CLOSEWARN", msg);
+        }
+
+        /* Log before destroying: self->rk must not be used after
+         * rd_kafka_destroy() has been called. */
+        rd_kafka_log_print(self->rk, CK_LOG_INFO, "CLOSEINF",
+                           "Producer destroy requested");
+        rd_kafka_destroy(self->rk);
+        self->rk = NULL;
+
+        if (!CallState_end(self, &cs))
+                return NULL;
+
+        Py_RETURN_TRUE;
+}
+
+
 static PyObject *Producer_init_transactions (Handle *self, PyObject *args) {
 	CallState cs;
 	rd_kafka_error_t *error;
@@ -609,7 +645,15 @@ static PyMethodDef Producer_methods[] = {
 	  " :rtype: int\n"
 	  "\n"
 	},
-
+	{ "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS,
+	  ".. py:function:: close()\n"
+	  "\n"
+	  " Close the producer on demand, destroying the underlying client handle. Queued messages are not flushed.\n"
+	  "\n"
+	  " :rtype: bool\n"
+	  " :returns: True if the producer close was requested successfully, False otherwise\n"
+	  "\n"
+	},
 	{ "flush", (PyCFunction)Producer_flush, METH_VARARGS|METH_KEYWORDS,
 	  ".. py:function:: flush([timeout])\n"
 	  "\n"
diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h
index b77adb451..f0f86be49 100644
--- a/src/confluent_kafka/src/confluent_kafka.h
+++ b/src/confluent_kafka/src/confluent_kafka.h
@@ -96,6 +96,16 @@
 #endif
 
 
+/* Syslog-style log levels, matching librdkafka's log levels. */
+#define CK_LOG_EMERG   0
+#define CK_LOG_ALERT   1
+#define CK_LOG_CRIT    2
+#define CK_LOG_ERR     3
+#define CK_LOG_WARNING 4
+#define CK_LOG_NOTICE  5
+#define CK_LOG_INFO    6
+#define CK_LOG_DEBUG   7
+
 
 /****************************************************************************
  *
diff --git a/tests/test_Producer.py b/tests/test_Producer.py
index 0f0d69e1d..309631a9e 100644
--- a/tests/test_Producer.py
+++ b/tests/test_Producer.py
@@ -1,11 +1,12 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-import pytest
+import json
 from struct import pack
 
+import pytest
+
 from confluent_kafka import Producer, KafkaError, KafkaException, \
     TopicPartition, libversion
-
 from tests.common import TestConsumer
@@ -47,6 +48,8 @@ def on_delivery(err, msg):
     except KafkaException as e:
         assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT)
 
+    assert p.close(), "Failed to validate that the producer was closed."
+
 
 def test_produce_timestamp():
     """ Test produce() with timestamp arg """
@@ -239,6 +242,8 @@ def test_transaction_api():
     assert ex.value.args[0].fatal() is False
     assert ex.value.args[0].txn_requires_abort() is False
 
+    assert p.close(), "The producer was not closed"
+
 
 def test_purge():
     """
@@ -274,6 +279,8 @@ def on_delivery(err, msg):
     p.flush(0.002)
     assert cb_detector["on_delivery_called"]
 
+    assert p.close(), "The producer was not closed"
+
 
 def test_producer_bool_value():
     """
@@ -283,3 +290,20 @@
 
     p = Producer({})
     assert bool(p)
+    assert p.close(), "The producer was not fully closed"
+
+
+def test_producer_close():
+    """
+    Ensure the producer can be closed on demand
+    """
+    conf = {
+        'debug': 'all',
+        'socket.timeout.ms': 10,
+        'error_cb': error_cb,
+        'message.timeout.ms': 10
+    }
+    producer = Producer(conf)
+    msg = {"test": "test"}
+    producer.produce('mytopic', value=json.dumps(msg))
+    assert producer.close(), "The producer could not be closed on demand"
\ No newline at end of file
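
Reviewer note, not part of the patch: a minimal usage sketch of the new
Producer.close() method, based on the C implementation above. The broker
address and topic name are placeholder assumptions.

    import json

    from confluent_kafka import Producer

    # Placeholder broker address; any reachable cluster works.
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    producer.produce('mytopic', value=json.dumps({"test": "test"}))

    # flush() delivers queued messages first; close() only logs a warning
    # about (and drops) anything still queued before destroying the handle.
    producer.flush()
    assert producer.close()

    # close() is idempotent: once the handle is gone it returns True directly.
    assert producer.close()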