Skip to content

Commit fea51d7

Browse files
PS-10245 feature: Implement receiving binlog events in GTID mode (part 2) (#82)
https://perconadev.atlassian.net/browse/PS-10245 Refactored 'easymysql::connection::switch_to_replication()' method. We now have 3 different methods: - one 'switch_to_gtid_replication()' for GTID mode - two 'switch_to_position_replication()' for position-based mode (one for starting from the very beginning and another one when binlog name / position are known). In the main application we now print more info in 'log_replication_info()' depending on the mode (GTID or position-mased).
1 parent aead96c commit fea51d7

4 files changed

Lines changed: 151 additions & 52 deletions

File tree

src/app.cpp

Lines changed: 59 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,12 @@
5151
#include "binsrv/storage_backend_type.hpp" // IWYU pragma: keep
5252
#include "binsrv/time_unit.hpp"
5353

54+
#include "binsrv/gtids/common_types.hpp"
55+
#include "binsrv/gtids/gtid_set.hpp"
56+
5457
#include "binsrv/event/code_type.hpp"
5558
#include "binsrv/event/common_header_flag_type.hpp"
5659
#include "binsrv/event/event.hpp"
57-
#include "binsrv/event/protocol_traits_fwd.hpp"
5860
#include "binsrv/event/reader_context.hpp"
5961

6062
#include "easymysql/connection.hpp"
@@ -276,19 +278,42 @@ void log_connection_info(binsrv::basic_logger &logger,
276278

277279
void log_replication_info(
278280
binsrv::basic_logger &logger, std::uint32_t server_id,
279-
std::string_view current_binlog_name, std::uint64_t current_binlog_position,
281+
const binsrv::storage &storage, bool verify_checksum,
280282
easymysql::connection_replication_mode_type blocking_mode) {
281-
std::string msg{};
283+
const auto replication_mode{storage.get_replication_mode()};
284+
285+
std::string msg{"switched to replication (checksum "};
286+
msg += (verify_checksum ? "enabled" : "disabled");
287+
msg += ", ";
288+
msg += boost::lexical_cast<std::string>(replication_mode);
289+
msg += +" mode)";
290+
logger.log(binsrv::log_severity::info, msg);
291+
282292
msg = "replication info (server id ";
283293
msg += std::to_string(server_id);
284294
msg += ", ";
285-
msg += (current_binlog_name.empty() ? "<empty>" : current_binlog_name);
286-
msg += ":";
287-
msg += std::to_string(current_binlog_position);
288-
msg += ", ";
289295
msg += (blocking_mode == easymysql::connection_replication_mode_type::blocking
290296
? "blocking"
291297
: "non-blocking");
298+
msg += ", starting from ";
299+
if (replication_mode == binsrv::replication_mode_type::position) {
300+
if (storage.has_current_binlog_name()) {
301+
msg += "the very beginning";
302+
} else {
303+
msg += storage.get_current_binlog_name();
304+
msg += ":";
305+
msg += std::to_string(storage.get_current_position());
306+
}
307+
} else {
308+
const auto &gtids{storage.get_gtids()};
309+
if (gtids.is_empty()) {
310+
msg += "an empty";
311+
} else {
312+
msg += "the ";
313+
msg += boost::lexical_cast<std::string>(gtids);
314+
}
315+
msg += " GTID set";
316+
}
292317
msg += ")";
293318
logger.log(binsrv::log_severity::info, msg);
294319
}
@@ -384,8 +409,7 @@ void receive_binlog_events(
384409
const volatile std::atomic_flag &termination_flag,
385410
binsrv::basic_logger &logger, const easymysql::library &mysql_lib,
386411
const easymysql::connection_config &connection_config,
387-
std::uint32_t server_id, bool verify_checksum,
388-
binsrv::replication_mode_type replication_mode, binsrv::storage &storage) {
412+
std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage) {
389413
easymysql::connection connection{};
390414
try {
391415
connection = mysql_lib.create_connection(connection_config);
@@ -403,23 +427,34 @@ void receive_binlog_events(
403427

404428
log_connection_info(logger, connection);
405429

406-
const auto current_binlog_name{storage.get_current_binlog_name()};
407-
// if storage binlog name is detected to be empty (empty data directory), we
408-
// start streaming from the position 'magic_binlog_offset' (4)
409-
const auto current_binlog_position{current_binlog_name.empty()
410-
? binsrv::event::magic_binlog_offset
411-
: storage.get_current_position()};
412-
430+
const auto replication_mode{storage.get_replication_mode()};
413431
const auto blocking_mode{
414432
operation_mode == binsrv::operation_mode_type::fetch
415433
? easymysql::connection_replication_mode_type::non_blocking
416434
: easymysql::connection_replication_mode_type::blocking};
417435

418436
try {
419-
connection.switch_to_replication(
420-
server_id, current_binlog_name, current_binlog_position,
421-
verify_checksum,
422-
replication_mode == binsrv::replication_mode_type::gtid, blocking_mode);
437+
if (replication_mode == binsrv::replication_mode_type::position) {
438+
if (storage.has_current_binlog_name()) {
439+
connection.switch_to_position_replication(
440+
server_id, storage.get_current_binlog_name(),
441+
storage.get_current_position(), verify_checksum, blocking_mode);
442+
} else {
443+
connection.switch_to_position_replication(server_id, verify_checksum,
444+
blocking_mode);
445+
}
446+
} else {
447+
const binsrv::gtids::gtid_set empty_gtids{};
448+
const auto encoded_size{empty_gtids.calculate_encoded_size()};
449+
450+
binsrv::gtids::gtid_set_storage buffer(encoded_size);
451+
util::byte_span destination{buffer};
452+
empty_gtids.encode_to(destination);
453+
454+
connection.switch_to_gtid_replication(server_id,
455+
util::const_byte_span{buffer},
456+
verify_checksum, blocking_mode);
457+
}
423458
} catch (const easymysql::core_error &) {
424459
if (operation_mode == binsrv::operation_mode_type::fetch) {
425460
throw;
@@ -428,13 +463,8 @@ void receive_binlog_events(
428463
return;
429464
}
430465

431-
logger.log(binsrv::log_severity::info,
432-
std::string{"switched to replication (checksum "} +
433-
(verify_checksum ? "enabled" : "disabled") + ", " +
434-
boost::lexical_cast<std::string>(replication_mode) + "mode)");
435-
436-
log_replication_info(logger, server_id, current_binlog_name,
437-
current_binlog_position, blocking_mode);
466+
log_replication_info(logger, server_id, storage, verify_checksum,
467+
blocking_mode);
438468

439469
// Network streams are requested with COM_BINLOG_DUMP and
440470
// each Binlog Event response is prepended with 00 OK-byte.
@@ -638,7 +668,7 @@ int main(int argc, char *argv[]) {
638668

639669
receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib,
640670
connection_config, server_id, verify_checksum,
641-
replication_mode, storage);
671+
storage);
642672

643673
if (operation_mode == binsrv::operation_mode_type::pull) {
644674
std::size_t iteration_number{1U};
@@ -659,7 +689,7 @@ int main(int argc, char *argv[]) {
659689

660690
receive_binlog_events(operation_mode, termination_flag, *logger,
661691
mysql_lib, connection_config, server_id,
662-
verify_checksum, replication_mode, storage);
692+
verify_checksum, storage);
663693
++iteration_number;
664694
}
665695
}

src/binsrv/storage.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
#include "binsrv/replication_mode_type_fwd.hpp"
2828
#include "binsrv/storage_config_fwd.hpp"
2929

30+
#include "binsrv/gtids/gtid_set.hpp"
31+
3032
#include "util/byte_span_fwd.hpp"
3133

3234
namespace binsrv {
@@ -65,6 +67,10 @@ class [[nodiscard]] storage {
6567
return position_;
6668
}
6769

70+
[[nodiscard]] const gtids::gtid_set &get_gtids() const noexcept {
71+
return gtids_;
72+
}
73+
6874
[[nodiscard]] static bool
6975
check_binlog_name(std::string_view binlog_name) noexcept;
7076

@@ -81,6 +87,8 @@ class [[nodiscard]] storage {
8187
binlog_name_container binlog_names_;
8288
std::uint64_t position_{0ULL};
8389

90+
gtids::gtid_set gtids_{};
91+
8492
std::uint64_t checkpoint_size_bytes_{0ULL};
8593
std::uint64_t last_checkpoint_position_{0ULL};
8694

src/easymysql/connection.cpp

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <cassert>
1919
#include <concepts>
20+
#include <cstddef>
2021
#include <cstdint>
2122
#include <memory>
2223
#include <optional>
@@ -113,6 +114,10 @@ void connection::mysql_deleter::operator()(void *ptr) const noexcept {
113114

114115
class connection::rpl_impl {
115116
public:
117+
// https://github.com/mysql/mysql-server/blob/mysql-8.0.43/sql/log_event.h#L216
118+
// https://github.com/mysql/mysql-server/blob/mysql-8.4.6/sql/log_event.h#L214
119+
static constexpr std::uint64_t default_binlog_position{4U};
120+
116121
rpl_impl(connection &conn, std::uint32_t server_id,
117122
std::string_view file_name, std::uint64_t position,
118123
connection_replication_mode_type blocking_mode)
@@ -121,14 +126,39 @@ class connection::rpl_impl {
121126
.file_name = std::data(file_name),
122127
.start_position = position,
123128
.server_id = server_id,
124-
.flags = get_rpl_flags(blocking_mode),
129+
.flags = get_rpl_flags(false, blocking_mode),
125130
.gtid_set_encoded_size = 0U,
126131
.fix_gtid_set = nullptr,
127132
.gtid_set_arg = nullptr,
128133
.size = 0U,
129134
.buffer = nullptr} {
130135
if (mysql_binlog_open(conn_, &rpl_) != 0) {
131-
raise_core_error_from_connection("cannot open binary log", conn);
136+
raise_core_error_from_connection(
137+
"cannot open binary log in position mode", conn);
138+
}
139+
}
140+
rpl_impl(connection &conn, std::uint32_t server_id,
141+
util::const_byte_span encoded_gtid_set,
142+
connection_replication_mode_type blocking_mode)
143+
: conn_{mysql_deimpl::get(conn.mysql_impl_)},
144+
rpl_{.file_name_length = 0U,
145+
.file_name = nullptr,
146+
.start_position = default_binlog_position,
147+
.server_id = server_id,
148+
.flags = get_rpl_flags(true, blocking_mode),
149+
.gtid_set_encoded_size = std::size(encoded_gtid_set),
150+
.fix_gtid_set = nullptr,
151+
// it is OK to use const_cast here as MySQL client uses this
152+
// pointer only for reading (checked on MySQL 8.4.6)
153+
// NOLINTBEGIN(cppcoreguidelines-pro-type-const-cast)
154+
.gtid_set_arg =
155+
const_cast<std::byte *>(std::data(encoded_gtid_set)),
156+
// NOLINTEND(cppcoreguidelines-pro-type-const-cast)
157+
.size = 0U,
158+
.buffer = nullptr} {
159+
if (mysql_binlog_open(conn_, &rpl_) != 0) {
160+
raise_core_error_from_connection("cannot open binary log in GTID mode",
161+
conn);
132162
}
133163
}
134164

@@ -170,8 +200,9 @@ class connection::rpl_impl {
170200
// locally
171201
static constexpr unsigned int private_binlog_dump_non_block{1U};
172202
[[nodiscard]] static constexpr unsigned int
173-
get_rpl_flags(connection_replication_mode_type blocking_mode) noexcept {
174-
return MYSQL_RPL_SKIP_HEARTBEAT |
203+
get_rpl_flags(bool gtid_mode,
204+
connection_replication_mode_type blocking_mode) noexcept {
205+
return MYSQL_RPL_SKIP_HEARTBEAT | (gtid_mode ? MYSQL_RPL_GTID : 0U) |
175206
(blocking_mode == connection_replication_mode_type::non_blocking
176207
? private_binlog_dump_non_block
177208
: 0U);
@@ -180,6 +211,17 @@ class connection::rpl_impl {
180211

181212
connection::connection() noexcept = default;
182213

214+
void connection::set_binlog_checksum(bool verify_checksum) {
215+
// WL#2540: Replication event checksums
216+
// https://dev.mysql.com/worklog/task/?id=2540
217+
const std::string checksum_algorithm_label{verify_checksum ? "CRC32"
218+
: "NONE"};
219+
const auto set_binlog_checksum_query{
220+
"SET @source_binlog_checksum = '" + checksum_algorithm_label +
221+
"', @master_binlog_checksum = '" + checksum_algorithm_label + "'"};
222+
execute_generic_query_noresult(set_binlog_checksum_query);
223+
}
224+
183225
void connection::process_ssl_config(const ssl_config &config) {
184226
auto *casted_impl = mysql_deimpl::get(mysql_impl_);
185227

@@ -402,30 +444,39 @@ bool connection::ping() {
402444
return mysql_ping(casted_impl) == 0;
403445
}
404446

405-
void connection::switch_to_replication(
447+
void connection::switch_to_position_replication(
406448
std::uint32_t server_id, std::string_view file_name, std::uint64_t position,
407-
bool verify_checksum, bool gtid_mode,
408-
connection_replication_mode_type blocking_mode) {
449+
bool verify_checksum, connection_replication_mode_type blocking_mode) {
409450
assert(!is_empty());
410-
if (gtid_mode) {
451+
if (is_in_replication_mode()) {
411452
util::exception_location().raise<std::logic_error>(
412-
"switching to GTID replication is not yet implemented");
453+
"connection has already been swithed to replication");
413454
}
455+
456+
set_binlog_checksum(verify_checksum);
457+
rpl_impl_ = std::make_unique<rpl_impl>(*this, server_id, file_name, position,
458+
blocking_mode);
459+
}
460+
461+
void connection::switch_to_position_replication(
462+
std::uint32_t server_id, bool verify_checksum,
463+
connection_replication_mode_type blocking_mode) {
464+
switch_to_position_replication(server_id, {},
465+
rpl_impl::default_binlog_position,
466+
verify_checksum, blocking_mode);
467+
}
468+
469+
void connection::switch_to_gtid_replication(
470+
std::uint32_t server_id, util::const_byte_span encoded_gtid_set,
471+
bool verify_checksum, connection_replication_mode_type blocking_mode) {
472+
assert(!is_empty());
414473
if (is_in_replication_mode()) {
415474
util::exception_location().raise<std::logic_error>(
416475
"connection has already been swithed to replication");
417476
}
418477

419-
// WL#2540: Replication event checksums
420-
// https://dev.mysql.com/worklog/task/?id=2540
421-
const std::string checksum_algorithm_label{verify_checksum ? "CRC32"
422-
: "NONE"};
423-
const auto set_binlog_checksum_query{
424-
"SET @source_binlog_checksum = '" + checksum_algorithm_label +
425-
"', @master_binlog_checksum = '" + checksum_algorithm_label + "'"};
426-
execute_generic_query_noresult(set_binlog_checksum_query);
427-
428-
rpl_impl_ = std::make_unique<rpl_impl>(*this, server_id, file_name, position,
478+
set_binlog_checksum(verify_checksum);
479+
rpl_impl_ = std::make_unique<rpl_impl>(*this, server_id, encoded_gtid_set,
429480
blocking_mode);
430481
}
431482

src/easymysql/connection.hpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,18 @@ class [[nodiscard]] connection {
6464
return static_cast<bool>(rpl_impl_);
6565
}
6666

67-
void switch_to_replication(std::uint32_t server_id,
68-
std::string_view file_name, std::uint64_t position,
69-
bool verify_checksum, bool gtid_mode,
70-
connection_replication_mode_type blocking_mode);
67+
void switch_to_position_replication(
68+
std::uint32_t server_id, std::string_view file_name,
69+
std::uint64_t position, bool verify_checksum,
70+
connection_replication_mode_type blocking_mode);
71+
// a simplified version for starting from the very beginning
72+
void switch_to_position_replication(
73+
std::uint32_t server_id, bool verify_checksum,
74+
connection_replication_mode_type blocking_mode);
75+
76+
void switch_to_gtid_replication(
77+
std::uint32_t server_id, util::const_byte_span encoded_gtid_set,
78+
bool verify_checksum, connection_replication_mode_type blocking_mode);
7179

7280
// returns false on 'connection closed' / 'timeout'
7381
// returns true and sets 'portion' to en empty span on EOF (last event read)
@@ -76,6 +84,8 @@ class [[nodiscard]] connection {
7684
[[nodiscard]] bool fetch_binlog_event(util::const_byte_span &portion);
7785

7886
private:
87+
void set_binlog_checksum(bool verify_checksum);
88+
7989
void process_ssl_config(const ssl_config &config);
8090
void process_tls_config(const tls_config &config);
8191
void process_connection_config(const connection_config &config);

0 commit comments

Comments
 (0)