- 
                Notifications
    You must be signed in to change notification settings 
- Fork 99
Open
Description
I have a simple, barebones OAuth callback registered as follows:
 // Register a minimal OAUTHBEARER token-refresh callback: it ignores the
 // configuration string it is given and returns a default-constructed token.
 _props.put<const KAFKA_API::clients::OauthbearerTokenRefreshCallback>(
     KAFKA_API::clients::Config::OAUTHBEARER_TOKEN_REFRESH_CB,
     [](const std::string& /* oauthbearerConfig */)
     {
         KAFKA_API::clients::SaslOauthbearerToken emptyToken;
         return emptyToken;
     });
Yet when the callback is invoked, an exception is thrown. I added some debug lines in KafkaClient.h to log the details:
oauthbearerTokenRefreshCallback called
calling onOauthbearerTokenRefresh()
oauthbearerTokenRefreshCallback failed [basic_string::_M_construct null not valid]
What am I doing wrong in registering this simple OAuth callback?
Here is the code from KafkaClient.h (library header) where the callback is being invoked:
/**
 * Token-refresh trampoline: asks the owning KafkaClient for a fresh
 * OAUTHBEARER token and hands the result to librdkafka.
 *
 * Exactly one outcome is reported per invocation: either
 * rd_kafka_oauthbearer_set_token() succeeds, or
 * rd_kafka_oauthbearer_set_token_failure() is called with the reason.
 *
 * @param rk                 Client handle the refresh is requested for.
 * @param oauthbearerConfig  Configuration string forwarded to the user callback.
 *                           May be null (presumably when `sasl.oauthbearer.config`
 *                           is not set); an empty string is forwarded in that case.
 */
inline void
KafkaClient::oauthbearerTokenRefreshCallback(rd_kafka_t* rk, const char* oauthbearerConfig, void* /* opaque */)
{
    std::cerr << "oauthbearerTokenRefreshCallback called" << std::endl;

    SaslOauthbearerToken oauthbearerToken;
    try
    {
        std::cerr << "calling onOauthbearerTokenRefresh()" << std::endl;
        // Never build a std::string from a null `const char*`: doing so throws
        // "basic_string::_M_construct null not valid" before the user callback runs.
        oauthbearerToken = kafkaClient(rk).onOauthbearerTokenRefresh(oauthbearerConfig != nullptr ? oauthbearerConfig : "");
        std::cerr << "oauthbearerTokenRefreshCallback succeeded" << std::endl;
    }
    catch (const std::exception& e)
    {
        rd_kafka_oauthbearer_set_token_failure(rk, e.what());
        std::cerr << "oauthbearerTokenRefreshCallback failed [" << e.what() << "]" << std::endl;
        // The failure has been reported; do not go on to submit the
        // default-constructed token (it would mask the real error).
        return;
    }
    catch (...)
    {
        // This function is called from C code, so no exception may escape it.
        rd_kafka_oauthbearer_set_token_failure(rk, "unknown exception");
        std::cerr << "oauthbearerTokenRefreshCallback failed [unknown exception]" << std::endl;
        return;
    }

    LogBuffer<LOG_BUFFER_SIZE> errInfo;

    // Flatten the extensions into the [key0, value0, key1, value1, ...] array passed
    // to rd_kafka_oauthbearer_set_token(). The pointers borrow from `oauthbearerToken`,
    // which outlives the call below.
    std::vector<const char*> extensions;
    extensions.reserve(oauthbearerToken.extensions.size() * 2);
    for (const auto& kv: oauthbearerToken.extensions)
    {
        extensions.push_back(kv.first.c_str());
        extensions.push_back(kv.second.c_str());
    }

    if (rd_kafka_oauthbearer_set_token(rk,
                                       oauthbearerToken.value.c_str(),
                                       oauthbearerToken.mdLifetime.count(),
                                       oauthbearerToken.mdPrincipalName.c_str(),
                                       extensions.data(), extensions.size(),
                                       errInfo.str(), errInfo.capacity()) != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        // The token was rejected; report the reason written into `errInfo`.
        rd_kafka_oauthbearer_set_token_failure(rk, errInfo.c_str());
        //std::cerr << "oauthbearerTokenRefreshCallback set token failed [" << errInfo.str() << "]" << std::endl;
    }
}
Metadata
Metadata
Assignees
Labels
No labels