@@ -1522,13 +1522,73 @@ static void log_cb (const rd_kafka_t *rk, int level,
15221522 CallState_resume (cs );
15231523}
15241524
1525+ /**
1526+ * @brief Translate Python \p key and \p value to C types and set on
1527+ * provided \p extensions char* array at the provided index.
1528+ *
1529+ * @returns 1 on success or 0 if an exception was raised.
1530+ */
1531+ static int py_extensions_to_c (char * * extensions , Py_ssize_t idx ,
1532+ PyObject * key , PyObject * value ) {
1533+ PyObject * ks , * ks8 , * vo8 = NULL ;
1534+ const char * k ;
1535+ const char * v ;
1536+ Py_ssize_t ksize = 0 ;
1537+ Py_ssize_t vsize = 0 ;
1538+
1539+ if (!(ks = cfl_PyObject_Unistr (key ))) {
1540+ PyErr_SetString (PyExc_TypeError ,
1541+ "expected extension key to be unicode "
1542+ "string" );
1543+ return 0 ;
1544+ }
1545+
1546+ k = cfl_PyUnistr_AsUTF8 (ks , & ks8 );
1547+ ksize = (Py_ssize_t )strlen (k );
1548+
1549+ if (cfl_PyUnistr (_Check (value ))) {
1550+ /* Unicode string, translate to utf-8. */
1551+ v = cfl_PyUnistr_AsUTF8 (value , & vo8 );
1552+ if (!v ) {
1553+ Py_DECREF (ks );
1554+ Py_XDECREF (ks8 );
1555+ return 0 ;
1556+ }
1557+ vsize = (Py_ssize_t )strlen (v );
1558+ } else {
1559+ PyErr_Format (PyExc_TypeError ,
1560+ "expected extension value to be "
1561+ "unicode string, not %s" ,
1562+ ((PyTypeObject * )PyObject_Type (value ))->
1563+ tp_name );
1564+ Py_DECREF (ks );
1565+ Py_XDECREF (ks8 );
1566+ return 0 ;
1567+ }
1568+
1569+ extensions [idx ] = (char * )malloc (ksize );
1570+ strcpy (extensions [idx ], k );
1571+ extensions [idx + 1 ] = (char * )malloc (vsize );
1572+ strcpy (extensions [idx + 1 ], v );
1573+
1574+ Py_DECREF (ks );
1575+ Py_XDECREF (ks8 );
1576+ Py_XDECREF (vo8 );
1577+
1578+ return 1 ;
1579+ }
1580+
15251581static void oauth_cb (rd_kafka_t * rk , const char * oauthbearer_config ,
15261582 void * opaque ) {
15271583 Handle * h = opaque ;
15281584 PyObject * eo , * result ;
15291585 CallState * cs ;
15301586 const char * token ;
15311587 double expiry ;
1588+ const char * principal = "" ;
1589+ PyObject * extensions = NULL ;
1590+ char * * rd_extensions = NULL ;
1591+ Py_ssize_t rd_extensions_size = 0 ;
15321592 char err_msg [2048 ];
15331593 rd_kafka_resp_err_t err_code ;
15341594
@@ -1539,26 +1599,57 @@ static void oauth_cb (rd_kafka_t *rk, const char *oauthbearer_config,
15391599 Py_DECREF (eo );
15401600
15411601 if (!result ) {
1542- goto err ;
1602+ goto fail ;
15431603 }
1544- if (!PyArg_ParseTuple (result , "sd" , & token , & expiry )) {
1604+ if (!PyArg_ParseTuple (result , "sd|sO! " , & token , & expiry , & principal , & PyDict_Type , & extensions )) {
15451605 Py_DECREF (result );
1546- PyErr_Format (PyExc_TypeError ,
1606+ PyErr_SetString (PyExc_TypeError ,
15471607 "expect returned value from oauth_cb "
1548- "to be (token_str, expiry_time) tuple" );
1608+ "to be (token_str, expiry_time[, principal, extensions] ) tuple" );
15491609 goto err ;
15501610 }
1611+
1612+ if (extensions ) {
1613+ int len = (int )PyDict_Size (extensions );
1614+ rd_extensions = (char * * )malloc (2 * len * sizeof (char * ));
1615+ Py_ssize_t pos = 0 ;
1616+ PyObject * ko , * vo ;
1617+ while (PyDict_Next (extensions , & pos , & ko , & vo )) {
1618+ if (!py_extensions_to_c (rd_extensions , rd_extensions_size , ko , vo )) {
1619+ Py_DECREF (result );
1620+ free (rd_extensions );
1621+ goto err ;
1622+ }
1623+ rd_extensions_size = rd_extensions_size + 2 ;
1624+ }
1625+ }
1626+
15511627 err_code = rd_kafka_oauthbearer_set_token (h -> rk , token ,
15521628 (int64_t )(expiry * 1000 ),
1553- "" , NULL , 0 , err_msg ,
1629+ principal , ( const char * * ) rd_extensions , rd_extensions_size , err_msg ,
15541630 sizeof (err_msg ));
15551631 Py_DECREF (result );
1556- if (err_code ) {
1632+ if (rd_extensions ) {
1633+ for (int i = 0 ; i < rd_extensions_size ; i ++ ) {
1634+ free (rd_extensions [i ]);
1635+ }
1636+ free (rd_extensions );
1637+ }
1638+
1639+ if (err_code != RD_KAFKA_RESP_ERR_NO_ERROR ) {
15571640 PyErr_Format (PyExc_ValueError , "%s" , err_msg );
1558- goto err ;
1641+ goto fail ;
15591642 }
15601643 goto done ;
15611644
1645+ fail :
1646+ err_code = rd_kafka_oauthbearer_set_token_failure (h -> rk , "OAuth callback raised exception" );
1647+ if (err_code != RD_KAFKA_RESP_ERR_NO_ERROR ) {
1648+ PyErr_SetString (PyExc_ValueError , "Failed to set token failure" );
1649+ goto err ;
1650+ }
1651+ PyErr_Clear ();
1652+ goto done ;
15621653 err :
15631654 CallState_crash (cs );
15641655 rd_kafka_yield (h -> rk );
0 commit comments