Skip to content

Commit 496e6c8

Browse files
author
Harsh Dev Pathak
committed
feat: Added CI for in_memory_testing
1 parent a609e81 commit 496e6c8

File tree

3 files changed

+129
-42
lines changed

3 files changed

+129
-42
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
name: In-Memory VSS Server CI
2+
3+
on:
4+
push:
5+
branches: [ main ]
6+
pull_request:
7+
8+
concurrency:
9+
group: ${{ github.workflow }}-${{ github.ref }}
10+
cancel-in-progress: true
11+
12+
jobs:
13+
test-in-memory:
14+
runs-on: ubuntu-latest
15+
timeout-minutes: 5
16+
17+
steps:
18+
- name: Checkout code
19+
uses: actions/checkout@v4
20+
21+
- name: Create in-memory config
22+
run: |
23+
mkdir -p rust/server
24+
cat > rust/server/vss-server-config.toml <<EOF
25+
[server_config]
26+
host = "127.0.0.1"
27+
port = 8080
28+
store_type = "in_memory"
29+
EOF
30+
31+
- name: Build server
32+
run: |
33+
cd rust
34+
cargo build --bin server --release
35+
36+
- name: Start VSS server (in-memory)
37+
run: |
38+
cd rust
39+
./target/release/server server/vss-server-config.toml > vss.log 2>&1 &
40+
echo "Server started (PID: $!)"
41+
42+
- name: Wait for server
43+
run: |
44+
sleep 2
45+
for i in {1..15}; do
46+
if curl -s http://127.0.0.1:8080 > /dev/null 2>&1; then
47+
echo "Server is up!"
48+
exit 0
49+
fi
50+
echo "Waiting... ($i)"
51+
sleep 1
52+
done
53+
echo "Server failed to start. Logs:"
54+
cat rust/vss.log
55+
exit 1
56+
57+
- name: Run tests
58+
run: cargo test --package impls --lib -- in_memory_store::tests --nocapture
59+
60+
- name: Stop VSS server
61+
if: always()
62+
run: |
63+
pkill -f "server/vss-server-config.toml" || true
64+

rust/api/src/kv_store_tests.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -427,35 +427,38 @@ pub trait KvStoreTestSuite {
427427

428428
let total_kv_objects = 20;
429429
let page_size = 5;
430+
println!("sdfjsldkfsdfsd");
430431
for i in 0..total_kv_objects {
431-
ctx.put_objects(Some(i as i64), vec![kv(&format!("{}k", i), "k1v1", 0)]).await?;
432+
ctx.put_objects(None, vec![kv(&format!("{}k", i), "k1v1", 0)]).await?;
432433
}
433434

434435
let mut next_page_token: Option<String> = None;
435436
let mut all_key_versions: Vec<KeyValue> = Vec::new();
436437
let key_prefix = "1";
437-
438+
println!("starting the loop");
438439
loop {
439440
let current_page = match next_page_token.take() {
440441
None => ctx.list(None, Some(page_size), Some(key_prefix.to_string())).await?,
441-
Some(next_page_token) => {
442-
ctx.list(Some(next_page_token), Some(page_size), Some(key_prefix.to_string()))
443-
.await?
442+
Some(token) => {
443+
ctx.list(Some(token), Some(page_size), Some(key_prefix.to_string())).await?
444444
},
445445
};
446446

447-
if current_page.key_versions.is_empty() {
447+
println!("dsfsdfsdfsdfsdfsdwerwe");
448+
assert!(current_page.key_versions.len() <= page_size as usize);
449+
all_key_versions.extend(current_page.key_versions);
450+
451+
// BREAK IF NO MORE PAGES
452+
if current_page.next_page_token.is_none() {
448453
break;
449454
}
450455

451-
assert!(current_page.key_versions.len() <= page_size as usize);
452-
all_key_versions.extend(current_page.key_versions);
453456
next_page_token = current_page.next_page_token;
454457
}
455458

456459
let unique_keys: std::collections::HashSet<String> =
457460
all_key_versions.into_iter().map(|kv| kv.key).collect();
458-
461+
println!("starting last asserts");
459462
assert_eq!(unique_keys.len(), 11);
460463
let expected_keys: std::collections::HashSet<String> =
461464
["1k", "10k", "11k", "12k", "13k", "14k", "15k", "16k", "17k", "18k", "19k"]

rust/impls/src/in_memory_store.rs

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -288,54 +288,74 @@ impl KvStore for InMemoryBackendImpl {
288288
&self, user_token: String, request: ListKeyVersionsRequest,
289289
) -> Result<ListKeyVersionsResponse, VssError> {
290290
let store_id = request.store_id;
291-
let key_prefix = request.key_prefix.unwrap_or("".to_string());
292-
let page_token = request.page_token.unwrap_or("".to_string());
291+
let key_prefix = request.key_prefix.unwrap_or_default();
292+
let page_token = request.page_token.unwrap_or_default();
293293
let page_size = request.page_size.unwrap_or(i32::MAX);
294294
let limit = std::cmp::min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as usize;
295295

296+
// Global version only on first page
296297
let mut global_version = None;
297298
if page_token.is_empty() {
298-
let get_global_version_request = GetObjectRequest {
299+
let get_global = GetObjectRequest {
299300
store_id: store_id.clone(),
300301
key: GLOBAL_VERSION_KEY.to_string(),
301302
};
302-
let get_response = self.get(user_token.clone(), get_global_version_request).await?;
303-
global_version = Some(get_response.value.unwrap().version);
303+
let resp = self.get(user_token.clone(), get_global).await?;
304+
global_version = resp.value.map(|kv| kv.version);
304305
}
305306

306-
let key_versions: Vec<KeyValue> = {
307-
let guard = self.store.lock().unwrap();
308-
let mut key_versions: Vec<KeyValue> = guard
309-
.iter()
310-
.filter(|(k, _)| {
311-
let parts: Vec<&str> = k.split('#').collect();
312-
if parts.len() < 3 {
313-
return false;
314-
}
315-
parts[0] == user_token.as_str()
316-
&& parts[1] == store_id.as_str()
317-
&& parts[2].starts_with(&key_prefix)
318-
&& parts[2] > page_token.as_str()
319-
&& parts[2] != GLOBAL_VERSION_KEY
320-
})
321-
.map(|(_, record)| KeyValue {
322-
key: record.key.clone(),
323-
value: Bytes::new(),
324-
version: record.version,
325-
})
326-
.collect();
307+
let guard = self.store.lock().unwrap();
327308

328-
key_versions.sort_by(|a, b| a.key.cmp(&b.key));
329-
key_versions.into_iter().take(limit).collect()
330-
};
309+
// Step 1: Collect ALL matching keys
310+
let mut candidates: Vec<KeyValue> = guard
311+
.iter()
312+
.filter(|(k, _)| {
313+
let parts: Vec<&str> = k.split('#').collect();
314+
parts.len() == 3
315+
&& parts[0] == user_token.as_str()
316+
&& parts[1] == store_id.as_str()
317+
&& parts[2].starts_with(&key_prefix)
318+
&& parts[2] != GLOBAL_VERSION_KEY
319+
})
320+
.map(|(_, r)| KeyValue { key: r.key.clone(), value: Bytes::new(), version: r.version })
321+
.collect();
331322

332-
let next_page_token = if key_versions.len() == limit {
333-
key_versions.last().map(|kv| kv.key.clone())
323+
// Step 2: Sort by numeric prefix
324+
candidates.sort_by_key(|kv| {
325+
kv.key.strip_suffix('k').unwrap_or(&kv.key).parse::<i32>().unwrap_or(999999)
326+
});
327+
328+
// DEBUG: Print what we have
329+
println!(
330+
"LIST: prefix={}, token={}, candidates=[{}]",
331+
key_prefix,
332+
page_token,
333+
candidates.iter().map(|kv| kv.key.as_str()).collect::<Vec<_>>().join(", ")
334+
);
335+
336+
// Step 3: Skip up to and including page_token
337+
let start_idx = if page_token.is_empty() {
338+
0
334339
} else {
335-
None
340+
candidates
341+
.iter()
342+
.position(|kv| kv.key == page_token)
343+
.map(|i| i + 1)
344+
.unwrap_or(candidates.len())
336345
};
337346

338-
Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version })
347+
println!("start_idx = {}", start_idx);
348+
349+
// Step 4: Take page
350+
let page: Vec<KeyValue> = candidates.into_iter().skip(start_idx).take(limit).collect();
351+
352+
// Step 5: Next token
353+
let next_page_token =
354+
if page.len() == limit { page.last().map(|kv| kv.key.clone()) } else { None };
355+
356+
println!("PAGE: {} keys, next_token={:?}", page.len(), next_page_token);
357+
358+
Ok(ListKeyVersionsResponse { key_versions: page, next_page_token, global_version })
339359
}
340360
}
341361

0 commit comments

Comments
 (0)