Skip to content

Commit 50e7f29

Browse files
erskingardneryukibtc
authored andcommitted
mls-storage: add MessageState
Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent d095088 commit 50e7f29

File tree

5 files changed

+241
-6
lines changed

5 files changed

+241
-6
lines changed

crates/nostr-mls-memory-storage/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ mod tests {
173173
use nostr::{EventId, Kind, PublicKey, RelayUrl, Tags, Timestamp, UnsignedEvent};
174174
use nostr_mls_storage::groups::types::{Group, GroupExporterSecret, GroupState, GroupType};
175175
use nostr_mls_storage::groups::GroupStorage;
176-
use nostr_mls_storage::messages::types::{Message, ProcessedMessageState};
176+
use nostr_mls_storage::messages::types::{Message, MessageState, ProcessedMessageState};
177177
use nostr_mls_storage::messages::MessageStorage;
178178
use nostr_mls_storage::welcomes::types::{ProcessedWelcomeState, Welcome, WelcomeState};
179179
use nostr_mls_storage::welcomes::WelcomeStorage;
@@ -533,6 +533,7 @@ mod tests {
533533
"Hello, world!".to_string(),
534534
),
535535
wrapper_event_id: wrapper_id,
536+
state: MessageState::Created,
536537
};
537538

538539
// Save the message

crates/nostr-mls-sqlite-storage/migrations/V100__initial.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS messages (
5151
tags JSONB NOT NULL,
5252
event JSONB NOT NULL,
5353
wrapper_event_id BLOB NOT NULL, -- Wrapper event ID as byte array
54+
state TEXT NOT NULL,
5455
FOREIGN KEY (mls_group_id) REFERENCES groups(mls_group_id) ON DELETE CASCADE
5556
);
5657

@@ -60,6 +61,7 @@ CREATE INDEX IF NOT EXISTS idx_messages_wrapper_event_id ON messages(wrapper_eve
6061
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(created_at);
6162
CREATE INDEX IF NOT EXISTS idx_messages_pubkey ON messages(pubkey);
6263
CREATE INDEX IF NOT EXISTS idx_messages_kind ON messages(kind);
64+
CREATE INDEX IF NOT EXISTS idx_messages_state ON messages(state);
6365

6466
-- Processed Messages table
6567
CREATE TABLE IF NOT EXISTS processed_messages (

crates/nostr-mls-sqlite-storage/src/db.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use nostr::{EventId, JsonUtil, Kind, PublicKey, RelayUrl, Tags, Timestamp, Unsig
88
use nostr_mls_storage::groups::types::{
99
Group, GroupExporterSecret, GroupRelay, GroupState, GroupType,
1010
};
11-
use nostr_mls_storage::messages::types::{Message, ProcessedMessage, ProcessedMessageState};
11+
use nostr_mls_storage::messages::types::{
12+
Message, MessageState, ProcessedMessage, ProcessedMessageState,
13+
};
1214
use nostr_mls_storage::welcomes::types::{
1315
ProcessedWelcome, ProcessedWelcomeState, Welcome, WelcomeState,
1416
};
@@ -123,6 +125,7 @@ pub fn row_to_message(row: &Row) -> SqliteResult<Message> {
123125
let tags_json: &str = row.get_ref("tags")?.as_str()?;
124126
let event_json: &str = row.get_ref("event")?.as_str()?;
125127
let wrapper_event_id_blob: &[u8] = row.get_ref("wrapper_event_id")?.as_blob()?;
128+
let state_str: &str = row.get_ref("state")?.as_str()?;
126129

127130
// Parse values
128131
let id: EventId =
@@ -142,6 +145,9 @@ pub fn row_to_message(row: &Row) -> SqliteResult<Message> {
142145
let wrapper_event_id: EventId = EventId::from_slice(wrapper_event_id_blob)
143146
.map_err(|_| map_invalid_blob_data("Invalid wrapper event ID"))?;
144147

148+
let state: MessageState =
149+
MessageState::from_str(state_str).map_err(|_| map_invalid_text_data("Invalid state"))?;
150+
145151
Ok(Message {
146152
id,
147153
pubkey,
@@ -152,6 +158,7 @@ pub fn row_to_message(row: &Row) -> SqliteResult<Message> {
152158
tags,
153159
event,
154160
wrapper_event_id,
161+
state,
155162
})
156163
}
157164

crates/nostr-mls-sqlite-storage/src/messages.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ impl MessageStorage for NostrMlsSqliteStorage {
2727
conn_guard
2828
.execute(
2929
"INSERT OR REPLACE INTO messages
30-
(id, pubkey, kind, mls_group_id, created_at, content, tags, event, wrapper_event_id)
31-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
30+
(id, pubkey, kind, mls_group_id, created_at, content, tags, event, wrapper_event_id, state)
31+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
3232
params![
3333
message.id.as_bytes(),
3434
message.pubkey.as_bytes(),
@@ -39,6 +39,7 @@ impl MessageStorage for NostrMlsSqliteStorage {
3939
tags_json,
4040
message.event.as_json(),
4141
message.wrapper_event_id.as_bytes(),
42+
message.state.as_str(),
4243
],
4344
)
4445
.map_err(into_message_err)?;
@@ -114,7 +115,7 @@ mod tests {
114115
use nostr::{EventId, Kind, PublicKey, Tags, Timestamp, UnsignedEvent};
115116
use nostr_mls_storage::groups::types::{Group, GroupState, GroupType};
116117
use nostr_mls_storage::groups::GroupStorage;
117-
use nostr_mls_storage::messages::types::ProcessedMessageState;
118+
use nostr_mls_storage::messages::types::{MessageState, ProcessedMessageState};
118119

119120
use super::*;
120121

@@ -168,6 +169,7 @@ mod tests {
168169
"content".to_string(),
169170
),
170171
wrapper_event_id,
172+
state: MessageState::Created,
171173
};
172174

173175
// Save the message

crates/nostr-mls-storage/src/messages/types.rs

Lines changed: 224 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,80 @@ pub struct Message {
4646
pub event: UnsignedEvent,
4747
/// The event id of the 1059 event that contained the message
4848
pub wrapper_event_id: EventId,
49+
/// The state of the message
50+
pub state: MessageState,
51+
}
52+
53+
/// The state of the message
54+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
55+
pub enum MessageState {
56+
/// The message was created successfully and stored but we don't yet know if it was published to relays.
57+
Created,
58+
/// The message was successfully processed and stored in the database
59+
Processed,
60+
/// The message was deleted by the original sender - via a delete event
61+
Deleted,
62+
}
63+
64+
impl fmt::Display for MessageState {
65+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66+
write!(f, "{}", self.as_str())
67+
}
68+
}
69+
70+
impl MessageState {
71+
/// Get as `&str`
72+
pub fn as_str(&self) -> &str {
73+
match self {
74+
Self::Created => "created",
75+
Self::Processed => "processed",
76+
Self::Deleted => "deleted",
77+
}
78+
}
79+
}
80+
81+
impl FromStr for MessageState {
82+
type Err = MessageError;
83+
84+
fn from_str(s: &str) -> Result<Self, Self::Err> {
85+
match s {
86+
"created" => Ok(Self::Created),
87+
"processed" => Ok(Self::Processed),
88+
"deleted" => Ok(Self::Deleted),
89+
_ => Err(MessageError::InvalidParameters(format!(
90+
"Invalid message state: {}",
91+
s
92+
))),
93+
}
94+
}
95+
}
96+
97+
impl Serialize for MessageState {
98+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
99+
where
100+
S: Serializer,
101+
{
102+
serializer.serialize_str(self.as_str())
103+
}
104+
}
105+
106+
impl<'de> Deserialize<'de> for MessageState {
107+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
108+
where
109+
D: Deserializer<'de>,
110+
{
111+
let s: String = String::deserialize(deserializer)?;
112+
Self::from_str(&s).map_err(serde::de::Error::custom)
113+
}
49114
}
50115

51116
/// The Processing State of the message,
52117
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
53118
pub enum ProcessedMessageState {
119+
/// The processed message (and message) was created successfully and stored but we don't yet know if it was published to relays.
120+
/// This state only happens when you are sending a message. Since we can't decrypt messages from ourselves in MLS groups,
121+
/// once we see this message we mark it as processed but skip the rest of the processing.
122+
Created,
54123
/// The message was successfully processed and stored in the database
55124
Processed,
56125
/// The message failed to be processed and stored in the database
@@ -67,6 +136,7 @@ impl ProcessedMessageState {
67136
/// Get as `&str`
68137
pub fn as_str(&self) -> &str {
69138
match self {
139+
Self::Created => "created",
70140
Self::Processed => "processed",
71141
Self::Failed => "failed",
72142
}
@@ -78,6 +148,7 @@ impl FromStr for ProcessedMessageState {
78148

79149
fn from_str(s: &str) -> Result<Self, Self::Err> {
80150
match s {
151+
"created" => Ok(Self::Created),
81152
"processed" => Ok(Self::Processed),
82153
"failed" => Ok(Self::Failed),
83154
_ => Err(MessageError::InvalidParameters(format!(
@@ -113,8 +184,104 @@ mod tests {
113184

114185
use super::*;
115186

187+
#[test]
188+
fn test_message_state_from_str() {
189+
assert_eq!(
190+
MessageState::from_str("created").unwrap(),
191+
MessageState::Created
192+
);
193+
assert_eq!(
194+
MessageState::from_str("processed").unwrap(),
195+
MessageState::Processed
196+
);
197+
assert_eq!(
198+
MessageState::from_str("deleted").unwrap(),
199+
MessageState::Deleted
200+
);
201+
202+
let err = MessageState::from_str("invalid").unwrap_err();
203+
match err {
204+
MessageError::InvalidParameters(msg) => {
205+
assert!(msg.contains("Invalid message state: invalid"));
206+
}
207+
_ => panic!("Expected InvalidParameters error"),
208+
}
209+
}
210+
211+
#[test]
212+
fn test_message_state_to_string() {
213+
assert_eq!(MessageState::Created.to_string(), "created");
214+
assert_eq!(MessageState::Processed.to_string(), "processed");
215+
assert_eq!(MessageState::Deleted.to_string(), "deleted");
216+
}
217+
218+
#[test]
219+
fn test_message_state_serialization() {
220+
let created = MessageState::Created;
221+
let serialized = serde_json::to_string(&created).unwrap();
222+
assert_eq!(serialized, r#""created""#);
223+
224+
let processed = MessageState::Processed;
225+
let serialized = serde_json::to_string(&processed).unwrap();
226+
assert_eq!(serialized, r#""processed""#);
227+
228+
let deleted = MessageState::Deleted;
229+
let serialized = serde_json::to_string(&deleted).unwrap();
230+
assert_eq!(serialized, r#""deleted""#);
231+
}
232+
233+
#[test]
234+
fn test_message_state_deserialization() {
235+
let created: MessageState = serde_json::from_str(r#""created""#).unwrap();
236+
assert_eq!(created, MessageState::Created);
237+
238+
let processed: MessageState = serde_json::from_str(r#""processed""#).unwrap();
239+
assert_eq!(processed, MessageState::Processed);
240+
241+
let deleted: MessageState = serde_json::from_str(r#""deleted""#).unwrap();
242+
assert_eq!(deleted, MessageState::Deleted);
243+
244+
// Test invalid state
245+
let result = serde_json::from_str::<MessageState>(r#""invalid""#);
246+
assert!(result.is_err());
247+
}
248+
249+
#[test]
250+
fn test_message_serialization() {
251+
// Create a message to test serialization
252+
let pubkey =
253+
PublicKey::from_hex("8a9de562cbbed225b6ea0118dd3997a02df92c0bffd2224f71081a7450c3e549")
254+
.unwrap();
255+
let message = Message {
256+
id: EventId::all_zeros(),
257+
pubkey: pubkey.clone(),
258+
kind: Kind::MlsGroupMessage,
259+
mls_group_id: vec![1, 2, 3, 4],
260+
created_at: Timestamp::now(),
261+
content: "Test message".to_string(),
262+
tags: Tags::new(),
263+
event: UnsignedEvent::new(
264+
pubkey,
265+
Timestamp::now(),
266+
Kind::MlsGroupMessage,
267+
Tags::new(),
268+
"Test message".to_string(),
269+
),
270+
wrapper_event_id: EventId::all_zeros(),
271+
state: MessageState::Created,
272+
};
273+
274+
let serialized = serde_json::to_value(&message).unwrap();
275+
assert_eq!(serialized["state"], json!("created"));
276+
assert_eq!(serialized["content"], json!("Test message"));
277+
}
278+
116279
#[test]
117280
fn test_processed_message_state_from_str() {
281+
assert_eq!(
282+
ProcessedMessageState::from_str("created").unwrap(),
283+
ProcessedMessageState::Created
284+
);
118285
assert_eq!(
119286
ProcessedMessageState::from_str("processed").unwrap(),
120287
ProcessedMessageState::Processed
@@ -135,12 +302,17 @@ mod tests {
135302

136303
#[test]
137304
fn test_processed_message_state_to_string() {
305+
assert_eq!(ProcessedMessageState::Created.to_string(), "created");
138306
assert_eq!(ProcessedMessageState::Processed.to_string(), "processed");
139307
assert_eq!(ProcessedMessageState::Failed.to_string(), "failed");
140308
}
141309

142310
#[test]
143311
fn test_processed_message_state_serialization() {
312+
let created = ProcessedMessageState::Created;
313+
let serialized = serde_json::to_string(&created).unwrap();
314+
assert_eq!(serialized, r#""created""#);
315+
144316
let processed = ProcessedMessageState::Processed;
145317
let serialized = serde_json::to_string(&processed).unwrap();
146318
assert_eq!(serialized, r#""processed""#);
@@ -152,11 +324,18 @@ mod tests {
152324

153325
#[test]
154326
fn test_processed_message_state_deserialization() {
327+
let created: ProcessedMessageState = serde_json::from_str(r#""created""#).unwrap();
328+
assert_eq!(created, ProcessedMessageState::Created);
329+
155330
let processed: ProcessedMessageState = serde_json::from_str(r#""processed""#).unwrap();
156331
assert_eq!(processed, ProcessedMessageState::Processed);
157332

158333
let failed: ProcessedMessageState = serde_json::from_str(r#""failed""#).unwrap();
159334
assert_eq!(failed, ProcessedMessageState::Failed);
335+
336+
// Test invalid state
337+
let result = serde_json::from_str::<ProcessedMessageState>(r#""invalid""#);
338+
assert!(result.is_err());
160339
}
161340

162341
#[test]
@@ -172,6 +351,50 @@ mod tests {
172351

173352
let serialized = serde_json::to_value(&processed_message).unwrap();
174353
assert_eq!(serialized["state"], json!("processed"));
175-
assert_eq!(serialized["failure_reason"], json!(""));
354+
assert_eq!(serialized["failure_reason"], json!(null));
355+
356+
// Create a failed message with a reason
357+
let failed_message = ProcessedMessage {
358+
wrapper_event_id: EventId::all_zeros(),
359+
message_event_id: Some(EventId::all_zeros()),
360+
processed_at: Timestamp::now(),
361+
state: ProcessedMessageState::Failed,
362+
failure_reason: Some("Decryption failed".to_string()),
363+
};
364+
365+
let serialized = serde_json::to_value(&failed_message).unwrap();
366+
assert_eq!(serialized["state"], json!("failed"));
367+
assert_eq!(serialized["failure_reason"], json!("Decryption failed"));
368+
assert!(serialized["message_event_id"].is_string());
369+
}
370+
371+
#[test]
372+
fn test_processed_message_deserialization() {
373+
let json_str = r#"{
374+
"wrapper_event_id": "0000000000000000000000000000000000000000000000000000000000000000",
375+
"message_event_id": null,
376+
"processed_at": 1677721600,
377+
"state": "processed",
378+
"failure_reason": null
379+
}"#;
380+
381+
let processed_message: ProcessedMessage = serde_json::from_str(json_str).unwrap();
382+
assert_eq!(processed_message.state, ProcessedMessageState::Processed);
383+
assert_eq!(processed_message.failure_reason, None);
384+
385+
let json_str = r#"{
386+
"wrapper_event_id": "0000000000000000000000000000000000000000000000000000000000000000",
387+
"message_event_id": "0000000000000000000000000000000000000000000000000000000000000000",
388+
"processed_at": 1677721600,
389+
"state": "failed",
390+
"failure_reason": "Decryption failed"
391+
}"#;
392+
393+
let failed_message: ProcessedMessage = serde_json::from_str(json_str).unwrap();
394+
assert_eq!(failed_message.state, ProcessedMessageState::Failed);
395+
assert_eq!(
396+
failed_message.failure_reason,
397+
Some("Decryption failed".to_string())
398+
);
176399
}
177400
}

0 commit comments

Comments
 (0)