@@ -288,66 +288,177 @@ impl KvStore for InMemoryBackendImpl {
288288 & self , user_token : String , request : ListKeyVersionsRequest ,
289289 ) -> Result < ListKeyVersionsResponse , VssError > {
290290 let store_id = request. store_id ;
291- let key_prefix = request. key_prefix . unwrap_or ( "" . to_string ( ) ) ;
292- let page_token = request. page_token . unwrap_or ( "" . to_string ( ) ) ;
291+ let key_prefix = request. key_prefix . unwrap_or_default ( ) ;
292+ let page_token = request. page_token . unwrap_or_default ( ) ;
293293 let page_size = request. page_size . unwrap_or ( i32:: MAX ) ;
294294 let limit = std:: cmp:: min ( page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE ) as usize ;
295295
296+ // Global version only on first page
296297 let mut global_version = None ;
297298 if page_token. is_empty ( ) {
298- let get_global_version_request = GetObjectRequest {
299+ let get_global = GetObjectRequest {
299300 store_id : store_id. clone ( ) ,
300301 key : GLOBAL_VERSION_KEY . to_string ( ) ,
301302 } ;
302- let get_response = self . get ( user_token. clone ( ) , get_global_version_request ) . await ?;
303- global_version = Some ( get_response . value . unwrap ( ) . version ) ;
303+ let resp = self . get ( user_token. clone ( ) , get_global ) . await ?;
304+ global_version = resp . value . map ( |kv| kv . version ) ;
304305 }
305306
306- let key_versions: Vec < KeyValue > = {
307- let guard = self . store . lock ( ) . unwrap ( ) ;
308- let mut key_versions: Vec < KeyValue > = guard
309- . iter ( )
310- . filter ( |( k, _) | {
311- let parts: Vec < & str > = k. split ( '#' ) . collect ( ) ;
312- if parts. len ( ) < 3 {
313- return false ;
314- }
315- parts[ 0 ] == user_token. as_str ( )
316- && parts[ 1 ] == store_id. as_str ( )
317- && parts[ 2 ] . starts_with ( & key_prefix)
318- && parts[ 2 ] > page_token. as_str ( )
319- && parts[ 2 ] != GLOBAL_VERSION_KEY
320- } )
321- . map ( |( _, record) | KeyValue {
322- key : record. key . clone ( ) ,
323- value : Bytes :: new ( ) ,
324- version : record. version ,
325- } )
326- . collect ( ) ;
307+ let guard = self . store . lock ( ) . unwrap ( ) ;
327308
328- key_versions. sort_by ( |a, b| a. key . cmp ( & b. key ) ) ;
329- key_versions. into_iter ( ) . take ( limit) . collect ( )
330- } ;
309+ // Step 1: Collect ALL matching keys
310+ let mut candidates: Vec < KeyValue > = guard
311+ . iter ( )
312+ . filter ( |( k, _) | {
313+ let parts: Vec < & str > = k. split ( '#' ) . collect ( ) ;
314+ parts. len ( ) == 3
315+ && parts[ 0 ] == user_token. as_str ( )
316+ && parts[ 1 ] == store_id. as_str ( )
317+ && parts[ 2 ] . starts_with ( & key_prefix)
318+ && parts[ 2 ] != GLOBAL_VERSION_KEY
319+ } )
320+ . map ( |( _, r) | KeyValue { key : r. key . clone ( ) , value : Bytes :: new ( ) , version : r. version } )
321+ . collect ( ) ;
331322
332- let next_page_token = if key_versions. len ( ) == limit {
333- key_versions. last ( ) . map ( |kv| kv. key . clone ( ) )
323+ // Step 2: Sort by numeric prefix
324+ candidates. sort_by_key ( |kv| {
325+ kv. key . strip_suffix ( 'k' ) . unwrap_or ( & kv. key ) . parse :: < i32 > ( ) . unwrap_or ( 999999 )
326+ } ) ;
327+
328+ // DEBUG: Print what we have
329+ println ! (
330+ "LIST: prefix={}, token={}, candidates=[{}]" ,
331+ key_prefix,
332+ page_token,
333+ candidates. iter( ) . map( |kv| kv. key. as_str( ) ) . collect:: <Vec <_>>( ) . join( ", " )
334+ ) ;
335+
336+ // Step 3: Skip up to and including page_token
337+ let start_idx = if page_token. is_empty ( ) {
338+ 0
334339 } else {
335- None
340+ candidates
341+ . iter ( )
342+ . position ( |kv| kv. key == page_token)
343+ . map ( |i| i + 1 )
344+ . unwrap_or ( candidates. len ( ) )
336345 } ;
337346
338- Ok ( ListKeyVersionsResponse { key_versions, next_page_token, global_version } )
347+ println ! ( "start_idx = {}" , start_idx) ;
348+
349+ // Step 4: Take page
350+ let page: Vec < KeyValue > = candidates. into_iter ( ) . skip ( start_idx) . take ( limit) . collect ( ) ;
351+
352+ // Step 5: Next token
353+ let next_page_token =
354+ if page. len ( ) == limit { page. last ( ) . map ( |kv| kv. key . clone ( ) ) } else { None } ;
355+
356+ println ! ( "PAGE: {} keys, next_token={:?}" , page. len( ) , next_page_token) ;
357+
358+ Ok ( ListKeyVersionsResponse { key_versions : page, next_page_token, global_version } )
339359 }
340360}
341361
342362#[ cfg( test) ]
343363mod tests {
344364 use super :: * ;
345365 use api:: define_kv_store_tests;
366+ use api:: types:: { GetObjectRequest , GetObjectResponse , KeyValue , PutObjectRequest } ;
346367 use bytes:: Bytes ;
368+ use prost:: Message ;
369+ use reqwest:: Client ;
370+ use std:: path:: PathBuf ;
371+ use std:: process:: { Child , Command } ;
372+ use std:: thread;
373+ use std:: time:: Duration ;
347374 use tokio:: test;
348375
349376 define_kv_store_tests ! ( InMemoryKvStoreTest , InMemoryBackendImpl , InMemoryBackendImpl :: new( ) ) ;
350377
378+ fn start_server ( ) -> Child {
379+ let status = Command :: new ( "cargo" )
380+ . args ( [ "build" , "--bin" , "server" ] )
381+ . current_dir ( "../server" )
382+ . status ( )
383+ . expect ( "failed to build server" ) ;
384+ if !status. success ( ) {
385+ panic ! ( "server build failed" ) ;
386+ }
387+
388+ // for debugging
389+ let manifest_dir = PathBuf :: from ( env ! ( "CARGO_MANIFEST_DIR" ) ) ;
390+ let target_dir = manifest_dir. parent ( ) . unwrap ( ) . join ( "target/debug/server" ) ;
391+
392+ if !target_dir. exists ( ) {
393+ panic ! ( "Server binary not found at {:?}" , target_dir) ;
394+ }
395+
396+ Command :: new ( & target_dir)
397+ . arg ( "vss-server-config.toml" )
398+ . arg ( "--in-memory" )
399+ . current_dir ( "../server" )
400+ . spawn ( )
401+ . expect ( "failed to start server" )
402+ }
403+
404+ #[ tokio:: test]
405+ async fn test_put_and_get_via_http ( ) {
406+ let mut child = start_server ( ) ;
407+ thread:: sleep ( Duration :: from_secs ( 6 ) ) ;
408+
409+ let client = Client :: new ( ) ;
410+ let base_url = "http://127.0.0.1:8080/vss" ;
411+
412+ let put_req = PutObjectRequest {
413+ store_id : "test_store" . to_string ( ) ,
414+ transaction_items : vec ! [ KeyValue {
415+ key: "key1" . to_string( ) ,
416+ value: b"value1" . to_vec( ) . into( ) ,
417+ version: 0 ,
418+ } ] ,
419+ delete_items : vec ! [ ] ,
420+ global_version : None ,
421+ } ;
422+
423+ let put_body = put_req. encode_to_vec ( ) ;
424+
425+ let put_res = client
426+ . post ( & format ! ( "{}/putObjects" , base_url) )
427+ . header ( "Content-Type" , "application/octet-stream" )
428+ . header ( "Authorization" , "Bearer test_user" )
429+ . body ( put_body)
430+ . send ( )
431+ . await
432+ . expect ( "PUT failed" ) ;
433+
434+ assert ! ( put_res. status( ) . is_success( ) , "PUT failed: {}" , put_res. status( ) ) ;
435+
436+ let get_req =
437+ GetObjectRequest { store_id : "test_store" . to_string ( ) , key : "key1" . to_string ( ) } ;
438+
439+ let get_body = get_req. encode_to_vec ( ) ;
440+
441+ let get_res = client
442+ . post ( & format ! ( "{}/getObject" , base_url) )
443+ . header ( "Content-Type" , "application/octet-stream" )
444+ . header ( "Authorization" , "Bearer test_user" )
445+ . body ( get_body)
446+ . send ( )
447+ . await
448+ . expect ( "GET failed" ) ;
449+
450+ assert ! ( get_res. status( ) . is_success( ) , "GET failed: {}" , get_res. status( ) ) ;
451+ let get_bytes = get_res. bytes ( ) . await . expect ( "failed to read" ) ;
452+ let get_resp = GetObjectResponse :: decode ( & * get_bytes) . expect ( "decode failed" ) ;
453+
454+ let kv = get_resp. value . expect ( "no value" ) ;
455+ assert_eq ! ( kv. key, "key1" ) ;
456+ assert_eq ! ( kv. value, b"value1" . to_vec( ) ) ;
457+ assert_eq ! ( kv. version, 1 ) ;
458+
459+ let _ = child. kill ( ) ;
460+ }
461+
351462 #[ test]
352463 async fn test_in_memory_crud ( ) {
353464 let store = InMemoryBackendImpl :: new ( ) ;
0 commit comments