|
| 1 | +/** |
| 2 | + * @file |
| 3 | + * @author Michal Sedlak <[email protected]> |
| 4 | + * @brief Inserter class for inserting data into ClickHouse |
| 5 | + * @date 2025 |
| 6 | + * |
| 7 | + * Copyright(c) 2025 CESNET z.s.p.o. |
| 8 | + * SPDX-License-Identifier: BSD-3-Clause |
| 9 | + */ |
| 10 | + |
| 11 | +#include "inserter.h" |
| 12 | +#include <sstream> |
| 13 | +#include <string> |
| 14 | +#include <utility> |
| 15 | +#include <vector> |
| 16 | + |
| 17 | +static constexpr int ERR_TABLE_NOT_FOUND = 60; |
| 18 | +static constexpr int STOP_TIMEOUT_SECS = 10; |
| 19 | + |
| 20 | +static std::vector<std::pair<std::string, std::string>> describe_table(clickhouse::Client &client, const std::string &table) |
| 21 | +{ |
| 22 | + std::vector<std::pair<std::string, std::string>> name_and_type; |
| 23 | + try { |
| 24 | + client.Select("DESCRIBE TABLE " + table, [&](const clickhouse::Block &block) { |
| 25 | + if (block.GetColumnCount() > 0 && block.GetRowCount() > 0) { |
| 26 | + const auto &name = block[0]->As<clickhouse::ColumnString>(); |
| 27 | + const auto &type = block[1]->As<clickhouse::ColumnString>(); |
| 28 | + for (size_t i = 0; i < block.GetRowCount(); i++) { |
| 29 | + name_and_type.emplace_back(name->At(i), type->At(i)); |
| 30 | + } |
| 31 | + } |
| 32 | + }); |
| 33 | + } catch (const clickhouse::ServerException &exc) { |
| 34 | + if (exc.GetCode() == ERR_TABLE_NOT_FOUND) { |
| 35 | + throw Error("table \"{}\" does not exist", table); |
| 36 | + } else { |
| 37 | + throw; |
| 38 | + } |
| 39 | + } |
| 40 | + return name_and_type; |
| 41 | +} |
| 42 | + |
| 43 | +static void ensure_schema(clickhouse::Client &client, const std::string &table, const std::vector<Column> &columns) |
| 44 | +{ |
| 45 | + // Check that the database has the necessary columns |
| 46 | + auto db_columns = describe_table(client, table); |
| 47 | + |
| 48 | + auto schema_hint = [&](){ |
| 49 | + std::stringstream ss; |
| 50 | + ss << "hint:\n"; |
| 51 | + ss << "CREATE TABLE " << table << "(\n"; |
| 52 | + size_t i = 0; |
| 53 | + for (const auto& column : columns) { |
| 54 | + const auto &clickhouse_type = type_to_clickhouse(columns[i].datatype, columns[i].nullable); |
| 55 | + ss << " \"" << column.name << "\" " << clickhouse_type << (i < columns.size()-1 ? "," : "") << '\n'; |
| 56 | + i++; |
| 57 | + } |
| 58 | + ss << ");"; |
| 59 | + return ss.str(); |
| 60 | + }; |
| 61 | + |
| 62 | + if (columns.size() != db_columns.size()) { |
| 63 | + throw Error("config has {} columns but table \"{}\" has {}\n{}", columns.size(), table, db_columns.size(), schema_hint()); |
| 64 | + } |
| 65 | + |
| 66 | + for (size_t i = 0; i < db_columns.size(); i++) { |
| 67 | + const auto &expected_name = columns[i].name; |
| 68 | + const auto &expected_type = type_to_clickhouse(columns[i].datatype, columns[i].nullable); |
| 69 | + const auto &[actual_name, actual_type] = db_columns[i]; |
| 70 | + |
| 71 | + if (expected_name != actual_name) { |
| 72 | + throw Error("expected column #{} in table \"{}\" to be named \"{}\" but it is \"{}\"\n{}", |
| 73 | + i, table, expected_name, actual_name, schema_hint()); |
| 74 | + } |
| 75 | + |
| 76 | + if (expected_type != actual_type) { |
| 77 | + throw Error("expected column #{} in table \"{}\" to be of type \"{}\" but it is \"{}\"\n{}", |
| 78 | + i, table, expected_type, actual_type, schema_hint()); |
| 79 | + } |
| 80 | + } |
| 81 | +} |
| 82 | + |
| 83 | +Inserter::Inserter( |
| 84 | + int id, |
| 85 | + Logger logger, |
| 86 | + clickhouse::ClientOptions client_opts, |
| 87 | + std::string table_name, |
| 88 | + const std::vector<Column> &columns, |
| 89 | + SyncQueue<Block *> &input_blocks, |
| 90 | + SyncQueue<Block *> &avail_blocks) |
| 91 | + : m_id(id) |
| 92 | + , m_logger(logger) |
| 93 | + , m_client_opts(client_opts) |
| 94 | + , m_table_name(table_name) |
| 95 | + , m_columns(columns) |
| 96 | + , m_input_blocks(input_blocks) |
| 97 | + , m_avail_blocks(avail_blocks) |
| 98 | +{} |
| 99 | + |
| 100 | +bool Inserter::insert(clickhouse::Block &block) |
| 101 | +{ |
| 102 | + bool needs_reconnect = false; |
| 103 | + while (true) { |
| 104 | + if (stop_requested() && secs_since_stop_requested() > STOP_TIMEOUT_SECS) { |
| 105 | + return false; |
| 106 | + } |
| 107 | + |
| 108 | + try { |
| 109 | + if (needs_reconnect) { |
| 110 | + m_client->ResetConnectionEndpoint(); |
| 111 | + ensure_schema(*m_client.get(), m_table_name, m_columns); |
| 112 | + m_logger.warning("[Worker %d] Connected to %s:%d due to error with previous endpoint", m_id, |
| 113 | + m_client->GetCurrentEndpoint()->host.c_str(), m_client->GetCurrentEndpoint()->port); |
| 114 | + } |
| 115 | + m_logger.debug("[Worker %d] Inserting %d rows", m_id, block.GetRowCount()); |
| 116 | + m_client->Insert(m_table_name, block); |
| 117 | + break; |
| 118 | + |
| 119 | + } catch (const std::exception &ex) { |
| 120 | + m_logger.error("[Worker %d] Insert failed: %s - retrying in 1 second", m_id, ex.what()); |
| 121 | + needs_reconnect = true; |
| 122 | + } |
| 123 | + |
| 124 | + if (stop_requested() && secs_since_stop_requested() > STOP_TIMEOUT_SECS) { |
| 125 | + return false; |
| 126 | + } |
| 127 | + |
| 128 | + std::this_thread::sleep_for(std::chrono::seconds(1)); |
| 129 | + } |
| 130 | + |
| 131 | + return true; |
| 132 | +} |
| 133 | + |
| 134 | +void Inserter::run() |
| 135 | +{ |
| 136 | + m_client = std::make_unique<clickhouse::Client>(m_client_opts); |
| 137 | + ensure_schema(*m_client.get(), m_table_name, m_columns); |
| 138 | + m_logger.info("[Worker %d] Connected to %s:%d", m_id, |
| 139 | + m_client->GetCurrentEndpoint()->host.c_str(), m_client->GetCurrentEndpoint()->port); |
| 140 | + |
| 141 | + while (true) { |
| 142 | + Block *block = m_input_blocks.get(); |
| 143 | + if (!block) { |
| 144 | + // We might get null as a way to get unblocked and process stop signal. |
| 145 | + // Nulls are inserted only after all the valid blocks are queued. |
| 146 | + break; |
| 147 | + } |
| 148 | + |
| 149 | + if (stop_requested() && secs_since_stop_requested() > STOP_TIMEOUT_SECS) { |
| 150 | + break; |
| 151 | + } |
| 152 | + |
| 153 | + block->block.RefreshRowCount(); |
| 154 | + bool ok = insert(block->block); |
| 155 | + if (!ok) { |
| 156 | + // Do not clear the block as it could not have been inserted. |
| 157 | + // It will be used to count the number of dropped records. |
| 158 | + break; |
| 159 | + } |
| 160 | + |
| 161 | + for (auto &column : block->columns) { |
| 162 | + column->Clear(); |
| 163 | + } |
| 164 | + block->rows = 0; |
| 165 | + m_avail_blocks.put(block); |
| 166 | + } |
| 167 | +} |
0 commit comments