Skip to content

We should probably set the log callback obtained from the properties before using it. #236

@leiwang008

Description

@leiwang008

Get the log callback from the properties and store it in the local field so that it can be used by rd_kafka. We should probably also use the string constant instead of the hard-coded string :-).

if (properties.contains("log_cb"))

    // Log Callback
    if (properties.contains(Config::LOG_CB))
    {
        setLogCallback(properties.get<LogCallback>(Config::LOG_CB));

        rd_kafka_conf_set_log_cb(rk_conf.get(), KafkaClient::logCallback);
    }

    // Statistics Callback
    if (properties.contains(Config::STATS_CB))
    {
        setStatsCallback(properties.get<StatsCallback>(Config::STATS_CB));

        rd_kafka_conf_set_stats_cb(rk_conf.get(), KafkaClient::statsCallback);
    }

    // Error Callback
    if (properties.contains(Config::ERROR_CB))
    {
        setErrorCallback(properties.get<ErrorCallback>(Config::ERROR_CB));

        rd_kafka_conf_set_error_cb(rk_conf.get(), KafkaClient::errorCallback);
    }

    // OAUTHBEARER Token Refresh Callback
    if (properties.contains(Config::OAUTHBEARER_TOKEN_REFRESH_CB))
    {
        setOauthbearerTokenRefreshCallback(properties.get<OauthbearerTokenRefreshCallback>(Config::OAUTHBEARER_TOKEN_REFRESH_CB));

        rd_kafka_conf_set_oauthbearer_token_refresh_cb(rk_conf.get(), KafkaClient::oauthbearerTokenRefreshCallback);
    }

    // Interceptor
    if (properties.contains(Config::INTERCEPTORS))
    {
        setInterceptors(properties.get<Interceptors>(Config::INTERCEPTORS));

        const Error result{ rd_kafka_conf_interceptor_add_on_new(rk_conf.get(), "on_new", KafkaClient::configInterceptorOnNew, nullptr) };
        KAFKA_THROW_IF_WITH_ERROR(result);
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions