Skip to content

Commit ec557f3

Browse files
PS-11054 fix: cannot replicate because the source purged required binary logs (#118)
https://perconadev.atlassian.net/browse/PS-11054 It is now possible to start replication from MySQL servers that have non-empty purged GTID set ('@@global.gtid_purged'). Internally, if we identify that the specified storage is empty, we try to extract the set of GTIDs that were purged on the server side via 'SELECT '@@global.gtid_purged' and pass this info to the storage object. This helps to make sure that the very first binlog will have this purged GTID set stored in the 'previous_gtids' field of its metadata file (before this change, the 'previous_gtids' in the very first metadata file was always empty). Because of this change when we switch the connection to the replication mode we also pass this purged GTID set as the initial GTID state to the 'mysql_binlog_open()' client API call and don't get the 'Cannot replicate because the source purged required binary logs' errors anymore. 'easymysql::connection' class extended with the new 'execute_select_query_string_result()' method that can be used for executing single-value (single row + single column) queries returning a string value. 'binsrv::storage' class extended with new 'purged_gtids_' member that is used to store information about purged GTIDs identified when storage was initialized for the first time. Raised log message severity from 'info' to 'error' for some connection exceptions. Added new MTR test case 'binlog_streaming.gtid_purged' that checks if PBS can start replicating from a server that has the very first binlog file purged. Fixed README.md - the correct JSON filed name in the 'result' section of the query responses is 'previous_gtids' (not 'initial_gtids').
1 parent 8755dd8 commit ec557f3

File tree

9 files changed

+263
-14
lines changed

9 files changed

+263
-14
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ may print
197197
"uri": "s3://binsrv-bucket/storage/binlog.000001",
198198
"min_timestamp": "2026-02-09T17:22:01",
199199
"max_timestamp": "2026-02-09T17:22:08",
200-
"initial_gtids": "",
200+
"previous_gtids": "",
201201
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456"
202202
},
203203
{
@@ -206,7 +206,7 @@ may print
206206
"uri": "s3://binsrv-bucket/storage/binlog.000002",
207207
"min_timestamp": "2026-02-09T17:22:08",
208208
"max_timestamp": "2026-02-09T17:22:09",
209-
"initial_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
209+
"previous_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
210210
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:123457-246912"
211211
}
212212
]
@@ -243,7 +243,7 @@ may print
243243
"uri": "s3://binsrv-bucket/storage/binlog.000001",
244244
"min_timestamp": "2026-02-09T17:22:01",
245245
"max_timestamp": "2026-02-09T17:22:08",
246-
"initial_gtids": "",
246+
"previous_gtids": "",
247247
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456"
248248
}
249249
]
@@ -264,7 +264,7 @@ may print
264264
"uri": "s3://binsrv-bucket/storage/binlog.000001",
265265
"min_timestamp": "2026-02-09T17:22:01",
266266
"max_timestamp": "2026-02-09T17:22:08",
267-
"initial_gtids": "",
267+
"previous_gtids": "",
268268
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
269269
},
270270
{
@@ -273,7 +273,7 @@ may print
273273
"uri": "s3://binsrv-bucket/storage/binlog.000002",
274274
"min_timestamp": "2026-02-09T17:22:08",
275275
"max_timestamp": "2026-02-09T17:22:09",
276-
"initial_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
276+
"previous_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
277277
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:123457-246912"
278278
}
279279
]
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
*** Resetting replication at the very beginning of the test.
2+
3+
*** Determining the first binary log name.
4+
5+
*** Generating a configuration file in JSON format for the Binlog
6+
*** Server utility.
7+
8+
*** Determining binlog file directory from the server.
9+
10+
*** Creating a temporary directory <BINSRV_STORAGE_PATH> for storing
11+
*** binlog files downloaded via the Binlog Server utility.
12+
13+
*** Creating a simple table.
14+
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;
15+
16+
*** Filling the table with some data.
17+
INSERT INTO t1 VALUES();
18+
19+
*** Flushing the first binary log and switching to the second one.
20+
FLUSH BINARY LOGS;
21+
22+
*** Determining the second binary log name.
23+
24+
*** Filling the table with more data.
25+
INSERT INTO t1 VALUES();
26+
27+
*** Filling the table with more data.
28+
PURGE BINARY LOGS TO '<second_binlog>';
29+
30+
*** Executing the Binlog Server utility and fetching all events.
31+
32+
*** Executing the Binlog Server utility in the 'search_by_gtid_set'
33+
include/read_file_to_var.inc
34+
35+
*** Executing the Binlog Server utility one more time (fetching nothing)
36+
37+
*** Executing the Binlog Server utility in the 'search_by_gtid_set'
38+
one more time (expecting the same results)
39+
include/read_file_to_var.inc
40+
41+
*** Removing the search result file.
42+
43+
*** Dropping the table.
44+
DROP TABLE t1;
45+
46+
*** Removing the Binlog Server utility storage directory.
47+
48+
*** Removing the Binlog Server utility log file.
49+
50+
*** Removing the Binlog Server utility configuration file.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
--gtid-mode=on
2+
--enforce-gtid-consistency
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
--source ../include/have_binsrv.inc
2+
3+
--source ../include/v80_v84_compatibility_defines.inc
4+
5+
# in case of --repeat=N, we need to start from a fresh binary log to make
6+
# this test deterministic
7+
--echo *** Resetting replication at the very beginning of the test.
8+
--disable_query_log
9+
eval $stmt_reset_binary_logs_and_gtids;
10+
--enable_query_log
11+
12+
--echo
13+
--echo *** Determining the first binary log name.
14+
--let $first_binlog = query_get_value($stmt_show_binary_log_status, File, 1)
15+
16+
# identifying backend storage type ('file' or 's3')
17+
--source ../include/identify_storage_backend.inc
18+
19+
# creating data directory, configuration file, etc.
20+
--let $binsrv_connect_timeout = 20
21+
--let $binsrv_read_timeout = 60
22+
--let $binsrv_idle_time = 10
23+
--let $binsrv_verify_checksum = TRUE
24+
--let $binsrv_replication_mode = gtid
25+
--let $binsrv_checkpoint_size = 1
26+
--source ../include/set_up_binsrv_environment.inc
27+
28+
--let $read_from_file = $MYSQL_TMP_DIR/search_result.json
29+
30+
--echo
31+
--echo *** Creating a simple table.
32+
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;
33+
34+
--echo
35+
--echo *** Filling the table with some data.
36+
INSERT INTO t1 VALUES();
37+
38+
--echo
39+
--echo *** Flushing the first binary log and switching to the second one.
40+
FLUSH BINARY LOGS;
41+
42+
--echo
43+
--echo *** Determining the second binary log name.
44+
--let $second_binlog = query_get_value($stmt_show_binary_log_status, File, 1)
45+
46+
--echo
47+
--echo *** Filling the table with more data.
48+
INSERT INTO t1 VALUES();
49+
50+
--echo
51+
--echo *** Filling the table with more data.
52+
--replace_result $second_binlog <second_binlog>
53+
eval PURGE BINARY LOGS TO '$second_binlog';
54+
55+
--let $captured_gtid_purged = `SELECT @@global.gtid_purged`
56+
--let $captured_second_insert_gtid = `SELECT GTID_SUBTRACT(@@global.gtid_executed, @@global.gtid_purged)`
57+
58+
--echo
59+
--echo *** Executing the Binlog Server utility and fetching all events.
60+
--exec $BINSRV fetch $binsrv_config_file_path > /dev/null
61+
62+
--echo
63+
--echo *** Executing the Binlog Server utility in the 'search_by_gtid_set'
64+
--exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file
65+
66+
--source include/read_file_to_var.inc
67+
--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`)
68+
--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`)
69+
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`)
70+
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].previous_gtids') = '$captured_gtid_purged'`)
71+
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].added_gtids') = '$captured_second_insert_gtid'`)
72+
73+
--echo
74+
--echo *** Executing the Binlog Server utility one more time (fetching nothing)
75+
--exec $BINSRV fetch $binsrv_config_file_path > /dev/null
76+
77+
--echo
78+
--echo *** Executing the Binlog Server utility in the 'search_by_gtid_set'
79+
--echo one more time (expecting the same results)
80+
--exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file
81+
82+
--source include/read_file_to_var.inc
83+
--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`)
84+
--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`)
85+
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`)
86+
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].previous_gtids') = '$captured_gtid_purged'`)
87+
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].added_gtids') = '$captured_second_insert_gtid'`)
88+
89+
--echo
90+
--echo *** Removing the search result file.
91+
--remove_file $read_from_file
92+
93+
--echo
94+
--echo *** Dropping the table.
95+
DROP TABLE t1;
96+
97+
# cleaning up
98+
--source ../include/tear_down_binsrv_environment.inc

src/app.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,7 @@ bool open_connection_and_switch_to_replication(
779779
if (operation_mode == binsrv::operation_mode_type::fetch) {
780780
throw;
781781
}
782-
logger.log(binsrv::log_severity::info,
782+
logger.log(binsrv::log_severity::error,
783783
"unable to establish connection to mysql server");
784784
return false;
785785
}
@@ -796,7 +796,20 @@ bool open_connection_and_switch_to_replication(
796796

797797
try {
798798
if (storage.is_in_gtid_replication_mode()) {
799-
const auto &gtids{storage.get_gtids()};
799+
if (storage.is_empty()) {
800+
static constexpr std::string_view select_gtid_purged_query{
801+
"SELECT @@GLOBAL.gtid_purged"};
802+
storage.set_purged_gtids(binsrv::gtids::gtid_set{
803+
connection.execute_select_query_string_result(
804+
select_gtid_purged_query)});
805+
logger.log(
806+
binsrv::log_severity::info,
807+
"extracted purged GTIDs from the mysql server for an empty "
808+
"storage: " +
809+
boost::lexical_cast<std::string>(storage.get_purged_gtids()));
810+
}
811+
812+
const auto gtids{storage.get_gtids()};
800813
const auto encoded_size{gtids.calculate_encoded_size()};
801814

802815
binsrv::gtids::gtid_set_storage encoded_gtids_buffer(encoded_size);
@@ -820,7 +833,7 @@ bool open_connection_and_switch_to_replication(
820833
if (operation_mode == binsrv::operation_mode_type::fetch) {
821834
throw;
822835
}
823-
logger.log(binsrv::log_severity::info, "unable to switch to replication");
836+
logger.log(binsrv::log_severity::error, "unable to switch to replication");
824837
return false;
825838
}
826839

src/binsrv/storage.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ storage::storage(const storage_config &config,
115115

116116
load_and_validate_binlog_metadata_set(storage_objects,
117117
storage_metadata_objects);
118+
assert(!binlog_records_.front().added_gtids.has_value() ||
119+
purged_gtids_ == binlog_records_.front().added_gtids);
118120
}
119121

120122
storage::~storage() {
@@ -127,6 +129,18 @@ storage::~storage() {
127129
}
128130
}
129131

132+
void storage::set_purged_gtids(const gtids::gtid_set &purged_gtids) {
133+
if (!is_in_gtid_replication_mode()) {
134+
util::exception_location().raise<std::logic_error>(
135+
"cannot set purged GTIDs in position-based replication mode");
136+
}
137+
if (!is_empty()) {
138+
util::exception_location().raise<std::logic_error>(
139+
"cannot set purged GTIDs in a non-empty storage");
140+
}
141+
purged_gtids_ = purged_gtids;
142+
}
143+
130144
[[nodiscard]] std::string storage::get_backend_description() const {
131145
return backend_->get_description();
132146
}
@@ -540,6 +554,13 @@ void storage::load_and_validate_binlog_metadata_set(
540554
util::exception_location().raise<std::logic_error>(
541555
"found metadata for a non-existing binlog");
542556
}
557+
558+
// if we are in GTID replication mode, then we can consider GTIDs from the
559+
// first binlog metadata as purged GTIDs for the whole storage
560+
const auto &optional_added_gtids{binlog_records_.front().added_gtids};
561+
if (optional_added_gtids.has_value()) {
562+
purged_gtids_ = *optional_added_gtids;
563+
}
543564
}
544565

545566
} // namespace binsrv

src/binsrv/storage.hpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ class [[nodiscard]] storage {
7070
// file to complete the rule of 5
7171
~storage();
7272

73+
[[nodiscard]] const gtids::gtid_set &get_purged_gtids() const noexcept {
74+
return purged_gtids_;
75+
}
76+
void set_purged_gtids(const gtids::gtid_set &purged_gtids);
77+
7378
[[nodiscard]] std::string get_backend_description() const;
7479

7580
[[nodiscard]] replication_mode_type get_replication_mode() const noexcept {
@@ -94,16 +99,19 @@ class [[nodiscard]] storage {
9499
}
95100

96101
[[nodiscard]] gtids::gtid_set get_gtids() const {
97-
gtids::gtid_set result{};
102+
if (!is_in_gtid_replication_mode()) {
103+
return {};
104+
}
105+
98106
if (is_empty()) {
99-
return result;
107+
return get_purged_gtids();
100108
}
109+
gtids::gtid_set result{};
101110
const auto &optional_previous_gtids{
102111
get_current_binlog_record().previous_gtids};
103-
if (!optional_previous_gtids.has_value()) {
104-
return result;
112+
if (optional_previous_gtids.has_value()) {
113+
result = *optional_previous_gtids;
105114
}
106-
result = *optional_previous_gtids;
107115
const auto &optional_added_gtids{get_current_binlog_record().added_gtids};
108116
if (optional_added_gtids.has_value()) {
109117
result.add(*optional_added_gtids);
@@ -133,6 +141,7 @@ class [[nodiscard]] storage {
133141

134142
replication_mode_type replication_mode_;
135143
composite_binlog_name binlog_name_sentinel_{};
144+
gtids::gtid_set purged_gtids_{};
136145
binlog_record_container binlog_records_{};
137146

138147
std::uint64_t checkpoint_size_bytes_{0ULL};

src/easymysql/connection.cpp

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,64 @@ void connection::execute_generic_query_noresult(std::string_view query) {
429429

430430
auto *casted_impl = mysql_deimpl::get(mysql_impl_);
431431
if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) {
432-
raise_core_error_from_connection("cannot execute query", *this);
432+
raise_core_error_from_connection("cannot execute noresult query", *this);
433433
}
434434
}
435435

436+
[[nodiscard]] std::string
437+
connection::execute_select_query_string_result(std::string_view query) {
438+
assert(!is_empty());
439+
if (is_in_replication_mode()) {
440+
util::exception_location().raise<std::logic_error>(
441+
"cannot execute query in replication mode");
442+
}
443+
444+
auto *casted_impl = mysql_deimpl::get(mysql_impl_);
445+
if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) {
446+
raise_core_error_from_connection("cannot execute string result query",
447+
*this);
448+
}
449+
450+
const auto mysql_res_deleter = [](MYSQL_RES *result_raw) {
451+
if (result_raw != nullptr) {
452+
mysql_free_result(result_raw);
453+
}
454+
};
455+
using mysql_res_ptr = std::unique_ptr<MYSQL_RES, decltype(mysql_res_deleter)>;
456+
457+
const mysql_res_ptr result{mysql_store_result(casted_impl),
458+
mysql_res_deleter};
459+
if (!result) {
460+
raise_core_error_from_connection("cannot store query result", *this);
461+
}
462+
if (mysql_num_rows(result.get()) != 1U) {
463+
raise_core_error_from_connection("query did not return exactly one row",
464+
*this);
465+
}
466+
467+
static constexpr std::size_t expected_num_fields{1U};
468+
if (mysql_num_fields(result.get()) != expected_num_fields) {
469+
raise_core_error_from_connection("query did not return exactly one column",
470+
*this);
471+
}
472+
473+
MYSQL_ROW row_raw{mysql_fetch_row(result.get())};
474+
assert(row_raw != nullptr);
475+
476+
const std::span<const char *const, expected_num_fields> row{
477+
row_raw, expected_num_fields};
478+
if (row.front() == nullptr) {
479+
raise_core_error_from_connection("query returned NULL value", *this);
480+
}
481+
482+
const auto *const lengths_raw{mysql_fetch_lengths(result.get())};
483+
assert(lengths_raw != nullptr);
484+
const std::span<const unsigned long, expected_num_fields> lengths{
485+
lengths_raw, expected_num_fields};
486+
487+
return std::string{row.front(), lengths.front()};
488+
}
489+
436490
bool connection::ping() {
437491
assert(!is_empty());
438492
if (is_in_replication_mode()) {

src/easymysql/connection.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ class [[nodiscard]] connection {
5858
[[nodiscard]] std::string_view get_character_set_name() const noexcept;
5959

6060
void execute_generic_query_noresult(std::string_view query);
61+
[[nodiscard]] std::string
62+
execute_select_query_string_result(std::string_view query);
6163
[[nodiscard]] bool ping();
6264

6365
[[nodiscard]] bool is_in_replication_mode() const noexcept {

0 commit comments

Comments
 (0)