@@ -7,16 +7,15 @@ use std::{
77 sync:: Arc ,
88} ;
99
10+ use anyhow:: Context ;
1011use common:: {
1112 bootstrap_model:: index:: database_index:: IndexedFields ,
1213 document:: ResolvedDocument ,
13- index:: IndexKey ,
14- interval:: {
15- BinaryKey ,
16- End ,
17- Interval ,
18- Start ,
14+ index:: {
15+ IndexKey ,
16+ IndexKeyBytes ,
1917 } ,
18+ interval:: Interval ,
2019 pause:: PauseClient ,
2120 persistence:: {
2221 new_static_repeatable_recent,
@@ -25,7 +24,10 @@ use common::{
2524 RetentionValidator ,
2625 TimestampRange ,
2726 } ,
28- query:: Order ,
27+ query:: {
28+ CursorPosition ,
29+ Order ,
30+ } ,
2931 runtime:: {
3032 RateLimiter ,
3133 Runtime ,
@@ -56,37 +58,37 @@ use value::{
5658/// The key is the last element processed thus far.
5759#[ derive( Clone , Debug ) ]
5860pub struct TableScanCursor {
59- pub index_key : Option < IndexKey > ,
61+ pub index_key : Option < CursorPosition > ,
6062}
6163
6264impl TableScanCursor {
6365 pub fn interval ( & self ) -> Interval {
6466 match & self . index_key {
6567 Some ( index_key) => {
66- let cursor_bytes: BinaryKey = index_key. clone ( ) . into_bytes ( ) . into ( ) ;
67- Interval {
68- start : Start :: Included (
69- // `.increment()` should never fail because:
70- // 1. Document IDs serialize to a nonempty index key
71- // 2. At the very least, the index key starts with ID_TAG < 255
72- cursor_bytes. increment ( ) . expect ( "id should have increment" ) ,
73- ) ,
74- end : End :: Unbounded ,
75- }
68+ let ( _, remaining) = Interval :: all ( ) . split ( index_key. clone ( ) , Order :: Asc ) ;
69+ remaining
7670 } ,
7771 None => Interval :: all ( ) ,
7872 }
7973 }
8074
81- pub fn advance ( & mut self , index_key : IndexKey ) -> anyhow:: Result < ( ) > {
75+ pub fn advance ( & mut self , index_key : CursorPosition ) -> anyhow:: Result < ( ) > {
8276 if let Some ( existing_key) = & self . index_key {
83- anyhow:: ensure!( index_key >= existing_key) ;
77+ anyhow:: ensure!( index_key > existing_key) ;
8478 }
8579 self . index_key = Some ( index_key) ;
8680 Ok ( ( ) )
8781 }
8882}
8983
84+ fn cursor_has_walked ( cursor : Option < & CursorPosition > , key : & IndexKeyBytes ) -> bool {
85+ match cursor {
86+ None => false ,
87+ Some ( CursorPosition :: End ) => true ,
88+ Some ( CursorPosition :: After ( cursor) ) => key <= cursor,
89+ }
90+ }
91+
9092pub struct TableIterator < RT : Runtime > {
9193 runtime : RT ,
9294 persistence : Box < dyn PersistenceReader > ,
@@ -128,12 +130,12 @@ impl<RT: Runtime> TableIterator<RT> {
128130 table_id,
129131 by_id,
130132 IndexedFields :: by_id ( ) ,
131- cursor. map ( |id| IndexKey :: new ( vec ! [ ] , id. into ( ) ) ) ,
133+ cursor. map ( |id| CursorPosition :: After ( IndexKey :: new ( vec ! [ ] , id. into ( ) ) . into_bytes ( ) ) ) ,
132134 rate_limiter,
133135 ) ;
134136 pin_mut ! ( stream) ;
135- while let Some ( item ) = stream. try_next ( ) . await ? {
136- yield item ;
137+ while let Some ( ( _ , ts , doc ) ) = stream. try_next ( ) . await ? {
138+ yield ( doc , ts ) ;
137139 }
138140 }
139141
@@ -152,13 +154,13 @@ impl<RT: Runtime> TableIterator<RT> {
152154 /// Consider a document that exists in the index at snapshot_ts.
153155 /// Either it has changed since snapshot_ts, in which case (2) will find
154156 /// it, or it has not, in which case (1) will find it.
155- #[ try_stream( ok = ( ResolvedDocument , Timestamp ) , error = anyhow:: Error ) ]
157+ #[ try_stream( ok = ( IndexKeyBytes , Timestamp , ResolvedDocument ) , error = anyhow:: Error ) ]
156158 pub async fn stream_documents_in_table_by_index (
157159 mut self ,
158160 table_id : TableId ,
159161 index_id : IndexId ,
160162 indexed_fields : IndexedFields ,
161- cursor : Option < IndexKey > ,
163+ cursor : Option < CursorPosition > ,
162164 rate_limiter : & RateLimiter < RT > ,
163165 ) {
164166 let mut cursor = TableScanCursor { index_key : cursor } ;
@@ -182,22 +184,19 @@ impl<RT: Runtime> TableIterator<RT> {
182184 self . runtime . wait ( delay) . await ;
183185 }
184186 let page_start = cursor. index_key . clone ( ) ;
185- let ( page, new_end_ts) = self
186- . fetch_page ( index_id, table_id, & indexed_fields, & mut cursor)
187- . await ?;
187+ let ( page, new_end_ts) = self . fetch_page ( index_id, table_id, & mut cursor) . await ?;
188188 anyhow:: ensure!( * new_end_ts >= end_ts) ;
189- // If page is empty, we have exhausted the cursor.
190- let page_end = if page. is_empty ( ) {
191- None
192- } else {
193- cursor. index_key . clone ( )
194- } ;
189+ let page_end = cursor
190+ . index_key
191+ . as_ref ( )
192+ . context ( "cursor after page should not be empty" ) ?;
195193 // Filter out rows from the index scan that were modified after
196194 // snapshot_ts. Such documents will be found when walking the
197195 // documents log to generate skipped_keys.
198- let page: Vec < _ > = page
196+ let page: BTreeMap < _ , _ > = page
199197 . into_iter ( )
200- . filter ( |( _, ts) | * ts <= * self . snapshot_ts )
198+ . filter ( |( _, ts, _) | * ts <= * self . snapshot_ts )
199+ . map ( |( index_key, ts, doc) | ( index_key, ( ts, doc) ) )
201200 . collect ( ) ;
202201
203202 // 2. Find any keys for documents that were skipped by this
@@ -219,71 +218,68 @@ impl<RT: Runtime> TableIterator<RT> {
219218 if let Some ( ( first_skipped_key, _) ) = skipped_keys. iter ( ) . next ( ) {
220219 // Check all skipped ids are after the old cursor,
221220 // which ensures the yielded output is in index key order.
222- anyhow:: ensure!( page_start. as_ref( ) < Some ( first_skipped_key) ) ;
221+ anyhow:: ensure!( !cursor_has_walked ( page_start. as_ref( ) , first_skipped_key) ) ;
223222 }
224223 end_ts = new_end_ts;
225224 // Extract the documents from skipped_keys that should be returned in
226225 // the current page.
227- let page_skipped_keys = match page_end. as_ref ( ) {
228- Some ( cursor_key) => {
229- let mut page_skipped_keys = BTreeMap :: new ( ) ;
230- while let Some ( first_skipped_key) = skipped_keys. first_entry ( )
231- && first_skipped_key. key ( ) <= cursor_key
232- {
233- let ( key, value) = first_skipped_key. remove_entry ( ) ;
234- page_skipped_keys. insert ( key, value) ;
235- }
236- page_skipped_keys
237- } ,
238- None => {
239- let page_skipped_keys = skipped_keys;
240- skipped_keys = BTreeMap :: new ( ) ;
241- page_skipped_keys
242- } ,
226+ let page_skipped_keys = {
227+ let mut page_skipped_keys = BTreeMap :: new ( ) ;
228+ while let Some ( first_skipped_key) = skipped_keys. first_entry ( )
229+ && cursor_has_walked ( Some ( page_end) , first_skipped_key. key ( ) )
230+ {
231+ let ( key, value) = first_skipped_key. remove_entry ( ) ;
232+ page_skipped_keys. insert ( key, value) ;
233+ }
234+ page_skipped_keys
243235 } ;
244236 // Note we already fetched these documents earlier when calculating
245237 // skipped_keys, but we would rather not hold them in memory. Since
246238 // skipped documents are relatively rare, an extra fetch
247239 // occasionally is worth it for the memory savings.
248- let page_skipped_docs: Vec < _ > = self
240+ let page_skipped_docs: BTreeMap < _ , _ > = self
249241 . load_revisions_at_snapshot_ts ( stream:: iter (
250242 page_skipped_keys. into_values ( ) . map ( Ok ) ,
251243 ) )
244+ . map_ok ( |( doc, ts) | {
245+ (
246+ doc. index_key ( & indexed_fields, self . persistence . version ( ) )
247+ . into_bytes ( ) ,
248+ ( ts, doc) ,
249+ )
250+ } )
252251 . try_collect ( )
253252 . await ?;
254- let mut merged_page: Vec < _ > = page. into_iter ( ) . chain ( page_skipped_docs) . collect ( ) ;
255- // Re-sort after merging the index walk and skipped keys.
256- merged_page
257- . sort_by_key ( |( doc, _) | doc. index_key ( & indexed_fields, self . persistence . version ( ) ) ) ;
253+ // Merge index walk and skipped keys into BTreeMap, which sorts by index key.
254+ let merged_page: BTreeMap < _ , _ > = page. into_iter ( ) . chain ( page_skipped_docs) . collect ( ) ;
258255
259256 // Sanity check output.
260257 let all_ids: BTreeSet < _ > = merged_page
261258 . iter ( )
262- . map ( |( doc , _ ) | doc. id ( ) . internal_id ( ) )
259+ . map ( |( _ , ( _ , doc ) ) | doc. id ( ) . internal_id ( ) )
263260 . collect ( ) ;
264261 anyhow:: ensure!(
265262 all_ids. len( ) == merged_page. len( ) ,
266263 "duplicate id in table iterator {merged_page:?}"
267264 ) ;
268265 anyhow:: ensure!(
269- merged_page. iter( ) . all( |( _, ts) | * ts <= * self . snapshot_ts) ,
266+ merged_page
267+ . iter( )
268+ . all( |( _, ( ts, _) ) | * ts <= * self . snapshot_ts) ,
270269 "document after snapshot in table iterator {merged_page:?}"
271270 ) ;
272271 anyhow:: ensure!(
273- merged_page. iter( ) . all( |( doc, _) | {
274- let key = doc. index_key( & indexed_fields, self . persistence. version( ) ) ;
275- page_start. as_ref( ) < Some ( & key)
276- && ( page_end
277- . as_ref( )
278- . map_or( true , |cursor_key| key <= * cursor_key) )
272+ merged_page. iter( ) . all( |( key, _) | {
273+ !cursor_has_walked( page_start. as_ref( ) , key)
274+ && cursor_has_walked( Some ( page_end) , key)
279275 } ) ,
280276 "document outside page in table iterator {merged_page:?}"
281277 ) ;
282278
283- for ( revision , revision_ts) in merged_page {
284- yield ( revision , revision_ts) ;
279+ for ( key , ( revision_ts, revision ) ) in merged_page {
280+ yield ( key , revision_ts, revision ) ;
285281 }
286- if page_end . is_none ( ) {
282+ if matches ! ( page_end , CursorPosition :: End ) {
287283 // If we are done, all skipped_keys would be put in this final page.
288284 anyhow:: ensure!( skipped_keys. is_empty( ) ) ;
289285 break ;
@@ -301,20 +297,22 @@ impl<RT: Runtime> TableIterator<RT> {
301297 & self ,
302298 table_id : TableId ,
303299 indexed_fields : & IndexedFields ,
304- lower_bound : Option < & IndexKey > ,
300+ lower_bound : Option < & CursorPosition > ,
305301 start_ts : Timestamp ,
306302 end_ts : RepeatableTimestamp ,
307303 rate_limiter : & RateLimiter < RT > ,
308- ) -> anyhow:: Result < BTreeMap < IndexKey , InternalDocumentId > > {
304+ ) -> anyhow:: Result < BTreeMap < IndexKeyBytes , InternalDocumentId > > {
309305 let reader = self . persistence . clone ( ) ;
310306 let persistence_version = reader. version ( ) ;
311307 let skipped_revs = self . walk_document_log ( table_id, start_ts, end_ts, rate_limiter) ;
312308 let revisions_at_snapshot = self . load_revisions_at_snapshot_ts ( skipped_revs) ;
313309 pin_mut ! ( revisions_at_snapshot) ;
314310 let mut skipped_keys = BTreeMap :: new ( ) ;
315311 while let Some ( ( doc, _) ) = revisions_at_snapshot. try_next ( ) . await ? {
316- let index_key = doc. index_key ( indexed_fields, persistence_version) ;
317- if lower_bound < Some ( & index_key) {
312+ let index_key = doc
313+ . index_key ( indexed_fields, persistence_version)
314+ . into_bytes ( ) ;
315+ if !cursor_has_walked ( lower_bound, & index_key) {
318316 skipped_keys. insert ( index_key, doc. id_with_table_id ( ) ) ;
319317 }
320318 }
@@ -377,9 +375,11 @@ impl<RT: Runtime> TableIterator<RT> {
377375 & self ,
378376 index_id : IndexId ,
379377 table_id : TableId ,
380- indexed_fields : & IndexedFields ,
381378 cursor : & mut TableScanCursor ,
382- ) -> anyhow:: Result < ( Vec < ( ResolvedDocument , Timestamp ) > , RepeatableTimestamp ) > {
379+ ) -> anyhow:: Result < (
380+ Vec < ( IndexKeyBytes , Timestamp , ResolvedDocument ) > ,
381+ RepeatableTimestamp ,
382+ ) > {
383383 let ts = self . new_ts ( ) . await ?;
384384 let repeatable_persistence = RepeatablePersistence :: new (
385385 self . persistence . clone ( ) ,
@@ -394,13 +394,11 @@ impl<RT: Runtime> TableIterator<RT> {
394394 Order :: Asc ,
395395 self . page_size ,
396396 ) ;
397- let documents_in_page: Vec < _ > = stream
398- . take ( self . page_size )
399- . map ( |item| item. map ( |( _, ts, document) | ( document, ts) ) )
400- . try_collect ( )
401- . await ?;
402- if let Some ( ( document, _) ) = documents_in_page. last ( ) {
403- cursor. advance ( document. index_key ( indexed_fields, self . persistence . version ( ) ) ) ?;
397+ let documents_in_page: Vec < _ > = stream. take ( self . page_size ) . try_collect ( ) . await ?;
398+ if documents_in_page. len ( ) < self . page_size {
399+ cursor. advance ( CursorPosition :: End ) ?;
400+ } else if let Some ( ( index_key, ..) ) = documents_in_page. last ( ) {
401+ cursor. advance ( CursorPosition :: After ( index_key. clone ( ) ) ) ?;
404402 }
405403 Ok ( ( documents_in_page, ts) )
406404 }
@@ -767,7 +765,7 @@ mod tests {
767765 assert_eq ! ( revisions. len( ) , 2 ) ;
768766 let k_values: Vec < _ > = revisions
769767 . iter ( )
770- . map ( |( doc , _) | doc. value ( ) . get ( "k" ) . unwrap ( ) . clone ( ) )
768+ . map ( |( _ , _, doc ) | doc. value ( ) . get ( "k" ) . unwrap ( ) . clone ( ) )
771769 . collect ( ) ;
772770 assert_eq ! ( k_values, vec![ assert_val!( "m" ) , assert_val!( "z" ) ] ) ;
773771
0 commit comments