diff --git a/src/connection.cc b/src/connection.cc index f76e012f..95e2ddff 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -12,6 +12,7 @@ #include #include "src/connection.h" +#include "kafka-operation-result.h" #include "src/workers.h" using RdKafka::Conf; @@ -100,11 +101,11 @@ RdKafka::Handle* Connection::GetClient() { return m_client; } -Baton Connection::CreateTopic(std::string topic_name) { +KafkaOperationResult Connection::CreateTopic(std::string topic_name) { return CreateTopic(topic_name, NULL); } -Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) { +KafkaOperationResult Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) { std::string errstr; RdKafka::Topic* topic = NULL; @@ -114,19 +115,19 @@ Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) { if (IsConnected()) { topic = RdKafka::Topic::create(m_client, topic_name, conf, errstr); } else { - return Baton(RdKafka::ErrorCode::ERR__STATE); + return KafkaOperationResult(RdKafka::ErrorCode::ERR__STATE); } } else { - return Baton(RdKafka::ErrorCode::ERR__STATE); + return KafkaOperationResult(RdKafka::ErrorCode::ERR__STATE); } if (!errstr.empty()) { - return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr); + return KafkaOperationResult(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr); } // Maybe do it this way later? Then we don't need to do static_cast // - return Baton(topic); + return KafkaOperationResult(topic); } Baton Connection::QueryWatermarkOffsets( @@ -189,15 +190,15 @@ Baton Connection::OffsetsForTimes( Baton Connection::GetMetadata( bool all_topics, std::string topic_name, int timeout_ms) { - RdKafka::Topic* topic = NULL; + std::unique_ptr topic{}; RdKafka::ErrorCode err; std::string errstr; if (!topic_name.empty()) { - Baton b = CreateTopic(topic_name); + KafkaOperationResult b = CreateTopic(topic_name); if (b.err() == RdKafka::ErrorCode::ERR_NO_ERROR) { - topic = b.data(); + topic = b.take_ownership(); } } @@ -211,7 +212,7 @@ Baton Connection::GetMetadata( scoped_shared_read_lock lock(m_connection_lock); if (IsConnected()) { // Always send true - we - err = m_client->metadata(all_topics, topic, &metadata, timeout_ms); + err = m_client->metadata(all_topics, topic.get(), &metadata, timeout_ms); } else { err = RdKafka::ERR__STATE; } diff --git a/src/connection.h b/src/connection.h index bba8a7c7..602e8d17 100644 --- a/src/connection.h +++ b/src/connection.h @@ -21,6 +21,7 @@ #include "src/errors.h" #include "src/config.h" #include "src/callbacks.h" +#include "src/kafka-operation-result.h" namespace NodeKafka { @@ -56,8 +57,8 @@ class Connection : public Nan::ObjectWrap { bool IsClosing(); // Baton - Baton CreateTopic(std::string); - Baton CreateTopic(std::string, RdKafka::Conf*); + KafkaOperationResult CreateTopic(std::string); + KafkaOperationResult CreateTopic(std::string, RdKafka::Conf*); Baton GetMetadata(bool, std::string, int); Baton QueryWatermarkOffsets(std::string, int32_t, int64_t*, int64_t*, int); Baton OffsetsForTimes(std::vector &, int); diff --git a/src/kafka-operation-result.h b/src/kafka-operation-result.h new file mode 100644 index 00000000..cc240efa --- /dev/null +++ b/src/kafka-operation-result.h @@ -0,0 +1,63 @@ +#ifndef SRC_KAFKA_OPERATION_RESULT_H_ +#define SRC_KAFKA_OPERATION_RESULT_H_ + +#include +#include +#include + +#include "rdkafkacpp.h" + +namespace NodeKafka { +/** + * Type-safe wrapper for the result of an RDKafka library operation. + */ +template +class KafkaOperationResult { + public: + /** + * Constructor for a successful operation result. + * Takes ownership of the data pointer. + */ + explicit KafkaOperationResult(T* data) + : m_data(data), m_err(RdKafka::ErrorCode::ERR_NO_ERROR) {} + explicit KafkaOperationResult(RdKafka::ErrorCode err) + : m_data(nullptr), m_err(err) {} + explicit KafkaOperationResult(RdKafka::ErrorCode err, std::string errstr) + : m_data(nullptr), m_err(err), m_errstr(errstr) {} + + /** + * Get a non-owning pointer to the result data. + * Only should be called for non-error results. + */ + T* data() const { + assert(m_data != nullptr); + return m_data.get(); + } + + /** + * Transfer ownership of the result data to the caller. + * Only should be called for non-error results. + */ + std::unique_ptr take_ownership() { + assert(m_data != nullptr); + std::unique_ptr data = std::move(m_data); + m_data.reset(); + return data; + } + + RdKafka::ErrorCode err() const { + return m_err; + } + + std::string errstr() const { + return m_errstr.empty() ? RdKafka::err2str(m_err) : m_errstr; + } + + private: + std::unique_ptr m_data; + RdKafka::ErrorCode m_err; + std::string m_errstr; +}; +} // namespace NodeKafka + +#endif // SRC_KAFKA_OPERATION_RESULT_H_ diff --git a/src/producer.cc b/src/producer.cc index 8e20320a..90e214fb 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -12,6 +12,7 @@ #include "src/producer.h" #include "src/kafka-consumer.h" +#include "src/kafka-operation-result.h" #include "src/workers.h" namespace NodeKafka { @@ -617,22 +618,17 @@ NAN_METHOD(Producer::NodeProduce) { Topic* topic = ObjectWrap::Unwrap(info[0].As()); // Unwrap it and turn it into an RdKafka::Topic* - Baton topic_baton = topic->toRDKafkaTopic(producer); + KafkaOperationResult topic_result = topic->toRDKafkaTopic(producer); - if (topic_baton.err() != RdKafka::ERR_NO_ERROR) { + if (topic_result.err() != RdKafka::ERR_NO_ERROR) { // Let the JS library throw if we need to so the error can be more rich - error_code = static_cast(topic_baton.err()); + error_code = static_cast(topic_result.err()); return info.GetReturnValue().Set(Nan::New(error_code)); } - RdKafka::Topic* rd_topic = topic_baton.data(); - Baton b = producer->Produce(message_buffer_data, message_buffer_length, - rd_topic, partition, key_buffer_data, key_buffer_length, opaque); - - // Delete the topic when we are done. - delete rd_topic; + topic_result.data(), partition, key_buffer_data, key_buffer_length, opaque); error_code = static_cast(b.err()); } diff --git a/src/topic.cc b/src/topic.cc index 4c9d663a..98f6c568 100644 --- a/src/topic.cc +++ b/src/topic.cc @@ -45,7 +45,7 @@ std::string Topic::name() { return m_topic_name; } -Baton Topic::toRDKafkaTopic(Connection* handle) { +KafkaOperationResult Topic::toRDKafkaTopic(Connection* handle) { if (m_config) { return handle->CreateTopic(m_topic_name, m_config); } else { diff --git a/src/topic.h b/src/topic.h index 425bf7fd..4518e8fc 100644 --- a/src/topic.h +++ b/src/topic.h @@ -16,6 +16,7 @@ #include "rdkafkacpp.h" #include "src/config.h" +#include "src/kafka-operation-result.h" namespace NodeKafka { @@ -24,7 +25,7 @@ class Topic : public Nan::ObjectWrap { static void Init(v8::Local); static v8::Local NewInstance(v8::Local arg); - Baton toRDKafkaTopic(Connection *handle); + KafkaOperationResult toRDKafkaTopic(Connection *handle); protected: static Nan::Persistent constructor;