Skip to content

Commit 7f30ad6

Browse files
authored
Merge pull request #1300 from permaweb/VinceJuliano/su-anchor
feat(su): show anchor given query param
2 parents 85b5883 + 0048ce3 commit 7f30ad6

File tree

9 files changed

+159
-56
lines changed

9 files changed

+159
-56
lines changed

servers/su/src/domain/clients/local_store/store.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,32 @@ impl LocalStoreClient {
367367

368368
Ok((paginated_keys, has_next_page))
369369
}
370+
371+
fn get_process_bundle(&self, tx_id: &str) -> Result<Vec<u8>, StoreErrorType> {
372+
let assignment_key = self.proc_assignment_key(tx_id);
373+
if let Some(process_bundle) = self.file_db.get(assignment_key.as_bytes())? {
374+
return Ok(process_bundle);
375+
}
376+
377+
let cf = self.index_db.cf_handle("process").ok_or_else(|| {
378+
StoreErrorType::DatabaseError("Column family 'process' not found".to_string())
379+
})?;
380+
let process_key_prefix = format!("process:{}:", tx_id);
381+
let mut iter = self
382+
.index_db
383+
.prefix_iterator_cf(cf, process_key_prefix.as_bytes());
384+
385+
if let Some(result) = iter.next() {
386+
let (_key, assignment_id_bytes) = result?;
387+
let assignment_id = String::from_utf8(assignment_id_bytes.to_vec())?;
388+
let assignment_key = self.proc_assignment_key(&assignment_id);
389+
if let Some(process_bundle) = self.file_db.get(assignment_key.as_bytes())? {
390+
return Ok(process_bundle);
391+
}
392+
}
393+
394+
Err(StoreErrorType::NotFound("Process not found".to_string()))
395+
}
370396
}
371397

372398
#[async_trait]
@@ -462,7 +488,7 @@ impl DataStore for LocalStoreClient {
462488
async fn get_process(&self, tx_id: &str) -> Result<Process, StoreErrorType> {
463489
let assignment_key = self.proc_assignment_key(tx_id);
464490
if let Some(process_bundle) = self.file_db.get(assignment_key.as_bytes())? {
465-
return Ok(Process::from_bytes(process_bundle)?);
491+
return Ok(Process::from_bytes(process_bundle, &None)?);
466492
}
467493

468494
/*
@@ -487,7 +513,7 @@ impl DataStore for LocalStoreClient {
487513
let assignment_id = String::from_utf8(assignment_id_bytes.to_vec())?;
488514
let assignment_key = self.proc_assignment_key(&assignment_id);
489515
if let Some(process_bundle) = self.file_db.get(assignment_key.as_bytes())? {
490-
return Ok(Process::from_bytes(process_bundle)?);
516+
return Ok(Process::from_bytes(process_bundle, &None)?);
491517
}
492518
}
493519

@@ -505,7 +531,7 @@ impl DataStore for LocalStoreClient {
505531
fn get_message(&self, tx_id: &str, _process_id_in: &str) -> Result<Message, StoreErrorType> {
506532
let assignment_key = self.msg_assignment_key(tx_id);
507533
if let Some(message_bundle) = self.file_db.get(assignment_key.as_bytes())? {
508-
let message: Message = Message::from_bytes(message_bundle)?;
534+
let message: Message = Message::from_bytes(message_bundle, &None)?;
509535
return Ok(message);
510536
}
511537

@@ -531,7 +557,7 @@ impl DataStore for LocalStoreClient {
531557
let assignment_id = String::from_utf8(assignment_id_bytes.to_vec())?;
532558
let assignment_key = self.msg_assignment_key(&assignment_id);
533559
if let Some(message_bundle) = self.file_db.get(assignment_key.as_bytes())? {
534-
let message: Message = Message::from_bytes(message_bundle)?;
560+
let message: Message = Message::from_bytes(message_bundle, &None)?;
535561
return Ok(message);
536562
}
537563
}
@@ -651,6 +677,7 @@ impl DataStore for LocalStoreClient {
651677
limit: &Option<i32>,
652678
from_nonce: &Option<String>,
653679
to_nonce: &Option<String>,
680+
show_anchor: &Option<String>,
654681
) -> Result<PaginatedMessages, StoreErrorType> {
655682
let process_id = &process_in.process.process_id;
656683
let limit_val = limit.unwrap_or(100) as usize;
@@ -679,7 +706,12 @@ impl DataStore for LocalStoreClient {
679706
};
680707

681708
if include_process {
682-
let process_message = Message::from_process(process_in.clone())?;
709+
let proc_bundle = self.get_process_bundle(&process_in.process.process_id)?;
710+
let process_message = Message::from_process(
711+
process_in.clone(),
712+
&proc_bundle,
713+
show_anchor
714+
)?;
683715
messages.push(process_message);
684716
/*
685717
Adjust the limit since the process itself
@@ -746,7 +778,12 @@ impl DataStore for LocalStoreClient {
746778
};
747779

748780
if include_process {
749-
let process_message = Message::from_process(process_in.clone())?;
781+
let proc_bundle = self.get_process_bundle(&process_in.process.process_id)?;
782+
let process_message = Message::from_process(
783+
process_in.clone(),
784+
&proc_bundle,
785+
show_anchor
786+
)?;
750787
messages.push(process_message);
751788
/*
752789
Adjust the limit since the process itself
@@ -811,7 +848,7 @@ impl DataStore for LocalStoreClient {
811848
*/
812849
for _ in 0..10 {
813850
if let Some(message_data) = self.file_db.get(assignment_key.as_bytes())? {
814-
let message: Message = Message::from_bytes(message_data)?;
851+
let message: Message = Message::from_bytes(message_data, show_anchor)?;
815852
messages.push(message);
816853
break;
817854
} else {
@@ -865,7 +902,7 @@ impl DataStore for LocalStoreClient {
865902

866903
for _ in 0..10 {
867904
if let Some(message_data) = self.file_db.get(assignment_key.as_bytes())? {
868-
let message: Message = Message::from_bytes(message_data.clone())?;
905+
let message: Message = Message::from_bytes(message_data.clone(), &None)?;
869906
bundles.push((message.assignment.id, message_data));
870907
break;
871908
} else {

servers/su/src/domain/clients/local_store/tests.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ mod tests {
4747
let client = LocalStoreClient::new(&test_db.file_db_path(), &test_db.index_db_path())?;
4848

4949
let process_bundle = create_test_process_bundle();
50-
let test_process = Process::from_bytes(process_bundle.clone())?;
50+
let test_process = Process::from_bytes(process_bundle.clone(), &None)?;
5151

5252
client.save_process(&test_process, &process_bundle)?;
5353
let retrieved_process = client.get_process(&test_process.process.process_id).await?;
@@ -65,7 +65,7 @@ mod tests {
6565
let client = LocalStoreClient::new(&test_db.file_db_path(), &test_db.index_db_path())?;
6666

6767
let message_bundle = create_test_message_bundle();
68-
let test_message = Message::from_bytes(message_bundle.clone())?;
68+
let test_message = Message::from_bytes(message_bundle.clone(), &None)?;
6969

7070
client
7171
.save_message(&test_message, &message_bundle, None)
@@ -82,18 +82,18 @@ mod tests {
8282
let client = LocalStoreClient::new(&test_db.file_db_path(), &test_db.index_db_path())?;
8383

8484
let (process_bundle, message_bundles) = bundle_list();
85-
let test_process = Process::from_bytes(process_bundle.clone())?;
85+
let test_process = Process::from_bytes(process_bundle.clone(), &None)?;
8686
client.save_process(&test_process, &process_bundle)?;
8787

8888
// Save all messages
8989
for bundle in message_bundles.iter() {
90-
let test_message = Message::from_bytes(bundle.clone())?;
90+
let test_message = Message::from_bytes(bundle.clone(), &None)?;
9191
client.save_message(&test_message, &bundle, None).await?;
9292
}
9393

9494
// Retrieve messages and check nonce order and continuity
9595
let result = client
96-
.get_messages(&test_process, &None, &None, &None, &None, &None)
96+
.get_messages(&test_process, &None, &None, &None, &None, &None, &None)
9797
.await?;
9898
let mut previous_nonce: Option<i32> = None;
9999

@@ -121,17 +121,17 @@ mod tests {
121121
let client = LocalStoreClient::new(&test_db.file_db_path(), &test_db.index_db_path())?;
122122

123123
let (process_bundle, message_bundles) = bundle_list();
124-
let test_process = Process::from_bytes(process_bundle.clone())?;
124+
let test_process = Process::from_bytes(process_bundle.clone(), &None)?;
125125
client.save_process(&test_process, &process_bundle)?;
126126

127127
for bundle in message_bundles.iter() {
128-
let test_message = Message::from_bytes(bundle.clone())?;
128+
let test_message = Message::from_bytes(bundle.clone(), &None)?;
129129
client.save_message(&test_message, &bundle, None).await?;
130130
}
131131

132132
// Case 1: Default parameters
133133
let result = client
134-
.get_messages(&test_process, &None, &None, &None, &None, &None)
134+
.get_messages(&test_process, &None, &None, &None, &None, &None, &None)
135135
.await?;
136136
// result should also include the process
137137
assert_eq!(result.edges.len(), message_bundles.len() + 1);
@@ -140,7 +140,7 @@ mod tests {
140140
// Case 2: Limit parameter
141141
let limit = 11;
142142
let result = client
143-
.get_messages(&test_process, &None, &None, &Some(limit), &None, &None)
143+
.get_messages(&test_process, &None, &None, &Some(limit), &None, &None, &None)
144144
.await?;
145145
assert_eq!(result.edges.len(), limit as usize);
146146
assert!(result.page_info.has_next_page);
@@ -155,6 +155,7 @@ mod tests {
155155
&None,
156156
&None,
157157
&None,
158+
&None,
158159
)
159160
.await?;
160161
assert!(result
@@ -165,7 +166,7 @@ mod tests {
165166
// // Case 4: With 'to' parameter
166167
let to = "1728412714154".to_string();
167168
let result = client
168-
.get_messages(&test_process, &None, &Some(to.clone()), &None, &None, &None)
169+
.get_messages(&test_process, &None, &Some(to.clone()), &None, &None, &None, &None)
169170
.await?;
170171
assert!(result
171172
.edges
@@ -181,6 +182,7 @@ mod tests {
181182
&None,
182183
&None,
183184
&None,
185+
&None,
184186
)
185187
.await?;
186188
assert!(result.edges.iter().all(|m| {
@@ -197,6 +199,7 @@ mod tests {
197199
&Some(limit),
198200
&None,
199201
&None,
202+
&None
200203
)
201204
.await?;
202205
assert!(result.edges.iter().all(|m| {
@@ -214,40 +217,40 @@ mod tests {
214217
let client = LocalStoreClient::new(&test_db.file_db_path(), &test_db.index_db_path())?;
215218

216219
let (process_bundle, message_bundles) = bundle_list();
217-
let test_process = Process::from_bytes(process_bundle.clone())?;
220+
let test_process = Process::from_bytes(process_bundle.clone(), &None)?;
218221
client.save_process(&test_process, &process_bundle)?;
219222

220223
// Save half of the messages
221224
for bundle in message_bundles.iter().take(message_bundles.len() / 2) {
222-
let test_message = Message::from_bytes(bundle.clone())?;
225+
let test_message = Message::from_bytes(bundle.clone(), &None)?;
223226
client.save_message(&test_message, &bundle, None).await?;
224227
}
225228

226229
let (process_bundle_2, message_bundles_2) = bundle_list_2();
227-
let test_process_2 = Process::from_bytes(process_bundle_2.clone())?;
230+
let test_process_2 = Process::from_bytes(process_bundle_2.clone(), &None)?;
228231
client.save_process(&test_process_2, &process_bundle_2)?;
229232

230233
// Save half of the messages of next process
231234
for bundle in message_bundles_2.iter().take(message_bundles_2.len() / 2) {
232-
let test_message = Message::from_bytes(bundle.clone())?;
235+
let test_message = Message::from_bytes(bundle.clone(), &None)?;
233236
client.save_message(&test_message, &bundle, None).await?;
234237
}
235238

236239
// Save second half of messages for the first process
237240
for bundle in message_bundles.iter().skip(message_bundles.len() / 2) {
238-
let test_message = Message::from_bytes(bundle.clone())?;
241+
let test_message = Message::from_bytes(bundle.clone(), &None)?;
239242
client.save_message(&test_message, &bundle, None).await?;
240243
}
241244

242245
// Save second half of messages for the second process
243246
for bundle in message_bundles_2.iter().skip(message_bundles_2.len() / 2) {
244-
let test_message = Message::from_bytes(bundle.clone())?;
247+
let test_message = Message::from_bytes(bundle.clone(), &None)?;
245248
client.save_message(&test_message, &bundle, None).await?;
246249
}
247250

248251
// Retrieve messages and check length, nonce order, and continuity
249252
let result = client
250-
.get_messages(&test_process, &None, &None, &None, &None, &None)
253+
.get_messages(&test_process, &None, &None, &None, &None, &None, &None)
251254
.await?;
252255
let mut previous_nonce: Option<i32> = None;
253256

@@ -293,7 +296,7 @@ mod tests {
293296
let client = LocalStoreClient::new(&test_db.file_db_path(), &test_db.index_db_path())?;
294297

295298
let (process_bundle, message_bundles) = bundle_list();
296-
let test_process = Process::from_bytes(process_bundle.clone())?;
299+
let test_process = Process::from_bytes(process_bundle.clone(), &None)?;
297300
let process_id = &test_process.process.process_id;
298301

299302
// Test with no messages - should return None
@@ -304,7 +307,7 @@ mod tests {
304307

305308
// Save messages with different timestamps to test ordering
306309
for (_i, bundle) in message_bundles.iter().enumerate() {
307-
let message = Message::from_bytes(bundle.clone())?;
310+
let message = Message::from_bytes(bundle.clone(), &None)?;
308311
client.save_message(&message, bundle, None).await?;
309312
}
310313

@@ -317,7 +320,7 @@ mod tests {
317320
// The last message should have the highest timestamp
318321
// Based on the bundle_list() messages, we expect the message with highest timestamp
319322
let all_messages: Vec<Message> = message_bundles.iter()
320-
.map(|b| Message::from_bytes(b.clone()).unwrap())
323+
.map(|b| Message::from_bytes(b.clone(), &None).unwrap())
321324
.collect();
322325
let expected_latest = all_messages.iter()
323326
.max_by_key(|m| m.timestamp().unwrap_or(0))

0 commit comments

Comments
 (0)