Skip to content

Commit 5f7b48b

Browse files
PS-10245 feature: Implement receiving binlog events in GTID mode (part 4) (#85)
https://perconadev.atlassian.net/browse/PS-10245 Implemented more sophisticated event data buffering and flushing at the 'binstv::storage' level. We now accumulate data from several events in the 'binstv::storage'internal memory buffer (16K initially but can grow) and perform actual writing only at transaction boundaries. This helps a lot with making sure that after a crash we can continue operations from a position that corresponds to a committed transaction. This is vitally important for GTID-based replication. 'binstv::storage::write_event()' method extended with an extra parameter that indicates whether the event data being written corresponds to the end of a transaction. Reworked 'binsrv::basic_storage_engine' interface: removed 'flush_stream()' method and all 'write_data_to_stream()' method overloads are now expected to immediately flush data. 'binsrv::filesystem_storage_backend' and 'binsrv::s3_storage_backend' concrete implementations updated to accommodate these changes. We also added "rdbuf()->pubsetbuf(nullptr, 0U)" calls before `std::fstream` read/writes inside 'binsrv::filesystem_storage_backend' to disable internal buffering. Fixed an issue with unnecessary data uploading upon stream closing / object destruction inside 'binsrv::s3_storage_backend'. 'binlog_streaming.binsrv' MTR test case adapted for new buffering behavior. Fixed problem with the 'checkpointing_both' combination in the 'binlog_streaming.checkpointing' MTR test case when checkpointing parameters were not set properly.
1 parent f4a0558 commit 5f7b48b

12 files changed

Lines changed: 88 additions & 41 deletions

mtr/binlog_streaming/t/binsrv.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ INSERT INTO t1 VALUES(DEFAULT);
4242
--let $binsrv_read_timeout = 60
4343
--let $binsrv_idle_time = 10
4444
--let $binsrv_verify_checksum = $extracted_init_connect_variable_value
45+
--let $binsrv_checkpoint_size = 1
4546
--source ../include/set_up_binsrv_environment.inc
4647

4748
--echo

mtr/binlog_streaming/t/checkpointing.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ if ($extracted_init_connect_variable_value == 'interval')
7070
{
7171
--let $binsrv_checkpoint_interval = 5s
7272
}
73-
if ($extracted_init_connect_variable_value == 'interval')
73+
if ($extracted_init_connect_variable_value == 'both')
7474
{
7575
--let $binsrv_checkpoint_size = 2M
7676
--let $binsrv_checkpoint_interval = 5s

src/app.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ void log_span_dump(binsrv::basic_logger &logger,
357357

358358
void process_binlog_event(const binsrv::event::event &current_event,
359359
util::const_byte_span portion,
360+
const binsrv::event::reader_context &context,
360361
binsrv::storage &storage, bool &skip_open_binlog) {
361362
const auto &current_common_header = current_event.get_common_header();
362363
const auto code = current_common_header.get_type_code();
@@ -399,7 +400,7 @@ void process_binlog_event(const binsrv::event::event &current_event,
399400
}
400401
}
401402
if (!is_artificial && !is_pseudo) {
402-
storage.write_event(portion);
403+
storage.write_event(portion, context.is_at_transaction_boundary());
403404
}
404405
if ((code == binsrv::event::code_type::rotate && !is_artificial) ||
405406
code == binsrv::event::code_type::stop) {
@@ -515,7 +516,8 @@ void receive_binlog_events(
515516
boost::lexical_cast<std::string>(context.get_transaction_gtid()));
516517
}
517518

518-
process_binlog_event(current_event, portion, storage, skip_open_binlog);
519+
process_binlog_event(current_event, portion, context, storage,
520+
skip_open_binlog);
519521
}
520522
if (termination_flag.test()) {
521523
logger.log(binsrv::log_severity::info,
@@ -531,6 +533,8 @@ void receive_binlog_events(
531533
util::exception_location().raise<std::logic_error>(
532534
"fetch operation did not reach EOF reading binlog events");
533535
}
536+
// TODO: here (upon timing out) we also need to flush internal buffers in
537+
// the storage
534538
logger.log(binsrv::log_severity::info,
535539
"timed out waiting for events and disconnected");
536540
}

src/binsrv/basic_storage_backend.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,6 @@ void basic_storage_backend::write_data_to_stream(util::const_byte_span data) {
5858
do_write_data_to_stream(data);
5959
}
6060

61-
void basic_storage_backend::flush_stream() {
62-
if (!stream_open_) {
63-
util::exception_location().raise<std::logic_error>(
64-
"cannot flush the stream as it has not been opened");
65-
}
66-
do_flush_stream();
67-
}
68-
6961
void basic_storage_backend::close_stream() {
7062
if (!stream_open_) {
7163
util::exception_location().raise<std::logic_error>(

src/binsrv/basic_storage_backend.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ class basic_storage_backend {
4343
void open_stream(std::string_view name,
4444
storage_backend_open_stream_mode mode);
4545
void write_data_to_stream(util::const_byte_span data);
46-
void flush_stream();
4746
void close_stream();
4847

4948
[[nodiscard]] std::string get_description() const;
@@ -59,7 +58,6 @@ class basic_storage_backend {
5958
virtual void do_open_stream(std::string_view name,
6059
storage_backend_open_stream_mode mode) = 0;
6160
virtual void do_write_data_to_stream(util::const_byte_span data) = 0;
62-
virtual void do_flush_stream() = 0;
6361
virtual void do_close_stream() = 0;
6462

6563
[[nodiscard]] virtual std::string do_get_description() const = 0;

src/binsrv/event/reader_context.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ class [[nodiscard]] reader_context {
5858
return transaction_gtid_;
5959
}
6060
[[nodiscard]] bool is_at_transaction_boundary() const noexcept {
61-
return state_ == state_type::any_other_expected &&
62-
current_transaction_length_ == expected_transaction_length_;
61+
return (state_ == state_type::any_other_expected &&
62+
current_transaction_length_ == expected_transaction_length_) ||
63+
(state_ == state_type::rotate_artificial_expected);
6364
}
6465

6566
private:

src/binsrv/filesystem_storage_backend.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ filesystem_storage_backend::filesystem_storage_backend(
4040
: root_path_{}, ofs_{} {
4141
// TODO: switch to utf8 file names
4242

43+
// setting "unbuffered mode" as we will be using our own buffer
44+
ofs_.rdbuf()->pubsetbuf(nullptr, 0U);
45+
4346
const auto &backend_uri = config.get<"uri">();
4447

4548
const auto uri_parse_result{boost::urls::parse_absolute_uri(backend_uri)};
@@ -109,8 +112,9 @@ filesystem_storage_backend::do_get_object(std::string_view name) {
109112
const auto object_path{get_object_path(name)};
110113

111114
// opening in binary mode
112-
std::ifstream object_ifs{object_path,
113-
std::ios_base::in | std::ios_base::binary};
115+
std::ifstream object_ifs{};
116+
object_ifs.rdbuf()->pubsetbuf(nullptr, 0U);
117+
object_ifs.open(object_path, std::ios_base::in | std::ios_base::binary);
114118
if (!object_ifs.is_open()) {
115119
util::exception_location().raise<std::runtime_error>(
116120
"cannot open underlying object file");
@@ -134,9 +138,10 @@ void filesystem_storage_backend::do_put_object(std::string_view name,
134138
util::const_byte_span content) {
135139
const auto object_path = get_object_path(name);
136140
// opening in binary mode with truncating
137-
std::ofstream object_ofs{object_path, std::ios_base::out |
138-
std::ios_base::binary |
139-
std::ios_base::trunc};
141+
std::ofstream object_ofs{};
142+
object_ofs.rdbuf()->pubsetbuf(nullptr, 0U);
143+
object_ofs.open(object_path, std::ios_base::out | std::ios_base::binary |
144+
std::ios_base::trunc);
140145
if (!object_ofs.is_open()) {
141146
util::exception_location().raise<std::runtime_error>(
142147
"cannot open underlying object file for writing");
@@ -174,10 +179,6 @@ void filesystem_storage_backend::do_write_data_to_stream(
174179
util::exception_location().raise<std::runtime_error>(
175180
"cannot write data to the underlying stream file");
176181
}
177-
}
178-
179-
void filesystem_storage_backend::do_flush_stream() {
180-
assert(ofs_.is_open());
181182
if (!ofs_.flush()) {
182183
util::exception_location().raise<std::runtime_error>(
183184
"cannot flush the underlying stream file");

src/binsrv/filesystem_storage_backend.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ class [[nodiscard]] filesystem_storage_backend final
5151
void do_open_stream(std::string_view name,
5252
storage_backend_open_stream_mode mode) override;
5353
void do_write_data_to_stream(util::const_byte_span data) override;
54-
void do_flush_stream() override;
5554
void do_close_stream() override;
5655

5756
[[nodiscard]] std::string do_get_description() const override;

src/binsrv/s3_storage_backend.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -683,10 +683,9 @@ void s3_storage_backend::do_write_data_to_stream(util::const_byte_span data) {
683683
util::exception_location().raise<std::runtime_error>(
684684
"cannot flush the temporary file for S3 object");
685685
}
686+
upload_tmp_stream_internal();
686687
}
687688

688-
void s3_storage_backend::do_flush_stream() { upload_tmp_stream_internal(); }
689-
690689
void s3_storage_backend::do_close_stream() {
691690
assert(tmp_fstream_.is_open());
692691
close_stream_internal();
@@ -743,7 +742,6 @@ void s3_storage_backend::upload_tmp_stream_internal() {
743742
}
744743

745744
void s3_storage_backend::close_stream_internal() {
746-
upload_tmp_stream_internal();
747745
tmp_fstream_.close();
748746
// we allow std::filesystem::remove() here to fail - worst case scenario
749747
// we will have a temorary file not removed

src/binsrv/s3_storage_backend.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ class [[nodiscard]] s3_storage_backend final : public basic_storage_backend {
7676
void do_open_stream(std::string_view name,
7777
storage_backend_open_stream_mode mode) override;
7878
void do_write_data_to_stream(util::const_byte_span data) override;
79-
void do_flush_stream() override;
8079
void do_close_stream() override;
8180

8281
[[nodiscard]] std::string do_get_description() const override;

0 commit comments

Comments
 (0)