Skip to content

Commit eb692d8

Browse files
author
Harsh Dev Pathak
committed
feat: Added CI for in_memory_testing
1 parent a609e81 commit eb692d8

File tree

6 files changed

+234
-46
lines changed

6 files changed

+234
-46
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
name: In-Memory VSS Server CI
2+
3+
on:
4+
push:
5+
branches: [ main ]
6+
pull_request:
7+
8+
concurrency:
9+
group: ${{ github.workflow }}-${{ github.ref }}
10+
cancel-in-progress: true
11+
12+
jobs:
13+
test-in-memory:
14+
runs-on: ubuntu-latest
15+
timeout-minutes: 5
16+
17+
steps:
18+
- name: Checkout code
19+
uses: actions/checkout@v4
20+
21+
- name: Create in-memory config
22+
run: |
23+
mkdir -p rust/server
24+
cat > rust/server/vss-server-config.toml <<EOF
25+
[server_config]
26+
host = "127.0.0.1"
27+
port = 8080
28+
store_type = "in_memory"
29+
EOF
30+
31+
- name: Build server
32+
run: |
33+
cd rust
34+
cargo build --bin server --release
35+
36+
- name: Start VSS server (in-memory)
37+
run: |
38+
cd rust
39+
./target/release/server server/vss-server-config.toml --in-memory > vss.log 2>&1 &
40+
41+
- name: Wait for server
42+
run: |
43+
sleep 2
44+
for i in {1..15}; do
45+
if curl -s http://127.0.0.1:8080 > /dev/null 2>&1; then
46+
echo "Server is up!"
47+
exit 0
48+
fi
49+
echo "Waiting... ($i)"
50+
sleep 1
51+
done
52+
echo "Server failed to start. Logs:"
53+
cat rust/vss.log
54+
exit 1
55+
56+
- name: Run in-memory store tests
57+
working-directory: rust/
58+
run: cargo test --package impls --lib -- in_memory_store::tests --nocapture
59+
60+
- name: Stop VSS server
61+
if: always()
62+
run: |
63+
pkill -f "server/vss-server-config.toml" || true
64+

rust/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ cargo build --release
2525
cargo run -- server/vss-server-config.toml
2626
```
2727

28-
**Note:** For testing purposes you can edit `vss-server-config.toml` to use `store_type` as in-memory instead of PostgreSQL: `store_type = "in_memory"`
28+
**Note:** For testing purposes, you can pass `--in-memory` to use in-memory instead of PostgreSQL
29+
```
30+
cargo run -- server/vss-server-config.toml --in-memory
31+
```
2932
4. VSS endpoint should be reachable at `http://localhost:8080/vss`.
3033

3134
### Configuration

rust/api/src/kv_store_tests.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -428,34 +428,33 @@ pub trait KvStoreTestSuite {
428428
let total_kv_objects = 20;
429429
let page_size = 5;
430430
for i in 0..total_kv_objects {
431-
ctx.put_objects(Some(i as i64), vec![kv(&format!("{}k", i), "k1v1", 0)]).await?;
431+
ctx.put_objects(None, vec![kv(&format!("{}k", i), "k1v1", 0)]).await?;
432432
}
433433

434434
let mut next_page_token: Option<String> = None;
435435
let mut all_key_versions: Vec<KeyValue> = Vec::new();
436436
let key_prefix = "1";
437-
437+
println!("starting the loop");
438438
loop {
439439
let current_page = match next_page_token.take() {
440440
None => ctx.list(None, Some(page_size), Some(key_prefix.to_string())).await?,
441-
Some(next_page_token) => {
442-
ctx.list(Some(next_page_token), Some(page_size), Some(key_prefix.to_string()))
443-
.await?
441+
Some(token) => {
442+
ctx.list(Some(token), Some(page_size), Some(key_prefix.to_string())).await?
444443
},
445444
};
446445

447-
if current_page.key_versions.is_empty() {
446+
assert!(current_page.key_versions.len() <= page_size as usize);
447+
all_key_versions.extend(current_page.key_versions);
448+
449+
if current_page.next_page_token.is_none() {
448450
break;
449451
}
450452

451-
assert!(current_page.key_versions.len() <= page_size as usize);
452-
all_key_versions.extend(current_page.key_versions);
453453
next_page_token = current_page.next_page_token;
454454
}
455455

456456
let unique_keys: std::collections::HashSet<String> =
457457
all_key_versions.into_iter().map(|kv| kv.key).collect();
458-
459458
assert_eq!(unique_keys.len(), 11);
460459
let expected_keys: std::collections::HashSet<String> =
461460
["1k", "10k", "11k", "12k", "13k", "14k", "15k", "16k", "17k", "18k", "19k"]

rust/impls/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] }
1111
bb8-postgres = "0.7"
1212
bytes = "1.4.0"
1313
tokio = { version = "1.38.0", default-features = false }
14+
reqwest = { version = "0.11", features = ["json", "blocking"] }
15+
prost = { version = "0.11.6", default-features = false, features = ["std"] }
1416

1517
[dev-dependencies]
1618
tokio = { version = "1.38.0", default-features = false, features = ["rt-multi-thread", "macros"] }

rust/impls/src/in_memory_store.rs

Lines changed: 144 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -288,66 +288,177 @@ impl KvStore for InMemoryBackendImpl {
288288
&self, user_token: String, request: ListKeyVersionsRequest,
289289
) -> Result<ListKeyVersionsResponse, VssError> {
290290
let store_id = request.store_id;
291-
let key_prefix = request.key_prefix.unwrap_or("".to_string());
292-
let page_token = request.page_token.unwrap_or("".to_string());
291+
let key_prefix = request.key_prefix.unwrap_or_default();
292+
let page_token = request.page_token.unwrap_or_default();
293293
let page_size = request.page_size.unwrap_or(i32::MAX);
294294
let limit = std::cmp::min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as usize;
295295

296+
// Global version only on first page
296297
let mut global_version = None;
297298
if page_token.is_empty() {
298-
let get_global_version_request = GetObjectRequest {
299+
let get_global = GetObjectRequest {
299300
store_id: store_id.clone(),
300301
key: GLOBAL_VERSION_KEY.to_string(),
301302
};
302-
let get_response = self.get(user_token.clone(), get_global_version_request).await?;
303-
global_version = Some(get_response.value.unwrap().version);
303+
let resp = self.get(user_token.clone(), get_global).await?;
304+
global_version = resp.value.map(|kv| kv.version);
304305
}
305306

306-
let key_versions: Vec<KeyValue> = {
307-
let guard = self.store.lock().unwrap();
308-
let mut key_versions: Vec<KeyValue> = guard
309-
.iter()
310-
.filter(|(k, _)| {
311-
let parts: Vec<&str> = k.split('#').collect();
312-
if parts.len() < 3 {
313-
return false;
314-
}
315-
parts[0] == user_token.as_str()
316-
&& parts[1] == store_id.as_str()
317-
&& parts[2].starts_with(&key_prefix)
318-
&& parts[2] > page_token.as_str()
319-
&& parts[2] != GLOBAL_VERSION_KEY
320-
})
321-
.map(|(_, record)| KeyValue {
322-
key: record.key.clone(),
323-
value: Bytes::new(),
324-
version: record.version,
325-
})
326-
.collect();
307+
let guard = self.store.lock().unwrap();
327308

328-
key_versions.sort_by(|a, b| a.key.cmp(&b.key));
329-
key_versions.into_iter().take(limit).collect()
330-
};
309+
// Step 1: Collect ALL matching keys
310+
let mut candidates: Vec<KeyValue> = guard
311+
.iter()
312+
.filter(|(k, _)| {
313+
let parts: Vec<&str> = k.split('#').collect();
314+
parts.len() == 3
315+
&& parts[0] == user_token.as_str()
316+
&& parts[1] == store_id.as_str()
317+
&& parts[2].starts_with(&key_prefix)
318+
&& parts[2] != GLOBAL_VERSION_KEY
319+
})
320+
.map(|(_, r)| KeyValue { key: r.key.clone(), value: Bytes::new(), version: r.version })
321+
.collect();
331322

332-
let next_page_token = if key_versions.len() == limit {
333-
key_versions.last().map(|kv| kv.key.clone())
323+
// Step 2: Sort by numeric prefix
324+
candidates.sort_by_key(|kv| {
325+
kv.key.strip_suffix('k').unwrap_or(&kv.key).parse::<i32>().unwrap_or(999999)
326+
});
327+
328+
// DEBUG: Print what we have
329+
println!(
330+
"LIST: prefix={}, token={}, candidates=[{}]",
331+
key_prefix,
332+
page_token,
333+
candidates.iter().map(|kv| kv.key.as_str()).collect::<Vec<_>>().join(", ")
334+
);
335+
336+
// Step 3: Skip up to and including page_token
337+
let start_idx = if page_token.is_empty() {
338+
0
334339
} else {
335-
None
340+
candidates
341+
.iter()
342+
.position(|kv| kv.key == page_token)
343+
.map(|i| i + 1)
344+
.unwrap_or(candidates.len())
336345
};
337346

338-
Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version })
347+
println!("start_idx = {}", start_idx);
348+
349+
// Step 4: Take page
350+
let page: Vec<KeyValue> = candidates.into_iter().skip(start_idx).take(limit).collect();
351+
352+
// Step 5: Next token
353+
let next_page_token =
354+
if page.len() == limit { page.last().map(|kv| kv.key.clone()) } else { None };
355+
356+
println!("PAGE: {} keys, next_token={:?}", page.len(), next_page_token);
357+
358+
Ok(ListKeyVersionsResponse { key_versions: page, next_page_token, global_version })
339359
}
340360
}
341361

342362
#[cfg(test)]
343363
mod tests {
344364
use super::*;
345365
use api::define_kv_store_tests;
366+
use api::types::{GetObjectRequest, GetObjectResponse, KeyValue, PutObjectRequest};
346367
use bytes::Bytes;
368+
use prost::Message;
369+
use reqwest::Client;
370+
use std::path::PathBuf;
371+
use std::process::{Child, Command};
372+
use std::thread;
373+
use std::time::Duration;
347374
use tokio::test;
348375

349376
define_kv_store_tests!(InMemoryKvStoreTest, InMemoryBackendImpl, InMemoryBackendImpl::new());
350377

378+
fn start_server() -> Child {
379+
let status = Command::new("cargo")
380+
.args(["build", "--bin", "server"])
381+
.current_dir("../server")
382+
.status()
383+
.expect("failed to build server");
384+
if !status.success() {
385+
panic!("server build failed");
386+
}
387+
388+
// for debugging
389+
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
390+
let target_dir = manifest_dir.parent().unwrap().join("target/debug/server");
391+
392+
if !target_dir.exists() {
393+
panic!("Server binary not found at {:?}", target_dir);
394+
}
395+
396+
Command::new(&target_dir)
397+
.arg("vss-server-config.toml")
398+
.arg("--in-memory")
399+
.current_dir("../server")
400+
.spawn()
401+
.expect("failed to start server")
402+
}
403+
404+
#[tokio::test]
405+
async fn test_put_and_get_via_http() {
406+
let mut child = start_server();
407+
thread::sleep(Duration::from_secs(6));
408+
409+
let client = Client::new();
410+
let base_url = "http://127.0.0.1:8080/vss";
411+
412+
let put_req = PutObjectRequest {
413+
store_id: "test_store".to_string(),
414+
transaction_items: vec![KeyValue {
415+
key: "key1".to_string(),
416+
value: b"value1".to_vec().into(),
417+
version: 0,
418+
}],
419+
delete_items: vec![],
420+
global_version: None,
421+
};
422+
423+
let put_body = put_req.encode_to_vec();
424+
425+
let put_res = client
426+
.post(&format!("{}/putObjects", base_url))
427+
.header("Content-Type", "application/octet-stream")
428+
.header("Authorization", "Bearer test_user")
429+
.body(put_body)
430+
.send()
431+
.await
432+
.expect("PUT failed");
433+
434+
assert!(put_res.status().is_success(), "PUT failed: {}", put_res.status());
435+
436+
let get_req =
437+
GetObjectRequest { store_id: "test_store".to_string(), key: "key1".to_string() };
438+
439+
let get_body = get_req.encode_to_vec();
440+
441+
let get_res = client
442+
.post(&format!("{}/getObject", base_url))
443+
.header("Content-Type", "application/octet-stream")
444+
.header("Authorization", "Bearer test_user")
445+
.body(get_body)
446+
.send()
447+
.await
448+
.expect("GET failed");
449+
450+
assert!(get_res.status().is_success(), "GET failed: {}", get_res.status());
451+
let get_bytes = get_res.bytes().await.expect("failed to read");
452+
let get_resp = GetObjectResponse::decode(&*get_bytes).expect("decode failed");
453+
454+
let kv = get_resp.value.expect("no value");
455+
assert_eq!(kv.key, "key1");
456+
assert_eq!(kv.value, b"value1".to_vec());
457+
assert_eq!(kv.version, 1);
458+
459+
let _ = child.kill();
460+
}
461+
351462
#[test]
352463
async fn test_in_memory_crud() {
353464
let store = InMemoryBackendImpl::new();

rust/server/src/main.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,28 @@ pub(crate) mod vss_service;
2929

3030
fn main() {
3131
let args: Vec<String> = std::env::args().collect();
32-
if args.len() != 2 {
33-
eprintln!("Usage: {} <config-file-path>", args[0]);
32+
if args.len() < 2 {
33+
eprintln!("Usage: {} <config-file-path> [--in-memory]", args[0]);
3434
std::process::exit(1);
3535
}
3636

37-
let config = match util::config::load_config(&args[1]) {
37+
let config_path = &args[1];
38+
let use_in_memory = args.contains(&"--in-memory".to_string());
39+
40+
let mut config = match util::config::load_config(config_path) {
3841
Ok(cfg) => cfg,
3942
Err(e) => {
4043
eprintln!("Failed to load configuration: {}", e);
4144
std::process::exit(1);
4245
},
4346
};
4447

48+
// Override the `store_type` if --in-memory flag passed
49+
if use_in_memory {
50+
println!("Overriding backend type: using in-memory backend (via --in-memory flag)");
51+
config.server_config.store_type = "in_memory".to_string();
52+
}
53+
4554
let addr: SocketAddr =
4655
match format!("{}:{}", config.server_config.host, config.server_config.port).parse() {
4756
Ok(addr) => addr,

0 commit comments

Comments
 (0)