Skip to content

Commit 7682866

Browse files
PS-10320 fix: Local binary log data is not uploaded to S3 bucket upon disconnection due to read_timeout
https://perconadev.atlassian.net/browse/PS-10320 Added additional storage internal buffer flushing at the point where we detect connection termination due to timeout. This can be considered the third kind of checkpoiting (in addition to size-based and time-based ones).
1 parent 3c6e28d commit 7682866

3 files changed

Lines changed: 16 additions & 10 deletions

File tree

src/app.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -633,8 +633,11 @@ void receive_binlog_events(
633633
storage.discard_incomplete_transaction_events();
634634
}
635635

636-
// TODO: here (upon timing out) we also need to flush internal buffers in
637-
// the storage
636+
// connection termination is a good place to flush any remaining data
637+
// in the event buffer - this can be considered the third kind of
638+
// checkpointing (in addition to size-based and time-based ones)
639+
storage.flush_event_buffer();
640+
638641
logger.log(binsrv::log_severity::info,
639642
"timed out waiting for events and disconnected");
640643
}

src/binsrv/storage.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,7 @@ storage::storage(const storage_config &config,
107107
storage::~storage() {
108108
// bugprone-empty-catch should not be that strict in destructors
109109
try {
110-
if (has_event_data_to_flush()) {
111-
flush_event_buffer();
112-
}
110+
flush_event_buffer();
113111
} catch (...) { // NOLINT(bugprone-empty-catch)
114112
}
115113
}
@@ -255,7 +253,7 @@ void storage::write_event(util::const_byte_span event_data,
255253
checkpoint_interval_seconds_));
256254
}
257255
if (needs_flush) {
258-
flush_event_buffer();
256+
flush_event_buffer_internal();
259257

260258
last_checkpoint_position_ = ready_to_flush_position;
261259
last_checkpoint_timestamp_ = now_ts;
@@ -264,9 +262,7 @@ void storage::write_event(util::const_byte_span event_data,
264262
}
265263

266264
void storage::close_binlog() {
267-
if (has_event_data_to_flush()) {
268-
flush_event_buffer();
269-
}
265+
flush_event_buffer();
270266
event_buffer_.clear();
271267
event_buffer_.shrink_to_fit();
272268

@@ -289,6 +285,12 @@ void storage::discard_incomplete_transaction_events() {
289285
}
290286

291287
void storage::flush_event_buffer() {
288+
if (has_event_data_to_flush()) {
289+
flush_event_buffer_internal();
290+
}
291+
}
292+
293+
void storage::flush_event_buffer_internal() {
292294
assert(!event_buffer_.empty());
293295
assert(last_transaction_boundary_position_in_event_buffer_ <=
294296
std::size(event_buffer_));

src/binsrv/storage.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class [[nodiscard]] storage {
8888
void close_binlog();
8989

9090
void discard_incomplete_transaction_events();
91+
void flush_event_buffer();
9192

9293
private:
9394
basic_storage_backend_ptr backend_;
@@ -129,7 +130,7 @@ class [[nodiscard]] storage {
129130
return get_flushed_position() +
130131
last_transaction_boundary_position_in_event_buffer_;
131132
}
132-
void flush_event_buffer();
133+
void flush_event_buffer_internal();
133134

134135
void load_binlog_index();
135136
void validate_binlog_index(

0 commit comments

Comments
 (0)