Skip to content

Commit d1dedde

Browse files
goffrieConvex, Inc.
authored andcommitted
Change the meaning of persistence_reader_stream_revision_pairs prev_rev.document being None (#41170)
Currently prev_rev.document is None if the previous revision is a tombstone (`deleted=1`), and the stream returns an error if the revision does not exist. However, a prev_ts should never be a tombstone, so turn that into an error. On the other hand, it is possible for the previous revision to not exist at all - if it has been garbage collected. Normally this should be forbidden by the retention validator, but the document retention worker intentionally looks at rows out of retention, and it shouldn't encounter an error. GitOrigin-RevId: f08ea7ca3b9a8ed9c0fd367dc40ff74bcb58ff29
1 parent bc270e3 commit d1dedde

File tree

2 files changed

+85
-4
lines changed

2 files changed

+85
-4
lines changed

crates/common/src/persistence_helpers.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,17 @@ pub(crate) async fn persistence_reader_stream_revision_pairs<'a, P: PersistenceR
9999
let rev = DocumentRevision { ts, document };
100100
let prev_rev = prev_ts
101101
.map(|prev_ts| {
102-
let entry = prev_revs
102+
let document = prev_revs
103103
.remove(&DocumentPrevTsQuery { id, ts, prev_ts })
104-
.with_context(|| format!("prev_ts is missing for {id}@{ts}: {prev_ts}"))?;
104+
.map(|entry| {
105+
entry.value.with_context(|| {
106+
format!("prev_ts {prev_ts} of {id}@{ts} points to a deleted value?")
107+
})
108+
})
109+
.transpose()?;
105110
anyhow::Ok(DocumentRevision {
106-
ts: entry.ts,
107-
document: entry.value,
111+
ts: prev_ts,
112+
document,
108113
})
109114
})
110115
.transpose()?;

crates/common/src/testing/persistence_test_suite.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ use crate::{
6363
PersistenceIndexEntry,
6464
TimestampRange,
6565
},
66+
persistence_helpers::{
67+
DocumentRevision,
68+
RevisionPair,
69+
},
6670
query::Order,
6771
testing::{
6872
self,
@@ -247,6 +251,13 @@ macro_rules! run_persistence_test_suite {
247251
persistence_test_suite::persistence_previous_revisions(::std::sync::Arc::new(p)).await
248252
}
249253

254+
#[tokio::test]
255+
async fn test_persistence_load_revision_pairs() -> anyhow::Result<()> {
256+
let $db = $create_db;
257+
let p = $create_persistence;
258+
persistence_test_suite::test_load_revision_pairs(::std::sync::Arc::new(p)).await
259+
}
260+
250261
#[tokio::test]
251262
async fn test_persistence_table_stats() -> anyhow::Result<()> {
252263
let $db = $create_db;
@@ -2121,6 +2132,71 @@ pub async fn persistence_previous_revisions<P: Persistence>(p: Arc<P>) -> anyhow
21212132
Ok(())
21222133
}
21232134

2135+
pub async fn test_load_revision_pairs<P: Persistence>(p: Arc<P>) -> anyhow::Result<()> {
2136+
let mut id_generator = TestIdGenerator::new();
2137+
let table: TableName = str::parse("table")?;
2138+
2139+
let id = id_generator.user_generate(&table);
2140+
let documents = vec![
2141+
doc(id, 2, Some(2), Some(1))?,
2142+
doc(id, 3, Some(3), Some(2))?,
2143+
doc(id, 4, None, Some(3))?,
2144+
];
2145+
p.write(&documents, &[], ConflictStrategy::Error).await?;
2146+
let revision_pairs: Vec<_> = p
2147+
.reader()
2148+
.load_revision_pairs(
2149+
None,
2150+
TimestampRange::new(Timestamp::must(2)..),
2151+
Order::Asc,
2152+
1,
2153+
Arc::new(NoopRetentionValidator),
2154+
)
2155+
.try_collect()
2156+
.await?;
2157+
assert_eq!(
2158+
revision_pairs,
2159+
vec![
2160+
RevisionPair {
2161+
id: id.into(),
2162+
rev: DocumentRevision {
2163+
ts: Timestamp::must(2),
2164+
document: documents[0].value.clone(),
2165+
},
2166+
prev_rev: Some(DocumentRevision {
2167+
ts: Timestamp::must(1),
2168+
// Odd semantics here: None means that the revision is
2169+
// garbage collected, *not* that it is a tombstone
2170+
document: None
2171+
}),
2172+
},
2173+
RevisionPair {
2174+
id: id.into(),
2175+
rev: DocumentRevision {
2176+
ts: Timestamp::must(3),
2177+
document: documents[1].value.clone(),
2178+
},
2179+
prev_rev: Some(DocumentRevision {
2180+
ts: Timestamp::must(2),
2181+
document: documents[0].value.clone(),
2182+
}),
2183+
},
2184+
RevisionPair {
2185+
id: id.into(),
2186+
rev: DocumentRevision {
2187+
ts: Timestamp::must(4),
2188+
document: None,
2189+
},
2190+
prev_rev: Some(DocumentRevision {
2191+
ts: Timestamp::must(3),
2192+
document: documents[1].value.clone(),
2193+
}),
2194+
},
2195+
]
2196+
);
2197+
Ok(())
2198+
}
2199+
21242200
pub async fn persistence_table_stats<P: Persistence>(p: Arc<P>) -> anyhow::Result<()> {
21252201
let reader = p.reader();
21262202

0 commit comments

Comments
 (0)