@@ -939,43 +939,125 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
939939}
940940
941941#ifdef RD_KAFKA_V_HEADERS
942+
943+
944+ /**
945+ * @brief Convert Python list of tuples to rd_kafka_headers_t
946+ */
947+ static rd_kafka_headers_t * py_headers_list_to_c (PyObject * hdrs ) {
948+ int i , len ;
949+ rd_kafka_headers_t * rd_headers = NULL ;
950+
951+ len = (int )PyList_Size (hdrs );
952+ rd_headers = rd_kafka_headers_new (len );
953+
954+ for (i = 0 ; i < len ; i ++ ) {
955+ rd_kafka_resp_err_t err ;
956+ const char * header_key , * header_value = NULL ;
957+ int header_key_len = 0 , header_value_len = 0 ;
958+
959+ if (!PyArg_ParseTuple (PyList_GET_ITEM (hdrs , i ), "s#z#" ,
960+ & header_key , & header_key_len ,
961+ & header_value , & header_value_len )){
962+ rd_kafka_headers_destroy (rd_headers );
963+ PyErr_SetString (PyExc_TypeError ,
964+ "Headers are expected to be a "
965+ "tuple of (key, value)" );
966+ return NULL ;
967+ }
968+
969+ err = rd_kafka_header_add (rd_headers ,
970+ header_key , header_key_len ,
971+ header_value , header_value_len );
972+ if (err ) {
973+ cfl_PyErr_Format (err ,
974+ "Unable to add message header \"%s\": "
975+ "%s" ,
976+ header_key , rd_kafka_err2str (err ));
977+ rd_kafka_headers_destroy (rd_headers );
978+ return NULL ;
979+ }
980+ }
981+ return rd_headers ;
982+ }
983+
984+
985+ /**
986+ * @brief Convert Python dict to rd_kafka_headers_t
987+ */
988+ static rd_kafka_headers_t * py_headers_dict_to_c (PyObject * hdrs ) {
989+ int len ;
990+ Py_ssize_t pos = 0 ;
991+ rd_kafka_headers_t * rd_headers = NULL ;
992+ PyObject * ko , * vo ;
993+
994+ len = (int )PyDict_Size (hdrs );
995+ rd_headers = rd_kafka_headers_new (len );
996+
997+ while (PyDict_Next (hdrs , & pos , & ko , & vo )) {
998+ PyObject * ks , * ks8 ;
999+ const char * k ;
1000+ const void * v = NULL ;
1001+ Py_ssize_t vsize = 0 ;
1002+ rd_kafka_resp_err_t err ;
1003+
1004+ if (!(ks = cfl_PyObject_Unistr (ko ))) {
1005+ PyErr_SetString (PyExc_TypeError ,
1006+ "expected header key to be unicode "
1007+ "string" );
1008+ rd_kafka_headers_destroy (rd_headers );
1009+ return NULL ;
1010+ }
1011+
1012+ k = cfl_PyUnistr_AsUTF8 (ks , & ks8 );
1013+
1014+ if (vo != Py_None ) {
1015+ if (PyString_AsStringAndSize (vo , (char * * )& v ,
1016+ & vsize ) == -1 ) {
1017+ Py_DECREF (ks );
1018+ rd_kafka_headers_destroy (rd_headers );
1019+ return NULL ;
1020+ }
1021+ }
1022+
1023+ if ((err = rd_kafka_header_add (rd_headers , k , -1 , v , vsize ))) {
1024+ cfl_PyErr_Format (err ,
1025+ "Unable to add message header \"%s\": "
1026+ "%s" ,
1027+ k , rd_kafka_err2str (err ));
1028+ Py_DECREF (ks );
1029+ rd_kafka_headers_destroy (rd_headers );
1030+ return NULL ;
1031+ }
1032+
1033+ Py_DECREF (ks );
1034+ }
1035+
1036+ return rd_headers ;
1037+ }
1038+
1039+
9421040/**
9431041 * @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
9441042 *
9451043 * @returns The new Python list[(header_key, header_value),...] object.
9461044 */
947- rd_kafka_headers_t * py_headers_to_c (PyObject * headers_plist ) {
948- int i , len ;
949- rd_kafka_headers_t * rd_headers = NULL ;
950- rd_kafka_resp_err_t err ;
951- const char * header_key , * header_value = NULL ;
952- int header_key_len = 0 , header_value_len = 0 ;
953-
954- len = PyList_Size (headers_plist );
955- rd_headers = rd_kafka_headers_new (len );
956-
957- for (i = 0 ; i < len ; i ++ ) {
958-
959- if (!PyArg_ParseTuple (PyList_GET_ITEM (headers_plist , i ), "s#z#" , & header_key ,
960- & header_key_len , & header_value , & header_value_len )){
961- rd_kafka_headers_destroy (rd_headers );
962- PyErr_SetString (PyExc_TypeError ,
963- "Headers are expected to be a tuple of (key, value)" );
964- return NULL ;
965- }
966-
967- err = rd_kafka_header_add (rd_headers , header_key , header_key_len , header_value , header_value_len );
968- if (err ) {
969- rd_kafka_headers_destroy (rd_headers );
970- cfl_PyErr_Format (err ,
971- "Unable to create message headers: %s" ,
972- rd_kafka_err2str (err ));
973- return NULL ;
1045+ rd_kafka_headers_t * py_headers_to_c (PyObject * hdrs ) {
1046+
1047+ if (PyList_Check (hdrs )) {
1048+ return py_headers_list_to_c (hdrs );
1049+ } else if (PyDict_Check (hdrs )) {
1050+ return py_headers_dict_to_c (hdrs );
1051+ } else {
1052+ PyErr_Format (PyExc_TypeError ,
1053+ "expected headers to be "
1054+ "dict or list of (key, value) tuples, not %s" ,
1055+ ((PyTypeObject * )PyObject_Type (hdrs ))-> tp_name );
1056+ return NULL ;
9741057 }
975- }
976- return rd_headers ;
9771058}
9781059
1060+
9791061/**
9801062 * @brief Convert rd_kafka_headers_t to Python list[(header_key, header_value),...])
9811063 *
@@ -989,7 +1071,7 @@ PyObject *c_headers_to_py (rd_kafka_headers_t *headers) {
9891071 size_t header_value_size ;
9901072 PyObject * header_list ;
9911073
992- header_size = rd_kafka_header_cnt (headers );
1074+ header_size = rd_kafka_header_cnt (headers );
9931075 header_list = PyList_New (header_size );
9941076
9951077 while (!rd_kafka_header_get_all (headers , idx ++ ,
0 commit comments