|
4 | 4 | #![feature(coroutines)] |
5 | 5 |
|
6 | 6 | use std::{ |
| 7 | + cmp, |
7 | 8 | collections::{ |
8 | 9 | BTreeMap, |
9 | 10 | BTreeSet, |
@@ -533,36 +534,44 @@ impl PersistenceReader for SqlitePersistence { |
533 | 534 | async fn previous_revisions( |
534 | 535 | &self, |
535 | 536 | ids: BTreeSet<(InternalDocumentId, Timestamp)>, |
| 537 | + retention_validator: Arc<dyn RetentionValidator>, |
536 | 538 | ) -> anyhow::Result< |
537 | 539 | BTreeMap<(InternalDocumentId, Timestamp), (Timestamp, Option<ResolvedDocument>)>, |
538 | 540 | > { |
539 | | - let inner = self.inner.lock(); |
540 | 541 | let mut out = BTreeMap::new(); |
541 | | - for (id, ts) in ids { |
542 | | - let mut stmt = inner.connection.prepare(PREV_REV_QUERY)?; |
543 | | - let internal_id = id.internal_id(); |
544 | | - let params = params![&id.table().0[..], &internal_id[..], &u64::from(ts)]; |
545 | | - let mut row_iter = stmt.query_map(params, load_document_row)?; |
546 | | - if let Some(row) = row_iter.next() { |
547 | | - let (id, prev_ts, table, json_value, deleted) = row?; |
548 | | - let id = InternalId::try_from(id)?; |
549 | | - let table = TableId(table.try_into()?); |
550 | | - let prev_ts = Timestamp::try_from(prev_ts)?; |
551 | | - let document_id = table.id(id); |
552 | | - let document = if !deleted { |
553 | | - let json_value = json_value.ok_or_else(|| { |
554 | | - anyhow::anyhow!("Unexpected NULL json_value at {} {}", id, prev_ts) |
555 | | - })?; |
556 | | - let json_value: serde_json::Value = serde_json::from_str(&json_value)?; |
557 | | - let value: ConvexValue = json_value.try_into()?; |
558 | | - let document = ResolvedDocument::from_database(table, value)?; |
559 | | - Some(document) |
560 | | - } else { |
561 | | - None |
562 | | - }; |
563 | | - out.insert((document_id, ts), (prev_ts, document)); |
| 542 | + let mut min_ts = Timestamp::MAX; |
| 543 | + { |
| 544 | + let inner = self.inner.lock(); |
| 545 | + for (id, ts) in ids { |
| 546 | + let mut stmt = inner.connection.prepare(PREV_REV_QUERY)?; |
| 547 | + let internal_id = id.internal_id(); |
| 548 | + let params = params![&id.table().0[..], &internal_id[..], &u64::from(ts)]; |
| 549 | + let mut row_iter = stmt.query_map(params, load_document_row)?; |
| 550 | + if let Some(row) = row_iter.next() { |
| 551 | + let (id, prev_ts, table, json_value, deleted) = row?; |
| 552 | + let id = InternalId::try_from(id)?; |
| 553 | + let table = TableId(table.try_into()?); |
| 554 | + let prev_ts = Timestamp::try_from(prev_ts)?; |
| 555 | + let document_id = table.id(id); |
| 556 | + let document = if !deleted { |
| 557 | + let json_value = json_value.ok_or_else(|| { |
| 558 | + anyhow::anyhow!("Unexpected NULL json_value at {} {}", id, prev_ts) |
| 559 | + })?; |
| 560 | + let json_value: serde_json::Value = serde_json::from_str(&json_value)?; |
| 561 | + let value: ConvexValue = json_value.try_into()?; |
| 562 | + let document = ResolvedDocument::from_database(table, value)?; |
| 563 | + Some(document) |
| 564 | + } else { |
| 565 | + None |
| 566 | + }; |
| 567 | + min_ts = cmp::min(ts, min_ts); |
| 568 | + out.insert((document_id, ts), (prev_ts, document)); |
| 569 | + } |
564 | 570 | } |
565 | 571 | } |
| 572 | + retention_validator |
| 573 | + .validate_document_snapshot(min_ts) |
| 574 | + .await?; |
566 | 575 | Ok(out) |
567 | 576 | } |
568 | 577 |
|
|
0 commit comments