Skip to content

Commit 84bd64f

Browse files
authored
ref(apply): Cleanup commit lsn implementation (#365)
1 parent feb40e5 commit 84bd64f

File tree

1 file changed

+24
-83
lines changed

1 file changed

+24
-83
lines changed

etl/src/replication/apply.rs

Lines changed: 24 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -915,94 +915,42 @@ where
915915
S: SchemaStore + Clone + Send + 'static,
916916
T: ApplyLoopHook,
917917
{
918-
let commit_lsn = get_commit_lsn(state, &message)?;
919-
920918
state.current_tx_events += 1;
921919

922920
match &message {
923921
LogicalReplicationMessage::Begin(begin_body) => {
924-
handle_begin_message(state, start_lsn, commit_lsn, begin_body).await
922+
handle_begin_message(state, start_lsn, begin_body).await
925923
}
926924
LogicalReplicationMessage::Commit(commit_body) => {
927-
handle_commit_message(state, start_lsn, commit_lsn, commit_body, hook, pipeline_id)
928-
.await
925+
handle_commit_message(state, start_lsn, commit_body, hook, pipeline_id).await
929926
}
930927
LogicalReplicationMessage::Relation(relation_body) => {
931-
handle_relation_message(
932-
state,
933-
start_lsn,
934-
commit_lsn,
935-
relation_body,
936-
schema_store,
937-
hook,
938-
)
939-
.await
928+
handle_relation_message(state, start_lsn, relation_body, schema_store, hook).await
940929
}
941930
LogicalReplicationMessage::Insert(insert_body) => {
942-
handle_insert_message(
943-
state,
944-
start_lsn,
945-
commit_lsn,
946-
insert_body,
947-
hook,
948-
schema_store,
949-
)
950-
.await
931+
handle_insert_message(state, start_lsn, insert_body, hook, schema_store).await
951932
}
952933
LogicalReplicationMessage::Update(update_body) => {
953-
handle_update_message(
954-
state,
955-
start_lsn,
956-
commit_lsn,
957-
update_body,
958-
hook,
959-
schema_store,
960-
)
961-
.await
934+
handle_update_message(state, start_lsn, update_body, hook, schema_store).await
962935
}
963936
LogicalReplicationMessage::Delete(delete_body) => {
964-
handle_delete_message(
965-
state,
966-
start_lsn,
967-
commit_lsn,
968-
delete_body,
969-
hook,
970-
schema_store,
971-
)
972-
.await
937+
handle_delete_message(state, start_lsn, delete_body, hook, schema_store).await
973938
}
974939
LogicalReplicationMessage::Truncate(truncate_body) => {
975-
handle_truncate_message(state, start_lsn, commit_lsn, truncate_body, hook).await
940+
handle_truncate_message(state, start_lsn, truncate_body, hook).await
941+
}
942+
LogicalReplicationMessage::Origin(_) => {
943+
debug!("received unsupported ORIGIN message");
944+
Ok(HandleMessageResult::default())
945+
}
946+
LogicalReplicationMessage::Type(_) => {
947+
debug!("received unsupported TYPE message");
948+
Ok(HandleMessageResult::default())
976949
}
977-
LogicalReplicationMessage::Origin(_) => Ok(HandleMessageResult::default()),
978-
LogicalReplicationMessage::Type(_) => Ok(HandleMessageResult::default()),
979950
_ => Ok(HandleMessageResult::default()),
980951
}
981952
}
982953

983-
/// Determines the commit LSN for a replication message based on transaction state.
984-
///
985-
/// This function extracts the appropriate commit LSN depending on the message type.
986-
/// For `BEGIN` messages, it uses the final LSN from the message payload. For all
987-
/// other message types, it retrieves the previously stored `remote_final_lsn`
988-
/// that was set when the transaction began.
989-
fn get_commit_lsn(state: &ApplyLoopState, message: &LogicalReplicationMessage) -> EtlResult<PgLsn> {
990-
// If we are in a `Begin` message, the `commit_lsn` is the `final_lsn` of the payload, in all the
991-
// other cases we read the `remote_final_lsn` which should be always set in case we are within or
992-
// at the end of a transaction (meaning that the event type is different from `Begin`).
993-
if let LogicalReplicationMessage::Begin(message) = message {
994-
Ok(PgLsn::from(message.final_lsn()))
995-
} else {
996-
state.remote_final_lsn.ok_or_else(|| {
997-
etl_error!(
998-
ErrorKind::InvalidState,
999-
"Invalid transaction",
1000-
"A transaction should have started for get_commit_lsn to be performed"
1001-
)
1002-
})
1003-
}
1004-
}
1005-
1006954
/// Handles Postgres BEGIN messages that mark transaction boundaries.
1007955
///
1008956
/// This function processes transaction start events by validating the event type
@@ -1015,7 +963,6 @@ fn get_commit_lsn(state: &ApplyLoopState, message: &LogicalReplicationMessage) -
1015963
async fn handle_begin_message(
1016964
state: &mut ApplyLoopState,
1017965
start_lsn: PgLsn,
1018-
commit_lsn: PgLsn,
1019966
message: &protocol::BeginBody,
1020967
) -> EtlResult<HandleMessageResult> {
1021968
// We track the final lsn of this transaction, which should be equal to the `commit_lsn` of the
@@ -1028,7 +975,7 @@ async fn handle_begin_message(
1028975
state.current_tx_events = 0;
1029976

1030977
// Convert event from the protocol message.
1031-
let event = parse_event_from_begin_message(start_lsn, commit_lsn, message);
978+
let event = parse_event_from_begin_message(start_lsn, final_lsn, message);
1032979

1033980
Ok(HandleMessageResult::return_event(Event::Begin(event)))
1034981
}
@@ -1046,7 +993,6 @@ async fn handle_begin_message(
1046993
async fn handle_commit_message<T>(
1047994
state: &mut ApplyLoopState,
1048995
start_lsn: PgLsn,
1049-
commit_lsn: PgLsn,
1050996
message: &protocol::CommitBody,
1051997
hook: &T,
1052998
pipeline_id: PipelineId,
@@ -1068,14 +1014,14 @@ where
10681014
// If the commit lsn of the message is different from the remote final lsn, it means that the
10691015
// transaction that was started expect a different commit lsn in the commit message. In this case,
10701016
// we want to bail assuming we are in an inconsistent state.
1071-
let commit_lsn_msg = PgLsn::from(message.commit_lsn());
1072-
if commit_lsn_msg != remote_final_lsn {
1017+
let commit_lsn = PgLsn::from(message.commit_lsn());
1018+
if commit_lsn != remote_final_lsn {
10731019
bail!(
10741020
ErrorKind::ValidationError,
10751021
"Invalid commit LSN",
10761022
format!(
10771023
"Incorrect commit LSN {} in COMMIT message (expected {})",
1078-
commit_lsn_msg, remote_final_lsn
1024+
commit_lsn, remote_final_lsn
10791025
)
10801026
);
10811027
}
@@ -1155,7 +1101,6 @@ where
11551101
async fn handle_relation_message<S, T>(
11561102
state: &mut ApplyLoopState,
11571103
start_lsn: PgLsn,
1158-
commit_lsn: PgLsn,
11591104
message: &protocol::RelationBody,
11601105
schema_store: &S,
11611106
hook: &T,
@@ -1196,7 +1141,7 @@ where
11961141
})?;
11971142

11981143
// Convert event from the protocol message.
1199-
let event = parse_event_from_relation_message(start_lsn, commit_lsn, message)?;
1144+
let event = parse_event_from_relation_message(start_lsn, remote_final_lsn, message)?;
12001145

12011146
// We compare the table schema from the relation message with the existing schema (if any).
12021147
// The purpose of this comparison is that we want to throw an error and stop the processing
@@ -1219,7 +1164,6 @@ where
12191164
async fn handle_insert_message<S, T>(
12201165
state: &mut ApplyLoopState,
12211166
start_lsn: PgLsn,
1222-
commit_lsn: PgLsn,
12231167
message: &protocol::InsertBody,
12241168
hook: &T,
12251169
schema_store: &S,
@@ -1245,7 +1189,7 @@ where
12451189

12461190
// Convert event from the protocol message.
12471191
let event =
1248-
parse_event_from_insert_message(schema_store, start_lsn, commit_lsn, message).await?;
1192+
parse_event_from_insert_message(schema_store, start_lsn, remote_final_lsn, message).await?;
12491193

12501194
Ok(HandleMessageResult::return_event(Event::Insert(event)))
12511195
}
@@ -1254,7 +1198,6 @@ where
12541198
async fn handle_update_message<S, T>(
12551199
state: &mut ApplyLoopState,
12561200
start_lsn: PgLsn,
1257-
commit_lsn: PgLsn,
12581201
message: &protocol::UpdateBody,
12591202
hook: &T,
12601203
schema_store: &S,
@@ -1280,7 +1223,7 @@ where
12801223

12811224
// Convert event from the protocol message.
12821225
let event =
1283-
parse_event_from_update_message(schema_store, start_lsn, commit_lsn, message).await?;
1226+
parse_event_from_update_message(schema_store, start_lsn, remote_final_lsn, message).await?;
12841227

12851228
Ok(HandleMessageResult::return_event(Event::Update(event)))
12861229
}
@@ -1289,7 +1232,6 @@ where
12891232
async fn handle_delete_message<S, T>(
12901233
state: &mut ApplyLoopState,
12911234
start_lsn: PgLsn,
1292-
commit_lsn: PgLsn,
12931235
message: &protocol::DeleteBody,
12941236
hook: &T,
12951237
schema_store: &S,
@@ -1315,7 +1257,7 @@ where
13151257

13161258
// Convert event from the protocol message.
13171259
let event =
1318-
parse_event_from_delete_message(schema_store, start_lsn, commit_lsn, message).await?;
1260+
parse_event_from_delete_message(schema_store, start_lsn, remote_final_lsn, message).await?;
13191261

13201262
Ok(HandleMessageResult::return_event(Event::Delete(event)))
13211263
}
@@ -1329,7 +1271,6 @@ where
13291271
async fn handle_truncate_message<T>(
13301272
state: &mut ApplyLoopState,
13311273
start_lsn: PgLsn,
1332-
commit_lsn: PgLsn,
13331274
message: &protocol::TruncateBody,
13341275
hook: &T,
13351276
) -> EtlResult<HandleMessageResult>
@@ -1361,7 +1302,7 @@ where
13611302
}
13621303

13631304
// Convert event from the protocol message.
1364-
let event = parse_event_from_truncate_message(start_lsn, commit_lsn, message, rel_ids);
1305+
let event = parse_event_from_truncate_message(start_lsn, remote_final_lsn, message, rel_ids);
13651306

13661307
Ok(HandleMessageResult::return_event(Event::Truncate(event)))
13671308
}

0 commit comments

Comments
 (0)