@@ -288,61 +288,82 @@ impl KvStore for InMemoryBackendImpl {
288288 & self , user_token : String , request : ListKeyVersionsRequest ,
289289 ) -> Result < ListKeyVersionsResponse , VssError > {
290290 let store_id = request. store_id ;
291- let key_prefix = request. key_prefix . unwrap_or ( "" . to_string ( ) ) ;
292- let page_token = request. page_token . unwrap_or ( "" . to_string ( ) ) ;
291+ let key_prefix = request. key_prefix . unwrap_or_default ( ) ;
292+ let page_token = request. page_token . unwrap_or_default ( ) ;
293293 let page_size = request. page_size . unwrap_or ( i32:: MAX ) ;
294294 let limit = std:: cmp:: min ( page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE ) as usize ;
295295
296+ // Global version only on first page
296297 let mut global_version = None ;
297298 if page_token. is_empty ( ) {
298- let get_global_version_request = GetObjectRequest {
299+ let get_global = GetObjectRequest {
299300 store_id : store_id. clone ( ) ,
300301 key : GLOBAL_VERSION_KEY . to_string ( ) ,
301302 } ;
302- let get_response = self . get ( user_token. clone ( ) , get_global_version_request ) . await ?;
303- global_version = Some ( get_response . value . unwrap ( ) . version ) ;
303+ let resp = self . get ( user_token. clone ( ) , get_global ) . await ?;
304+ global_version = resp . value . map ( |kv| kv . version ) ;
304305 }
305306
306- let key_versions: Vec < KeyValue > = {
307- let guard = self . store . lock ( ) . unwrap ( ) ;
308- let mut key_versions: Vec < KeyValue > = guard
309- . iter ( )
310- . filter ( |( k, _) | {
311- let parts: Vec < & str > = k. split ( '#' ) . collect ( ) ;
312- if parts. len ( ) < 3 {
313- return false ;
307+ let guard = self . store . lock ( ) . unwrap ( ) ;
308+
309+ let mut latest_per_key: std:: collections:: HashMap < String , KeyValue > =
310+ std:: collections:: HashMap :: new ( ) ;
311+ for ( k, r) in guard. iter ( ) {
312+ let parts: Vec < & str > = k. split ( '#' ) . collect ( ) ;
313+ if parts. len ( ) != 3
314+ || parts[ 0 ] != user_token. as_str ( )
315+ || parts[ 1 ] != store_id. as_str ( )
316+ || parts[ 2 ] == GLOBAL_VERSION_KEY
317+ || !parts[ 2 ] . starts_with ( & key_prefix)
318+ {
319+ continue ;
320+ }
321+ let key = parts[ 2 ] . to_string ( ) ;
322+ let candidate = KeyValue { key : key. clone ( ) , value : Bytes :: new ( ) , version : r. version } ;
323+ latest_per_key
324+ . entry ( key)
325+ . and_modify ( |e| {
326+ if r. version > e. version {
327+ * e = candidate. clone ( )
314328 }
315- parts[ 0 ] == user_token. as_str ( )
316- && parts[ 1 ] == store_id. as_str ( )
317- && parts[ 2 ] . starts_with ( & key_prefix)
318- && parts[ 2 ] > page_token. as_str ( )
319- && parts[ 2 ] != GLOBAL_VERSION_KEY
320- } )
321- . map ( |( _, record) | KeyValue {
322- key : record. key . clone ( ) ,
323- value : Bytes :: new ( ) ,
324- version : record. version ,
325329 } )
326- . collect ( ) ;
330+ . or_insert ( candidate) ;
331+ }
327332
328- key_versions. sort_by ( |a, b| a. key . cmp ( & b. key ) ) ;
329- key_versions. into_iter ( ) . take ( limit) . collect ( )
330- } ;
333+ let mut keys: Vec < String > = latest_per_key. keys ( ) . cloned ( ) . collect ( ) ;
334+ keys. sort ( ) ;
331335
332- let next_page_token = if key_versions . len ( ) == limit {
333- key_versions . last ( ) . map ( |kv| kv . key . clone ( ) )
336+ let start_idx = if page_token . is_empty ( ) {
337+ 0
334338 } else {
335- None
339+ match keys. iter ( ) . position ( |k| k == & page_token) {
340+ Some ( i) => i + 1 ,
341+ None => {
342+ return Ok ( ListKeyVersionsResponse {
343+ key_versions : vec ! [ ] ,
344+ next_page_token : None ,
345+ global_version,
346+ } ) ;
347+ } ,
348+ }
336349 } ;
337350
338- Ok ( ListKeyVersionsResponse { key_versions, next_page_token, global_version } )
351+ let page_keys: Vec < String > = keys. into_iter ( ) . skip ( start_idx) . take ( limit) . collect ( ) ;
352+ let page: Vec < KeyValue > =
353+ page_keys. iter ( ) . filter_map ( |k| latest_per_key. get ( k) . cloned ( ) ) . collect ( ) ;
354+
355+ let next_page_token =
356+ if page. is_empty ( ) { None } else { page. last ( ) . map ( |kv| kv. key . clone ( ) ) } ;
357+
358+ Ok ( ListKeyVersionsResponse { key_versions : page, next_page_token, global_version } )
339359 }
340360}
341361
342362#[ cfg( test) ]
343363mod tests {
344364 use super :: * ;
345365 use api:: define_kv_store_tests;
366+ use api:: types:: { GetObjectRequest , KeyValue , PutObjectRequest } ;
346367 use bytes:: Bytes ;
347368 use tokio:: test;
348369
0 commit comments