Skip to content

Commit 067ca95

Browse files
committed
feat: implement retryable state and preserve message context on failure
- Preserve message_event_id, epoch, and mls_group_id when transitioning to Failed state
- Allow reprocessing of messages in Retryable state
- Update message processing logic to handle retry scenarios
- Fix message state persistence in storage implementations
1 parent 21bda1a commit 067ca95

File tree

6 files changed

+455
-9
lines changed

6 files changed

+455
-9
lines changed

crates/mdk-core/src/messages.rs

Lines changed: 196 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1515,12 +1515,25 @@ where
15151515
sanitized_reason
15161516
);
15171517

1518+
// Try to fetch existing record to preserve message_event_id and other context
1519+
let existing_record = self
1520+
.storage()
1521+
.find_processed_message_by_event_id(&event_id)
1522+
.ok()
1523+
.flatten();
1524+
1525+
let message_event_id = existing_record.as_ref().and_then(|r| r.message_event_id);
1526+
let mls_group_id = existing_record
1527+
.as_ref()
1528+
.and_then(|r| r.mls_group_id.clone());
1529+
let epoch = existing_record.as_ref().and_then(|r| r.epoch);
1530+
15181531
let processed_message = message_types::ProcessedMessage {
15191532
wrapper_event_id: event_id,
1520-
message_event_id: None,
1533+
message_event_id,
15211534
processed_at: Timestamp::now(),
1522-
epoch: None,
1523-
mls_group_id: None,
1535+
epoch,
1536+
mls_group_id,
15241537
state: message_types::ProcessedMessageState::Failed,
15251538
failure_reason: Some(sanitized_reason.to_string()),
15261539
};
@@ -1599,6 +1612,48 @@ where
15991612
.ok_or(Error::MessageNotFound)?;
16001613
Ok(MessageProcessingResult::ApplicationMessage(message))
16011614
}
1615+
message_types::ProcessedMessageState::Retryable => {
1616+
// Retryable messages are ones that previously failed due to wrong epoch keys
1617+
// but have been marked for retry after a rollback. For our own messages,
1618+
// we should have cached content - try to retrieve and return it.
1619+
tracing::debug!(target: "mdk_core::messages::process_message", "Retrying own message after rollback");
1620+
1621+
if let Some(message_event_id) = processed_message.message_event_id
1622+
&& let Ok(Some(mut message)) =
1623+
self.get_message(&group.mls_group_id, &message_event_id)
1624+
{
1625+
// Update states to mark as successfully processed
1626+
message.state = message_types::MessageState::Processed;
1627+
self.storage()
1628+
.save_message(message)
1629+
.map_err(|e| Error::Message(e.to_string()))?;
1630+
1631+
processed_message.state =
1632+
message_types::ProcessedMessageState::Processed;
1633+
self.storage()
1634+
.save_processed_message(processed_message.clone())
1635+
.map_err(|e| Error::Message(e.to_string()))?;
1636+
1637+
tracing::info!(
1638+
target: "mdk_core::messages::process_message",
1639+
"Successfully retried own cached message after rollback"
1640+
);
1641+
let message = self
1642+
.get_message(&group.mls_group_id, &message_event_id)?
1643+
.ok_or(Error::MessageNotFound)?;
1644+
return Ok(MessageProcessingResult::ApplicationMessage(message));
1645+
}
1646+
1647+
// No cached content available - this shouldn't happen for our own messages,
1648+
// but if it does, we can't recover
1649+
tracing::warn!(
1650+
target: "mdk_core::messages::process_message",
1651+
"Retryable own message has no cached content - cannot recover"
1652+
);
1653+
Ok(MessageProcessingResult::Unprocessable {
1654+
mls_group_id: group.mls_group_id.clone(),
1655+
})
1656+
}
16021657
message_types::ProcessedMessageState::ProcessedCommit => {
16031658
tracing::debug!(target: "mdk_core::messages::process_message", "Message already processed as a commit");
16041659

@@ -1667,6 +1722,21 @@ where
16671722
.find_failed_messages_for_retry(&group.mls_group_id)
16681723
.unwrap_or_default();
16691724

1725+
// Mark these messages as Retryable so they can pass through
1726+
// deduplication when the application re-fetches and reprocesses them
1727+
for event_id in &messages_needing_refetch {
1728+
if let Err(e) =
1729+
self.storage().mark_processed_message_retryable(event_id)
1730+
{
1731+
tracing::warn!(
1732+
target: "mdk_core::messages::process_message",
1733+
"Failed to mark message {} as retryable: {}",
1734+
event_id,
1735+
e
1736+
);
1737+
}
1738+
}
1739+
16701740
if let Some(cb) = &self.callback {
16711741
cb.on_rollback(&crate::RollbackInfo {
16721742
group_id: group.mls_group_id.clone(),
@@ -1713,9 +1783,19 @@ where
17131783

17141784
// Not our own commit - this is a genuine error
17151785
tracing::error!(target: "mdk_core::messages::process_message", "Epoch mismatch for message that is not our own commit: {:?}", error);
1786+
1787+
// Try to fetch existing record to preserve message_event_id and other context
1788+
let existing_record = self
1789+
.storage()
1790+
.find_processed_message_by_event_id(&event.id)
1791+
.ok()
1792+
.flatten();
1793+
1794+
let message_event_id = existing_record.as_ref().and_then(|r| r.message_event_id);
1795+
17161796
let processed_message = message_types::ProcessedMessage {
17171797
wrapper_event_id: event.id,
1718-
message_event_id: None,
1798+
message_event_id,
17191799
processed_at: Timestamp::now(),
17201800
epoch: Some(group.epoch),
17211801
mls_group_id: Some(group.mls_group_id.clone()),
@@ -1755,9 +1835,19 @@ where
17551835
}
17561836

17571837
tracing::error!(target: "mdk_core::messages::process_message", "Epoch mismatch for message that is not our own commit: {:?}", error);
1838+
1839+
// Try to fetch existing record to preserve message_event_id and other context
1840+
let existing_record = self
1841+
.storage()
1842+
.find_processed_message_by_event_id(&event.id)
1843+
.ok()
1844+
.flatten();
1845+
1846+
let message_event_id = existing_record.as_ref().and_then(|r| r.message_event_id);
1847+
17581848
let processed_message = message_types::ProcessedMessage {
17591849
wrapper_event_id: event.id,
1760-
message_event_id: None,
1850+
message_event_id,
17611851
processed_at: Timestamp::now(),
17621852
epoch: Some(group.epoch),
17631853
mls_group_id: Some(group.mls_group_id.clone()),
@@ -1774,9 +1864,19 @@ where
17741864
}
17751865
Error::ProcessMessageWrongGroupId => {
17761866
tracing::error!(target: "mdk_core::messages::process_message", "Group ID mismatch: {:?}", error);
1867+
1868+
// Try to fetch existing record to preserve message_event_id and other context
1869+
let existing_record = self
1870+
.storage()
1871+
.find_processed_message_by_event_id(&event.id)
1872+
.ok()
1873+
.flatten();
1874+
1875+
let message_event_id = existing_record.as_ref().and_then(|r| r.message_event_id);
1876+
17771877
let processed_message = message_types::ProcessedMessage {
17781878
wrapper_event_id: event.id,
1779-
message_event_id: None,
1879+
message_event_id,
17801880
processed_at: Timestamp::now(),
17811881
epoch: Some(group.epoch),
17821882
mls_group_id: Some(group.mls_group_id.clone()),
@@ -1793,9 +1893,19 @@ where
17931893
}
17941894
Error::ProcessMessageUseAfterEviction => {
17951895
tracing::error!(target: "mdk_core::messages::process_message", "Attempted to use group after eviction: {:?}", error);
1896+
1897+
// Try to fetch existing record to preserve message_event_id and other context
1898+
let existing_record = self
1899+
.storage()
1900+
.find_processed_message_by_event_id(&event.id)
1901+
.ok()
1902+
.flatten();
1903+
1904+
let message_event_id = existing_record.as_ref().and_then(|r| r.message_event_id);
1905+
17961906
let processed_message = message_types::ProcessedMessage {
17971907
wrapper_event_id: event.id,
1798-
message_event_id: None,
1908+
message_event_id,
17991909
processed_at: Timestamp::now(),
18001910
epoch: Some(group.epoch),
18011911
mls_group_id: Some(group.mls_group_id.clone()),
@@ -1825,9 +1935,19 @@ where
18251935
}
18261936
_ => {
18271937
tracing::error!(target: "mdk_core::messages::process_message", "Unexpected error processing message: {:?}", error);
1938+
1939+
// Try to fetch existing record to preserve message_event_id and other context
1940+
let existing_record = self
1941+
.storage()
1942+
.find_processed_message_by_event_id(&event.id)
1943+
.ok()
1944+
.flatten();
1945+
1946+
let message_event_id = existing_record.as_ref().and_then(|r| r.message_event_id);
1947+
18281948
let processed_message = message_types::ProcessedMessage {
18291949
wrapper_event_id: event.id,
1830-
message_event_id: None,
1950+
message_event_id,
18311951
processed_at: Timestamp::now(),
18321952
epoch: Some(group.epoch),
18331953
mls_group_id: Some(group.mls_group_id.clone()),
@@ -2026,7 +2146,7 @@ where
20262146
processed.state
20272147
);
20282148

2029-
// Only block reprocessing for Failed state
2149+
// Block reprocessing for Failed state
20302150
// Other states (Created, Processed, ProcessedCommit, Retryable) should continue
20312151
// to allow normal message flow (e.g., processing own messages from relay)
20322152
if processed.state == message_types::ProcessedMessageState::Failed {
@@ -2051,6 +2171,16 @@ where
20512171

20522172
return Ok(MessageProcessingResult::Unprocessable { mls_group_id });
20532173
}
2174+
2175+
// Allow Retryable messages to be reprocessed after rollback
2176+
if processed.state == message_types::ProcessedMessageState::Retryable {
2177+
tracing::info!(
2178+
target: "mdk_core::messages::process_message",
2179+
"Retrying previously failed message after rollback (event_id: {})",
2180+
event.id
2181+
);
2182+
// Continue to processing - don't return early
2183+
}
20542184
}
20552185

20562186
// Step 1: Validate event and extract group ID
@@ -8723,6 +8853,63 @@ mod tests {
87238853
);
87248854
}
87258855

8856+
/// Checks that `save_failed_processed_message` keeps the `message_event_id`,
/// `epoch`, and `mls_group_id` of an already-stored record when that record
/// is rewritten in the `Failed` state.
#[test]
8857+
fn test_save_failed_processed_message_preserves_message_event_id() {
8858+
let mdk = create_test_mdk();
8859+
let keys = Keys::generate();
8860+
8861+
// Build a signed event; its id is used as the wrapper_event_id of the stored record
8862+
let event = EventBuilder::new(Kind::Metadata, "")
8863+
.sign_with_keys(&keys)
8864+
.unwrap();
8865+
8866+
// Fixed, arbitrary id standing in for the inner message event id
8867+
let message_event_id =
8868+
EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000001")
8869+
.unwrap();
8870+
8871+
// Seed storage with a Created record that carries message_event_id, epoch, and
// mls_group_id (simulating a message we created/sent)
8872+
let processed_message = message_types::ProcessedMessage {
8873+
wrapper_event_id: event.id,
8874+
message_event_id: Some(message_event_id),
8875+
processed_at: nostr::Timestamp::now(),
8876+
epoch: Some(123),
8877+
mls_group_id: Some(GroupId::from_slice(&[1, 2, 3, 4])),
8878+
state: message_types::ProcessedMessageState::Created,
8879+
failure_reason: None,
8880+
};
8881+
mdk.storage()
8882+
.save_processed_message(processed_message)
8883+
.unwrap();
8884+
8885+
// Now record a failure for the same wrapper event (e.g. decryption failed for own message)
8886+
let error = Error::CannotDecryptOwnMessage;
8887+
mdk.save_failed_processed_message(event.id, &error).unwrap();
8888+
8889+
// Reload the record: the state must be Failed while message_event_id, epoch,
// and mls_group_id still hold the values seeded above
8890+
let updated_record = mdk
8891+
.storage()
8892+
.find_processed_message_by_event_id(&event.id)
8893+
.unwrap()
8894+
.expect("Record should exist");
8895+
8896+
assert_eq!(
8897+
updated_record.state,
8898+
message_types::ProcessedMessageState::Failed
8899+
);
8900+
assert_eq!(
8901+
updated_record.message_event_id,
8902+
Some(message_event_id),
8903+
"message_event_id should be preserved"
8904+
);
8905+
assert_eq!(updated_record.epoch, Some(123), "epoch should be preserved");
8906+
assert_eq!(
8907+
updated_record.mls_group_id,
8908+
Some(GroupId::from_slice(&[1, 2, 3, 4])),
8909+
"mls_group_id should be preserved"
8910+
);
8911+
}
8912+
87268913
/// Test EpochSnapshotManager directly for unit testing
87278914
mod epoch_snapshot_manager_tests {
87288915
use mdk_storage_traits::GroupId;

0 commit comments

Comments
 (0)