diff --git a/include/kspp/sources/kafka_source.h b/include/kspp/sources/kafka_source.h index afb9f62..80e2867 100644 --- a/include/kspp/sources/kafka_source.h +++ b/include/kspp/sources/kafka_source.h @@ -23,6 +23,7 @@ namespace kspp { } void start(int64_t offset) override { + _thread = std::thread(&kafka_source_base::thread_f, this); _impl.start(offset); _started = true; } @@ -92,7 +93,7 @@ namespace kspp { : partition_source(nullptr, partition) , _started(false) , _exit(false) - , _thread(&kafka_source_base::thread_f, this) + , _thread_f_finished(false) , _impl(config, topic, partition, consumer_group) , _key_codec(key_codec) , _val_codec(val_codec) @@ -163,12 +164,14 @@ namespace kspp { _commit_chain_size.set(_commit_chain.size()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } + _thread_f_finished = true; DLOG(INFO) << "exiting thread"; } size_t _max_incomming_queue_size=1000; bool _started; bool _exit; + bool _thread_f_finished; std::thread _thread; event_queue _incomming_msg; kafka_consumer _impl; @@ -212,6 +215,12 @@ namespace kspp { key_codec, val_codec) { } + + ~kafka_source() override + { + while (!this->_thread_f_finished) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } protected: std::shared_ptr> parse(const std::unique_ptr &ref) override { @@ -295,6 +304,12 @@ namespace kspp { val_codec) { } + ~kafka_source() override + { + while (!this->_thread_f_finished) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + protected: std::shared_ptr> parse(const std::unique_ptr &ref) override { if (!ref) @@ -353,6 +368,12 @@ namespace kspp { nullptr) { } + ~kafka_source() override + { + while (!this->_thread_f_finished) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + protected: std::shared_ptr> parse(const std::unique_ptr &ref) override { if (!ref || ref->key_len() == 0)