Skip to content

Commit f4a0558

Browse files
PS-10245 feature: Implement receiving binlog events in GTID mode (part 3) (#83)
https://perconadev.atlassian.net/browse/PS-10245 Implemented more sophisticated model of 'binstv::event::reader_context' state machine. Instead of (ROTATE(artificial) FORMAT_DESCRIPTION <ANY>* (ROTATE|STOP)?)+ we now also track GTID-related events and transaction boundaries ( ROTATE(artificial) FORMAT_DESCRIPTION PREVIOUS_GTIDS_LOG? ((ANONYMOUS_GTID_LOG | GTID_LOG | GTID_TAGGED_LOG) <ANY>*)* (ROTATE | STOP)? )+ Implemented additional method 'is_at_transaction_boundary()' that can be used to check if we reached the end of transaction after processing the event. Main application extended with simplified transaction event logging at 'info' level: - we now write simple messages with event code types and event flags if any (e.g. 'artificial') - we also write corresponding GTIDs at the end of transactions. Added new 'binlog_streaming.kill_and_restart' MTR test case which checks for different endings of binlog files after rotation: - normal 'ROTATE' event (after 'FLUSH BINARY LOGS') - normal 'STOP' event (after server shutdown) - termination without 'ROTATE' / 'STOP' event (in case server crashed / was killed and restarted) - currently open binlog file.
1 parent fea51d7 commit f4a0558

9 files changed

Lines changed: 599 additions & 76 deletions
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
*** Resetting replication at the very beginning of the test.
2+
3+
*** Determining the first fresh binary log name.
4+
5+
*** Creating a simple table and filling it with some data.
6+
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;
7+
INSERT INTO t1 VALUES(DEFAULT);
8+
9+
*** Restarting the server gracefully
10+
# restart
11+
12+
*** Determining the second binary log name after restart
13+
14+
*** Filling the table with some more data.
15+
INSERT INTO t1 VALUES(DEFAULT);
16+
17+
*** Killing and restarting the server
18+
# Kill and restart
19+
20+
*** Determining the third binary log name after killing and restarting
21+
22+
*** Filling the table with some more data.
23+
INSERT INTO t1 VALUES(DEFAULT);
24+
25+
*** Flushing the third binary log and switching to the fourth one.
26+
FLUSH BINARY LOGS;
27+
28+
*** Determining the fourth binary log name after binlog rotation
29+
30+
*** Filling the table with some more data.
31+
INSERT INTO t1 VALUES(DEFAULT);
32+
33+
*** Checking if the first binlog file ends with the STOP event.
34+
35+
*** Checking if the second binlog file ends with neither stop, nor ROTATE event (XID in this case).
36+
37+
*** Checking if the third binlog file ends with the ROTATE event.
38+
39+
*** Checking if the fourth (still open) binlog file ends with the commit of a transaction (XID).
40+
41+
*** Generating a configuration file in JSON format for the Binlog
42+
*** Server utility.
43+
44+
*** Determining binlog file directory from the server.
45+
46+
*** Creating a temporary directory <BINSRV_STORAGE_PATH> for storing
47+
*** binlog files downloaded via the Binlog Server utility.
48+
49+
*** Executing the Binlog Server utility to download all binlog data
50+
*** from the server to the <BINSRV_STORAGE_PATH> directory
51+
52+
*** Dropping the table.
53+
DROP TABLE t1;
54+
55+
*** Removing the Binlog Server utility storage directory.
56+
57+
*** Removing the Binlog Server utility log file.
58+
59+
*** Removing the Binlog Server utility configuration file.
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
--source ../include/have_binsrv.inc
2+
3+
--source ../include/v80_v84_compatibility_defines.inc
4+
5+
# in case of --repeat=N, we need to start from a fresh binary log to make
6+
# this test deterministic
7+
--echo *** Resetting replication at the very beginning of the test.
8+
--disable_query_log
9+
eval $stmt_reset_binary_logs_and_gtids;
10+
--enable_query_log
11+
12+
--echo
13+
--echo *** Determining the first fresh binary log name.
14+
--let $first_binlog = query_get_value($stmt_show_binary_log_status, File, 1)
15+
16+
--echo
17+
--echo *** Creating a simple table and filling it with some data.
18+
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;
19+
INSERT INTO t1 VALUES(DEFAULT);
20+
21+
--echo
22+
--echo *** Restarting the server gracefully
23+
--source include/restart_mysqld.inc
24+
25+
--echo
26+
--echo *** Determining the second binary log name after restart
27+
--let $second_binlog = query_get_value($stmt_show_binary_log_status, File, 1)
28+
29+
--echo
30+
--echo *** Filling the table with some more data.
31+
INSERT INTO t1 VALUES(DEFAULT);
32+
33+
--echo
34+
--echo *** Killing and restarting the server
35+
--source include/kill_and_restart_mysqld.inc
36+
37+
--echo
38+
--echo *** Determining the third binary log name after killing and restarting
39+
--let $third_binlog = query_get_value($stmt_show_binary_log_status, File, 1)
40+
41+
--echo
42+
--echo *** Filling the table with some more data.
43+
INSERT INTO t1 VALUES(DEFAULT);
44+
45+
--echo
46+
--echo *** Flushing the third binary log and switching to the fourth one.
47+
FLUSH BINARY LOGS;
48+
49+
--echo
50+
--echo *** Determining the fourth binary log name after binlog rotation
51+
--let $fourth_binlog = query_get_value($stmt_show_binary_log_status, File, 1)
52+
53+
--echo
54+
--echo *** Filling the table with some more data.
55+
INSERT INTO t1 VALUES(DEFAULT);
56+
57+
--echo
58+
--echo *** Checking if the first binlog file ends with the STOP event.
59+
# SHOW BINLOG EVENTS IN 'binlog.000001';
60+
# Log_name Pos Event_type Server_id End_log_pos Info
61+
# binlog.000001 4 Format_desc 1 126 Server ver: 8.0.43-34, Binlog ver: 4
62+
# binlog.000001 126 Previous_gtids 1 157
63+
# binlog.000001 157 Anonymous_Gtid 1 236 SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
64+
# binlog.000001 236 Query 1 411 use `test`; CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB /* xid=6 */
65+
# binlog.000001 411 Anonymous_Gtid 1 490 SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
66+
# binlog.000001 490 Query 1 565 BEGIN
67+
# binlog.000001 565 Table_map 1 613 table_id: 125 (test.t1)
68+
# binlog.000001 613 Write_rows 1 653 table_id: 125 flags: STMT_END_F
69+
# binlog.000001 653 Xid 1 684 COMMIT /* xid=7 */
70+
# binlog.000001 684 Stop 1 707
71+
--let $extracted_event_type = query_get_value(SHOW BINLOG EVENTS IN '$first_binlog', Event_type, 10)
72+
--assert($extracted_event_type == "Stop")
73+
--let $extracted_event_type = query_get_value(SHOW BINLOG EVENTS IN '$first_binlog', Event_type, 11)
74+
--assert($extracted_event_type == "No such row")
75+
76+
--echo
77+
--echo *** Checking if the second binlog file ends with neither stop, nor ROTATE event (XID in this case).
78+
# SHOW BINLOG EVENTS IN 'binlog.000002';
79+
# Log_name Pos Event_type Server_id End_log_pos Info
80+
# binlog.000002 4 Format_desc 1 126 Server ver: 8.0.43-34, Binlog ver: 4
81+
# binlog.000002 126 Previous_gtids 1 157
82+
# binlog.000002 157 Anonymous_Gtid 1 236 SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
83+
# binlog.000002 236 Query 1 311 BEGIN
84+
# binlog.000002 311 Table_map 1 359 table_id: 89 (test.t1)
85+
# binlog.000002 359 Write_rows 1 399 table_id: 89 flags: STMT_END_F
86+
# binlog.000002 399 Xid 1 430 COMMIT /* xid=9 */
87+
--let $extracted_event_type = query_get_value(SHOW BINLOG EVENTS IN '$second_binlog', Event_type, 7)
88+
--assert($extracted_event_type == "Xid")
89+
--let $extracted_event_type = query_get_value(SHOW BINLOG EVENTS IN '$second_binlog', Event_type, 8)
90+
--assert($extracted_event_type == "No such row")
91+
92+
--echo
93+
--echo *** Checking if the third binlog file ends with the ROTATE event.
94+
# SHOW BINLOG EVENTS IN 'binlog.000003';
95+
# Log_name Pos Event_type Server_id End_log_pos Info
96+
# binlog.000003 4 Format_desc 1 126 Server ver: 8.0.43-34, Binlog ver: 4
97+
# binlog.000003 126 Previous_gtids 1 157
98+
# binlog.000003 157 Anonymous_Gtid 1 236 SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
99+
# binlog.000003 236 Query 1 311 BEGIN
100+
# binlog.000003 311 Table_map 1 359 table_id: 89 (test.t1)
101+
# binlog.000003 359 Write_rows 1 399 table_id: 89 flags: STMT_END_F
102+
# binlog.000003 399 Xid 1 430 COMMIT /* xid=9 */
103+
# binlog.000003 430 Rotate 1 474 binlog.000004;pos=4
104+
--let $extracted_event_type = query_get_value(SHOW BINLOG EVENTS IN '$third_binlog', Event_type, 8)
105+
--assert($extracted_event_type == "Rotate")
106+
--let $extracted_event_type = query_get_value(SHOW BINLOG EVENTS IN '$third_binlog', Event_type, 9)
107+
--assert($extracted_event_type == "No such row")
108+
109+
--echo
110+
--echo *** Checking if the fourth (still open) binlog file ends with the commit of a transaction (XID).
111+
# SHOW BINLOG EVENTS IN 'binlog.000004';
112+
# Log_name Pos Event_type Server_id End_log_pos Info
113+
# binlog.000004 4 Format_desc 1 126 Server ver: 8.0.43-34, Binlog ver: 4
114+
# binlog.000004 126 Previous_gtids 1 157
115+
# binlog.000004 157 Anonymous_Gtid 1 236 SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
116+
# binlog.000004 236 Query 1 311 BEGIN
117+
# binlog.000004 311 Table_map 1 359 table_id: 89 (test.t1)
118+
# binlog.000004 359 Write_rows 1 399 table_id: 89 flags: STMT_END_F
119+
# binlog.000004 399 Xid 1 430 COMMIT /* xid=12 */
120+
--let $extracted_event_type = query_get_value(SHOW BINLOG EVENTS IN '$fourth_binlog', Event_type, 7)
121+
--assert($extracted_event_type == "Xid")
122+
--let $extracted_event_type = query_get_value(SHOW BINLOG EVENTS IN '$fourth_binlog', Event_type, 8)
123+
--assert($extracted_event_type == "No such row")
124+
125+
# identifying backend storage type ('file' or 's3')
126+
--source ../include/identify_storage_backend.inc
127+
128+
# creating data directory, configuration file, etc.
129+
--let $binsrv_connect_timeout = 20
130+
--let $binsrv_read_timeout = 60
131+
--let $binsrv_idle_time = 10
132+
--let $binsrv_verify_checksum = TRUE
133+
--source ../include/set_up_binsrv_environment.inc
134+
135+
--echo
136+
--echo *** Executing the Binlog Server utility to download all binlog data
137+
--echo *** from the server to the <BINSRV_STORAGE_PATH> directory
138+
--exec $BINSRV fetch $binsrv_config_file_path > /dev/null
139+
140+
--echo
141+
--echo *** Dropping the table.
142+
DROP TABLE t1;
143+
144+
# cleaning up
145+
--source ../include/tear_down_binsrv_environment.inc

src/app.cpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -298,11 +298,11 @@ void log_replication_info(
298298
msg += ", starting from ";
299299
if (replication_mode == binsrv::replication_mode_type::position) {
300300
if (storage.has_current_binlog_name()) {
301-
msg += "the very beginning";
302-
} else {
303301
msg += storage.get_current_binlog_name();
304302
msg += ":";
305303
msg += std::to_string(storage.get_current_position());
304+
} else {
305+
msg += "the very beginning";
306306
}
307307
} else {
308308
const auto &gtids{storage.get_gtids()};
@@ -320,6 +320,9 @@ void log_replication_info(
320320

321321
void log_span_dump(binsrv::basic_logger &logger,
322322
util::const_byte_span portion) {
323+
logger.log(binsrv::log_severity::debug,
324+
"fetched " + std::to_string(std::size(portion)) +
325+
"-byte(s) event from binlog");
323326
static constexpr std::size_t bytes_per_dump_line{16U};
324327
std::size_t offset{0U};
325328
while (offset < std::size(portion)) {
@@ -489,19 +492,28 @@ void receive_binlog_events(
489492
"unexpected event prefix");
490493
}
491494
portion = portion.subspan(1U);
492-
logger.log(binsrv::log_severity::info,
493-
"fetched " + std::to_string(std::size(portion)) +
494-
"-byte(s) event from binlog");
495495
log_span_dump(logger, portion);
496496

497497
// TODO: just for redirection to another byte stream we need to parse
498498
// the ROTATE and FORMAT_DESCRIPTION events only, every other one
499499
// can be just considered as a data portion (unless we want to do
500500
// basic integrity checks like event sizes / position and CRC)
501501
const binsrv::event::event current_event{context, portion};
502+
const auto &current_header{current_event.get_common_header()};
503+
auto readable_flags{current_header.get_readable_flags()};
504+
logger.log(
505+
binsrv::log_severity::info,
506+
"event: " + std::string{current_header.get_readable_type_code()} +
507+
(readable_flags.empty() ? "" : "(" + readable_flags + ")"));
502508
logger.log(binsrv::log_severity::debug,
503509
"Parsed event:\n" +
504510
boost::lexical_cast<std::string>(current_event));
511+
if (context.is_at_transaction_boundary()) {
512+
logger.log(
513+
binsrv::log_severity::info,
514+
"encountered the end of transaction " +
515+
boost::lexical_cast<std::string>(context.get_transaction_gtid()));
516+
}
505517

506518
process_binlog_event(current_event, portion, storage, skip_open_binlog);
507519
}

src/binsrv/event/gtid_log_post_header.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
#include "binsrv/event/gtid_log_flag_type.hpp"
2929

30+
#include "binsrv/gtids/common_types.hpp"
31+
#include "binsrv/gtids/gtid.hpp"
3032
#include "binsrv/gtids/uuid.hpp"
3133

3234
#include "util/byte_span.hpp"
@@ -132,6 +134,13 @@ gtid_log_post_header::get_flags() const noexcept {
132134
return boost::lexical_cast<std::string>(get_uuid());
133135
}
134136

137+
[[nodiscard]] gtids::gtid gtid_log_post_header::get_gtid() const {
138+
if (get_gno() < gtids::min_gno) {
139+
return {};
140+
}
141+
return {get_uuid(), get_gno()};
142+
}
143+
135144
std::ostream &operator<<(std::ostream &output,
136145
const gtid_log_post_header &obj) {
137146
return output << "flags: " << obj.get_readable_flags()

src/binsrv/event/gtid_log_post_header.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
#include "binsrv/event/gtid_log_flag_type_fwd.hpp"
2525

2626
#include "binsrv/gtids/common_types.hpp"
27-
#include "binsrv/gtids/uuid.hpp"
27+
#include "binsrv/gtids/gtid_fwd.hpp"
28+
#include "binsrv/gtids/uuid_fwd.hpp"
2829

2930
#include "util/byte_span_fwd.hpp"
3031

@@ -50,7 +51,12 @@ class [[nodiscard]] gtid_log_post_header {
5051
[[nodiscard]] gtids::uuid get_uuid() const noexcept;
5152
[[nodiscard]] std::string get_readable_uuid() const;
5253

54+
[[nodiscard]] gtids::gtid get_gtid() const;
55+
5356
[[nodiscard]] std::int64_t get_gno_raw() const noexcept { return gno_; }
57+
[[nodiscard]] gtids::gno_t get_gno() const noexcept {
58+
return static_cast<gtids::gno_t>(get_gno_raw());
59+
}
5460

5561
[[nodiscard]] std::uint8_t get_logical_ts_code_raw() const noexcept {
5662
return logical_ts_code_;

src/binsrv/event/gtid_tagged_log_body_impl.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
#include "binsrv/event/code_type.hpp"
3939
#include "binsrv/event/gtid_log_flag_type.hpp"
4040

41+
#include "binsrv/gtids/gtid.hpp"
42+
#include "binsrv/gtids/tag.hpp"
4143
#include "binsrv/gtids/uuid.hpp"
4244

4345
#include "util/bounded_string_storage.hpp"
@@ -160,11 +162,22 @@ generic_body_impl<code_type::gtid_tagged_log>::get_readable_uuid() const {
160162
return boost::lexical_cast<std::string>(get_uuid());
161163
}
162164

165+
[[nodiscard]] gtids::tag
166+
generic_body_impl<code_type::gtid_tagged_log>::get_tag() const {
167+
return gtids::tag{get_tag_raw()};
168+
}
169+
163170
[[nodiscard]] std::string_view
164-
generic_body_impl<code_type::gtid_tagged_log>::get_tag() const noexcept {
171+
generic_body_impl<code_type::gtid_tagged_log>::get_readable_tag()
172+
const noexcept {
165173
return util::to_string_view(get_tag_raw());
166174
}
167175

176+
[[nodiscard]] gtids::gtid
177+
generic_body_impl<code_type::gtid_tagged_log>::get_gtid() const {
178+
return {get_uuid(), get_tag(), get_gno()};
179+
}
180+
168181
[[nodiscard]] std::string generic_body_impl<code_type::gtid_tagged_log>::
169182
get_readable_immediate_commit_timestamp() const {
170183
// threre is still no way to get string representationof the
@@ -204,7 +217,8 @@ operator<<(std::ostream &output,
204217
const generic_body_impl<code_type::gtid_tagged_log> &obj) {
205218
output << "flags: " << obj.get_readable_flags()
206219
<< ", uuid: " << obj.get_readable_uuid()
207-
<< ", gno: " << obj.get_gno_raw() << ", tag: " << obj.get_tag()
220+
<< ", gno: " << obj.get_gno_raw()
221+
<< ", tag: " << obj.get_readable_tag()
208222
<< ", last_committed: " << obj.get_last_committed_raw()
209223
<< ", sequence_number: " << obj.get_sequence_number_raw();
210224

src/binsrv/event/gtid_tagged_log_body_impl.hpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818

1919
#include "binsrv/event/gtid_tagged_log_body_impl_fwd.hpp" // IWYU pragma: export
2020

21+
#include <chrono>
2122
#include <cstddef>
2223
#include <cstdint>
2324

2425
#include "binsrv/event/gtid_log_flag_type_fwd.hpp"
2526

2627
#include "binsrv/gtids/common_types.hpp"
27-
#include "binsrv/gtids/uuid.hpp"
28+
#include "binsrv/gtids/gtid_fwd.hpp"
29+
#include "binsrv/gtids/tag_fwd.hpp"
30+
#include "binsrv/gtids/uuid_fwd.hpp"
2831

2932
#include "util/bounded_string_storage.hpp"
3033
#include "util/byte_span_fwd.hpp"
@@ -49,11 +52,17 @@ template <> class [[nodiscard]] generic_body_impl<code_type::gtid_tagged_log> {
4952
[[nodiscard]] std::string get_readable_uuid() const;
5053

5154
[[nodiscard]] std::int64_t get_gno_raw() const noexcept { return gno_; }
55+
[[nodiscard]] gtids::gno_t get_gno() const noexcept {
56+
return static_cast<gtids::gno_t>(get_gno_raw());
57+
}
5258

5359
[[nodiscard]] const gtids::tag_storage &get_tag_raw() const noexcept {
5460
return tag_;
5561
}
56-
[[nodiscard]] std::string_view get_tag() const noexcept;
62+
[[nodiscard]] gtids::tag get_tag() const;
63+
[[nodiscard]] std::string_view get_readable_tag() const noexcept;
64+
65+
[[nodiscard]] gtids::gtid get_gtid() const;
5766

5867
[[nodiscard]] std::int64_t get_last_committed_raw() const noexcept {
5968
return last_committed_;

0 commit comments

Comments
 (0)