Skip to content

Commit 2b6475b

Browse files
committed
feat: Improve binlog parsing and config value formatting
This commit enhances binlog replication reliability and fixes config value formatting issues for required filters. **Binlog Reader Improvements:** - Fix binlog event parsing by skipping MySQL C API OK packet byte (0x00) - Add column name fetching from SHOW COLUMNS with per-table caching - Improve MySQL 8.0 ROWS_EVENT_V2 support for extra_row_info handling - Fix UPDATE_ROWS event parsing with proper boundary checks - Add comprehensive debug logging for binlog event parsing **Config Value Formatting:** - Fix integer values to exclude decimal points (e.g., "1" not "1.000000") - Fix float values to include decimal points (e.g., "99.99") - Properly handle boolean-like tinyint values **Snapshot Builder:** - Add GTID validation to prevent replication from undefined positions - Provide clear error messages when GTID is empty **Testing:** - Add required_filters_formatting_test.cpp for config value formatting - Add binlog_parsing_test.cpp for binlog event parsing edge cases
1 parent 61f851a commit 2b6475b

File tree

10 files changed

+828
-25
lines changed

10 files changed

+828
-25
lines changed

src/config/config.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ RequiredFilterConfig ParseRequiredFilterConfig(const json& json_obj) {
120120
// value can be string or number, convert to string
121121
if (json_obj["value"].is_string()) {
122122
config.value = json_obj["value"].get<std::string>();
123-
} else if (json_obj["value"].is_number()) {
123+
} else if (json_obj["value"].is_number_integer()) {
124+
// Integer types: format without decimal point
125+
config.value = std::to_string(json_obj["value"].get<int64_t>());
126+
} else if (json_obj["value"].is_number_float()) {
127+
// Floating point types: format with decimal point
124128
config.value = std::to_string(json_obj["value"].get<double>());
125129
} else if (json_obj["value"].is_boolean()) {
126130
config.value = json_obj["value"].get<bool>() ? "1" : "0";

src/mysql/binlog_reader.cpp

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -775,11 +775,16 @@ void BinlogReader::WriteGTIDToStateFile(const std::string& gtid) const {
775775
}
776776

777777
std::optional<BinlogEvent> BinlogReader::ParseBinlogEvent(const unsigned char* buffer, unsigned long length) {
778-
if ((buffer == nullptr) || length < 19) {
779-
// Minimum event size is 19 bytes (header)
778+
if ((buffer == nullptr) || length < 20) {
779+
// Minimum event size is 20 bytes (1 byte OK packet + 19 bytes binlog header)
780780
return std::nullopt;
781781
}
782782

783+
// MySQL C API prepends an OK packet byte (0x00) before the actual binlog event
784+
// Skip the OK byte to get to the actual binlog event data
785+
buffer++;
786+
length--;
787+
783788
// Binlog event header format (19 bytes):
784789
// timestamp (4 bytes)
785790
// event_type (1 byte)
@@ -791,7 +796,7 @@ std::optional<BinlogEvent> BinlogReader::ParseBinlogEvent(const unsigned char* b
791796
auto event_type = static_cast<MySQLBinlogEventType>(buffer[4]);
792797

793798
// Log event type for debugging
794-
spdlog::debug("Received binlog event: {}", GetEventTypeName(event_type));
799+
spdlog::debug("Received binlog event: {} (type={})", GetEventTypeName(event_type), static_cast<int>(buffer[4]));
795800

796801
// Handle different event types
797802
switch (event_type) {
@@ -811,6 +816,12 @@ std::optional<BinlogEvent> BinlogReader::ParseBinlogEvent(const unsigned char* b
811816
{
812817
auto metadata_opt = ParseTableMapEvent(buffer, length);
813818
if (metadata_opt) {
819+
// Fetch actual column names from SHOW COLUMNS (cached per table)
820+
// Binlog TABLE_MAP events don't include column names, only types
821+
if (!FetchColumnNames(metadata_opt.value())) {
822+
spdlog::warn("Failed to fetch column names for {}.{}, using col_N placeholders",
823+
metadata_opt->database_name, metadata_opt->table_name);
824+
}
814825
table_metadata_cache_.Add(metadata_opt->table_id, metadata_opt.value());
815826
spdlog::debug("Cached TABLE_MAP: {}.{} (table_id={})", metadata_opt->database_name, metadata_opt->table_name,
816827
metadata_opt->table_id);
@@ -1264,6 +1275,73 @@ std::optional<TableMetadata> BinlogReader::ParseTableMapEvent(const unsigned cha
12641275
return metadata;
12651276
}
12661277

1278+
bool BinlogReader::FetchColumnNames(TableMetadata& metadata) {
1279+
std::string cache_key = metadata.database_name + "." + metadata.table_name;
1280+
1281+
// Check cache first
1282+
{
1283+
std::lock_guard<std::mutex> lock(column_names_cache_mutex_);
1284+
auto cache_it = column_names_cache_.find(cache_key);
1285+
if (cache_it != column_names_cache_.end()) {
1286+
// Cache hit: update column names from cache
1287+
const auto& column_names = cache_it->second;
1288+
if (column_names.size() == metadata.columns.size()) {
1289+
for (size_t i = 0; i < metadata.columns.size(); i++) {
1290+
metadata.columns[i].name = column_names[i];
1291+
}
1292+
spdlog::debug("Column names for {}.{} loaded from cache", metadata.database_name, metadata.table_name);
1293+
return true;
1294+
}
1295+
// Cache mismatch (column count changed?), fall through to query
1296+
spdlog::warn("Cached column names for {}.{} have mismatched count (cached={}, current={})",
1297+
metadata.database_name, metadata.table_name, column_names.size(), metadata.columns.size());
1298+
column_names_cache_.erase(cache_it); // Remove stale cache entry
1299+
}
1300+
}
1301+
1302+
// Cache miss or stale: use SHOW COLUMNS (faster than INFORMATION_SCHEMA)
1303+
std::string query = "SHOW COLUMNS FROM `" + metadata.database_name + "`.`" + metadata.table_name + "`";
1304+
1305+
MYSQL_RES* result = connection_.Execute(query);
1306+
if (result == nullptr) {
1307+
spdlog::error("Failed to query column names for {}.{}: {}", metadata.database_name, metadata.table_name,
1308+
connection_.GetLastError());
1309+
return false;
1310+
}
1311+
1312+
std::vector<std::string> column_names;
1313+
column_names.reserve(metadata.columns.size());
1314+
1315+
MYSQL_ROW row = nullptr;
1316+
while ((row = mysql_fetch_row(result)) != nullptr) {
1317+
column_names.emplace_back(row[0]);
1318+
}
1319+
1320+
mysql_free_result(result);
1321+
1322+
if (column_names.size() != metadata.columns.size()) {
1323+
spdlog::error("Column count mismatch for {}.{}: SHOW COLUMNS returned {}, binlog has {}", metadata.database_name,
1324+
metadata.table_name, column_names.size(), metadata.columns.size());
1325+
return false;
1326+
}
1327+
1328+
// Update metadata with actual column names
1329+
for (size_t i = 0; i < metadata.columns.size(); i++) {
1330+
metadata.columns[i].name = column_names[i];
1331+
}
1332+
1333+
// Store in cache
1334+
{
1335+
std::lock_guard<std::mutex> lock(column_names_cache_mutex_);
1336+
column_names_cache_[cache_key] = std::move(column_names);
1337+
}
1338+
1339+
spdlog::info("Fetched {} column names for {}.{} from SHOW COLUMNS", metadata.columns.size(), metadata.database_name,
1340+
metadata.table_name);
1341+
1342+
return true;
1343+
}
1344+
12671345
void BinlogReader::FixGtidSetCallback(MYSQL_RPL* rpl, unsigned char* packet_gtid_set) {
12681346
// Copy pre-encoded GTID data into the packet buffer
12691347
auto* encoded_data = static_cast<std::vector<uint8_t>*>(rpl->gtid_set_arg);

src/mysql/binlog_reader.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ class BinlogReader {
177177
// Table metadata cache
178178
TableMetadataCache table_metadata_cache_;
179179

180+
// Column names cache: key = "database.table", value = vector of column names in order
181+
std::unordered_map<std::string, std::vector<std::string>> column_names_cache_;
182+
mutable std::mutex column_names_cache_mutex_;
183+
180184
// GTID encoding data (must persist during mysql_binlog_open call)
181185
std::vector<uint8_t> gtid_encoded_data_;
182186

@@ -229,6 +233,13 @@ class BinlogReader {
229233
*/
230234
static bool CompareFilterValue(const storage::FilterValue& value, const config::RequiredFilterConfig& filter);
231235

236+
/**
237+
* @brief Fetch column names from INFORMATION_SCHEMA and update TableMetadata
238+
* @param metadata Table metadata to update with actual column names
239+
* @return true if successful, false otherwise
240+
*/
241+
bool FetchColumnNames(TableMetadata& metadata);
242+
232243
/**
233244
* @brief Extract all filter columns (both required and optional) from row data
234245
* @param row_data Row data from binlog

0 commit comments

Comments
 (0)