Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ may print
"uri": "s3://binsrv-bucket/storage/binlog.000001",
"min_timestamp": "2026-02-09T17:22:01",
"max_timestamp": "2026-02-09T17:22:08",
"initial_gtids": "",
"previous_gtids": "",
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456"
},
{
Expand All @@ -206,7 +206,7 @@ may print
"uri": "s3://binsrv-bucket/storage/binlog.000002",
"min_timestamp": "2026-02-09T17:22:08",
"max_timestamp": "2026-02-09T17:22:09",
"initial_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
"previous_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:123457-246912"
}
]
Expand Down Expand Up @@ -243,7 +243,7 @@ may print
"uri": "s3://binsrv-bucket/storage/binlog.000001",
"min_timestamp": "2026-02-09T17:22:01",
"max_timestamp": "2026-02-09T17:22:08",
"initial_gtids": "",
"previous_gtids": "",
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456"
}
]
Expand All @@ -264,7 +264,7 @@ may print
"uri": "s3://binsrv-bucket/storage/binlog.000001",
"min_timestamp": "2026-02-09T17:22:01",
"max_timestamp": "2026-02-09T17:22:08",
"initial_gtids": "",
"previous_gtids": "",
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
},
{
Expand All @@ -273,7 +273,7 @@ may print
"uri": "s3://binsrv-bucket/storage/binlog.000002",
"min_timestamp": "2026-02-09T17:22:08",
"max_timestamp": "2026-02-09T17:22:09",
"initial_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
"previous_gtids": "11111111-aaaa-1111-aaaa-111111111111:1-123456",
"added_gtids": "11111111-aaaa-1111-aaaa-111111111111:123457-246912"
}
]
Expand Down
50 changes: 50 additions & 0 deletions mtr/binlog_streaming/r/gtid_purged.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
*** Resetting replication at the very beginning of the test.

*** Determining the first binary log name.

*** Generating a configuration file in JSON format for the Binlog
*** Server utility.

*** Determining binlog file directory from the server.

*** Creating a temporary directory <BINSRV_STORAGE_PATH> for storing
*** binlog files downloaded via the Binlog Server utility.

*** Creating a simple table.
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;

*** Filling the table with some data.
INSERT INTO t1 VALUES();

*** Flushing the first binary log and switching to the second one.
FLUSH BINARY LOGS;

*** Determining the second binary log name.

*** Filling the table with more data.
INSERT INTO t1 VALUES();

*** Filling the table with more data.
PURGE BINARY LOGS TO '<second_binlog>';

*** Executing the Binlog Server utility and fetching all events.

*** Executing the Binlog Server utility in the 'search_by_gtid_set'
include/read_file_to_var.inc

*** Executing the Binlog Server utility one more time (fetching nothing)

*** Executing the Binlog Server utility in the 'search_by_gtid_set'
one more time (expecting the same results)
include/read_file_to_var.inc

*** Removing the search result file.

*** Dropping the table.
DROP TABLE t1;

*** Removing the Binlog Server utility storage directory.

*** Removing the Binlog Server utility log file.

*** Removing the Binlog Server utility configuration file.
2 changes: 2 additions & 0 deletions mtr/binlog_streaming/t/gtid_purged-master.opt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--gtid-mode=on
--enforce-gtid-consistency
98 changes: 98 additions & 0 deletions mtr/binlog_streaming/t/gtid_purged.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
--source ../include/have_binsrv.inc

--source ../include/v80_v84_compatibility_defines.inc

# in case of --repeat=N, we need to start from a fresh binary log to make
# this test deterministic
--echo *** Resetting replication at the very beginning of the test.
--disable_query_log
eval $stmt_reset_binary_logs_and_gtids;
--enable_query_log

--echo
--echo *** Determining the first binary log name.
--let $first_binlog = query_get_value($stmt_show_binary_log_status, File, 1)

# identifying backend storage type ('file' or 's3')
--source ../include/identify_storage_backend.inc

# creating data directory, configuration file, etc.
--let $binsrv_connect_timeout = 20
--let $binsrv_read_timeout = 60
--let $binsrv_idle_time = 10
--let $binsrv_verify_checksum = TRUE
--let $binsrv_replication_mode = gtid
--let $binsrv_checkpoint_size = 1
--source ../include/set_up_binsrv_environment.inc

--let $read_from_file = $MYSQL_TMP_DIR/search_result.json

--echo
--echo *** Creating a simple table.
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;

--echo
--echo *** Filling the table with some data.
INSERT INTO t1 VALUES();

--echo
--echo *** Flushing the first binary log and switching to the second one.
FLUSH BINARY LOGS;

--echo
--echo *** Determining the second binary log name.
--let $second_binlog = query_get_value($stmt_show_binary_log_status, File, 1)

--echo
--echo *** Filling the table with more data.
INSERT INTO t1 VALUES();

--echo
--echo *** Filling the table with more data.
--replace_result $second_binlog <second_binlog>
eval PURGE BINARY LOGS TO '$second_binlog';

--let $captured_gtid_purged = `SELECT @@global.gtid_purged`
--let $captured_second_insert_gtid = `SELECT GTID_SUBTRACT(@@global.gtid_executed, @@global.gtid_purged)`

--echo
--echo *** Executing the Binlog Server utility and fetching all events.
--exec $BINSRV fetch $binsrv_config_file_path > /dev/null

--echo
--echo *** Executing the Binlog Server utility in the 'search_by_gtid_set'
--exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file

--source include/read_file_to_var.inc
--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`)
--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`)
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`)
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].previous_gtids') = '$captured_gtid_purged'`)
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].added_gtids') = '$captured_second_insert_gtid'`)

--echo
--echo *** Executing the Binlog Server utility one more time (fetching nothing)
--exec $BINSRV fetch $binsrv_config_file_path > /dev/null

--echo
--echo *** Executing the Binlog Server utility in the 'search_by_gtid_set'
--echo one more time (expecting the same results)
--exec $BINSRV search_by_gtid_set $binsrv_config_file_path $captured_second_insert_gtid > $read_from_file

--source include/read_file_to_var.inc
--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`)
--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`)
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`)
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].previous_gtids') = '$captured_gtid_purged'`)
--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].added_gtids') = '$captured_second_insert_gtid'`)

--echo
--echo *** Removing the search result file.
--remove_file $read_from_file

--echo
--echo *** Dropping the table.
DROP TABLE t1;

# cleaning up
--source ../include/tear_down_binsrv_environment.inc
19 changes: 16 additions & 3 deletions src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ bool open_connection_and_switch_to_replication(
if (operation_mode == binsrv::operation_mode_type::fetch) {
throw;
}
logger.log(binsrv::log_severity::info,
logger.log(binsrv::log_severity::error,
"unable to establish connection to mysql server");
return false;
}
Expand All @@ -796,7 +796,20 @@ bool open_connection_and_switch_to_replication(

try {
if (storage.is_in_gtid_replication_mode()) {
const auto &gtids{storage.get_gtids()};
if (storage.is_empty()) {
static constexpr std::string_view select_gtid_purged_query{
"SELECT @@GLOBAL.gtid_purged"};
storage.set_purged_gtids(binsrv::gtids::gtid_set{
connection.execute_select_query_string_result(
select_gtid_purged_query)});
logger.log(
binsrv::log_severity::info,
"extracted purged GTIDs from the mysql server for an empty "
"storage: " +
boost::lexical_cast<std::string>(storage.get_purged_gtids()));
}

const auto gtids{storage.get_gtids()};
const auto encoded_size{gtids.calculate_encoded_size()};

binsrv::gtids::gtid_set_storage encoded_gtids_buffer(encoded_size);
Expand All @@ -820,7 +833,7 @@ bool open_connection_and_switch_to_replication(
if (operation_mode == binsrv::operation_mode_type::fetch) {
throw;
}
logger.log(binsrv::log_severity::info, "unable to switch to replication");
logger.log(binsrv::log_severity::error, "unable to switch to replication");
return false;
}

Expand Down
21 changes: 21 additions & 0 deletions src/binsrv/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ storage::storage(const storage_config &config,

load_and_validate_binlog_metadata_set(storage_objects,
storage_metadata_objects);
assert(!binlog_records_.front().added_gtids.has_value() ||
purged_gtids_ == binlog_records_.front().added_gtids);
}

storage::~storage() {
Expand All @@ -127,6 +129,18 @@ storage::~storage() {
}
}

void storage::set_purged_gtids(const gtids::gtid_set &purged_gtids) {
if (!is_in_gtid_replication_mode()) {
util::exception_location().raise<std::logic_error>(
"cannot set purged GTIDs in position-based replication mode");
}
if (!is_empty()) {
util::exception_location().raise<std::logic_error>(
"cannot set purged GTIDs in a non-empty storage");
}
purged_gtids_ = purged_gtids;
}

[[nodiscard]] std::string storage::get_backend_description() const {
return backend_->get_description();
}
Expand Down Expand Up @@ -540,6 +554,13 @@ void storage::load_and_validate_binlog_metadata_set(
util::exception_location().raise<std::logic_error>(
"found metadata for a non-existing binlog");
}

// if we are in GTID replication mode, then we can consider GTIDs from the
// first binlog metadata as purged GTIDs for the whole storage
const auto &optional_added_gtids{binlog_records_.front().added_gtids};
if (optional_added_gtids.has_value()) {
purged_gtids_ = *optional_added_gtids;
}
}

} // namespace binsrv
19 changes: 14 additions & 5 deletions src/binsrv/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class [[nodiscard]] storage {
// file to complete the rule of 5
~storage();

[[nodiscard]] const gtids::gtid_set &get_purged_gtids() const noexcept {
return purged_gtids_;
}
void set_purged_gtids(const gtids::gtid_set &purged_gtids);

[[nodiscard]] std::string get_backend_description() const;

[[nodiscard]] replication_mode_type get_replication_mode() const noexcept {
Expand All @@ -94,16 +99,19 @@ class [[nodiscard]] storage {
}

[[nodiscard]] gtids::gtid_set get_gtids() const {
gtids::gtid_set result{};
if (!is_in_gtid_replication_mode()) {
return {};
}

if (is_empty()) {
return result;
return get_purged_gtids();
}
gtids::gtid_set result{};
const auto &optional_previous_gtids{
get_current_binlog_record().previous_gtids};
if (!optional_previous_gtids.has_value()) {
return result;
if (optional_previous_gtids.has_value()) {
result = *optional_previous_gtids;
}
result = *optional_previous_gtids;
const auto &optional_added_gtids{get_current_binlog_record().added_gtids};
if (optional_added_gtids.has_value()) {
result.add(*optional_added_gtids);
Expand Down Expand Up @@ -133,6 +141,7 @@ class [[nodiscard]] storage {

replication_mode_type replication_mode_;
composite_binlog_name binlog_name_sentinel_{};
gtids::gtid_set purged_gtids_{};
binlog_record_container binlog_records_{};

std::uint64_t checkpoint_size_bytes_{0ULL};
Expand Down
56 changes: 55 additions & 1 deletion src/easymysql/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,64 @@ void connection::execute_generic_query_noresult(std::string_view query) {

auto *casted_impl = mysql_deimpl::get(mysql_impl_);
if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) {
raise_core_error_from_connection("cannot execute query", *this);
raise_core_error_from_connection("cannot execute noresult query", *this);
}
}

[[nodiscard]] std::string
connection::execute_select_query_string_result(std::string_view query) {
assert(!is_empty());
if (is_in_replication_mode()) {
util::exception_location().raise<std::logic_error>(
"cannot execute query in replication mode");
}

auto *casted_impl = mysql_deimpl::get(mysql_impl_);
if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) {
raise_core_error_from_connection("cannot execute string result query",
*this);
}

const auto mysql_res_deleter = [](MYSQL_RES *result_raw) {
if (result_raw != nullptr) {
mysql_free_result(result_raw);
}
};
using mysql_res_ptr = std::unique_ptr<MYSQL_RES, decltype(mysql_res_deleter)>;

const mysql_res_ptr result{mysql_store_result(casted_impl),
mysql_res_deleter};
if (!result) {
raise_core_error_from_connection("cannot store query result", *this);
}
if (mysql_num_rows(result.get()) != 1U) {
raise_core_error_from_connection("query did not return exactly one row",
*this);
}

static constexpr std::size_t expected_num_fields{1U};
if (mysql_num_fields(result.get()) != expected_num_fields) {
raise_core_error_from_connection("query did not return exactly one column",
*this);
}

MYSQL_ROW row_raw{mysql_fetch_row(result.get())};
assert(row_raw != nullptr);

const std::span<const char *const, expected_num_fields> row{
row_raw, expected_num_fields};
if (row.front() == nullptr) {
raise_core_error_from_connection("query returned NULL value", *this);
}

const auto *const lengths_raw{mysql_fetch_lengths(result.get())};
assert(lengths_raw != nullptr);
const std::span<const unsigned long, expected_num_fields> lengths{
lengths_raw, expected_num_fields};

return std::string{row.front(), lengths.front()};
}

bool connection::ping() {
assert(!is_empty());
if (is_in_replication_mode()) {
Expand Down
2 changes: 2 additions & 0 deletions src/easymysql/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class [[nodiscard]] connection {
[[nodiscard]] std::string_view get_character_set_name() const noexcept;

void execute_generic_query_noresult(std::string_view query);
[[nodiscard]] std::string
execute_select_query_string_result(std::string_view query);
[[nodiscard]] bool ping();

[[nodiscard]] bool is_in_replication_mode() const noexcept {
Expand Down
Loading