Skip to content

Commit 533406f

Browse files
authored
Ci/improvement of test (#50)
* chore: add more test cases * chore: add connector mock * feat: add end to end test * chore: add test cases
1 parent 05f7935 commit 533406f

File tree

17 files changed

+3063
-111
lines changed

17 files changed

+3063
-111
lines changed

src/backend/client.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,34 @@ impl<'a> Drop for FrontConnectionGuard<'a> {
4646
metrics::front_conn_close(self.cluster);
4747
}
4848
}
49+
50+
#[cfg(test)]
51+
mod tests {
52+
use super::*;
53+
use crate::metrics;
54+
55+
#[test]
56+
fn client_ids_are_monotonic() {
57+
let a = ClientId::new();
58+
let b = ClientId::new();
59+
assert!(b.as_u64() > a.as_u64());
60+
}
61+
62+
#[test]
63+
fn front_connection_guard_updates_metrics() {
64+
let cluster = "guard-cluster";
65+
let initial_current = metrics::front_connections_current(cluster);
66+
let initial_total = metrics::front_connections_total(cluster);
67+
68+
{
69+
let _guard = FrontConnectionGuard::new(cluster);
70+
assert_eq!(
71+
metrics::front_connections_current(cluster),
72+
initial_current + 1
73+
);
74+
}
75+
76+
assert_eq!(metrics::front_connections_current(cluster), initial_current);
77+
assert_eq!(metrics::front_connections_total(cluster), initial_total + 1);
78+
}
79+
}

src/backend/executor.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::{anyhow, Result};
4+
use async_trait::async_trait;
5+
6+
use crate::backend::client::ClientId;
7+
use crate::backend::pool::{BackendNode, ConnectionPool};
8+
9+
use super::pool::BackendRequest;
10+
11+
/// Abstraction over backend request execution so it can be mocked in tests.
12+
#[async_trait]
13+
pub trait BackendExecutor<T>: Send + Sync
14+
where
15+
T: BackendRequest,
16+
{
17+
/// Dispatch a non-blocking request for the given client.
18+
async fn dispatch(
19+
&self,
20+
node: BackendNode,
21+
client_id: ClientId,
22+
request: T,
23+
) -> Result<T::Response>;
24+
25+
/// Dispatch a request that requires an exclusive backend connection.
26+
async fn dispatch_blocking(&self, node: BackendNode, request: T) -> Result<T::Response>;
27+
}
28+
29+
/// Default executor that proxies calls through the actual connection pool.
30+
pub struct PoolBackendExecutor<T: BackendRequest> {
31+
pool: Arc<ConnectionPool<T>>,
32+
}
33+
34+
impl<T: BackendRequest> PoolBackendExecutor<T> {
35+
pub fn new(pool: Arc<ConnectionPool<T>>) -> Self {
36+
Self { pool }
37+
}
38+
39+
pub fn pool(&self) -> &Arc<ConnectionPool<T>> {
40+
&self.pool
41+
}
42+
}
43+
44+
#[async_trait]
45+
impl<T> BackendExecutor<T> for PoolBackendExecutor<T>
46+
where
47+
T: BackendRequest,
48+
{
49+
async fn dispatch(
50+
&self,
51+
node: BackendNode,
52+
client_id: ClientId,
53+
request: T,
54+
) -> Result<T::Response> {
55+
let response_rx = self.pool.dispatch(node, client_id, request).await?;
56+
match response_rx.await {
57+
Ok(result) => result,
58+
Err(_) => Err(anyhow!("backend session closed unexpectedly")),
59+
}
60+
}
61+
62+
async fn dispatch_blocking(&self, node: BackendNode, request: T) -> Result<T::Response> {
63+
let mut exclusive = self.pool.acquire_exclusive(&node);
64+
let response_rx = exclusive.send(request).await?;
65+
let outcome = response_rx.await;
66+
drop(exclusive);
67+
match outcome {
68+
Ok(result) => result,
69+
Err(_) => Err(anyhow!("backend session closed unexpectedly")),
70+
}
71+
}
72+
}

src/backend/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub mod client;
2+
pub mod executor;
23
pub mod pool;

src/backend/pool.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,3 +345,132 @@ impl<'a, T: BackendRequest> Drop for ExclusiveConnection<'a, T> {
345345
}
346346
}
347347
}
348+
349+
#[cfg(test)]
350+
mod tests {
351+
use super::*;
352+
use std::sync::atomic::{AtomicUsize, Ordering};
353+
use std::sync::{Arc, Mutex};
354+
355+
#[derive(Default)]
356+
struct TestConnector {
357+
started: AtomicUsize,
358+
}
359+
360+
#[async_trait]
361+
impl Connector<TestRequest> for TestConnector {
362+
async fn run_session(
363+
self: Arc<Self>,
364+
_node: BackendNode,
365+
_cluster: Arc<str>,
366+
mut rx: mpsc::Receiver<SessionCommand<TestRequest>>,
367+
) {
368+
self.started.fetch_add(1, Ordering::SeqCst);
369+
while let Some(cmd) = rx.recv().await {
370+
let _ = cmd.respond_to.send(Ok(cmd.request.payload));
371+
}
372+
}
373+
}
374+
375+
impl TestConnector {
376+
fn started(&self) -> usize {
377+
self.started.load(Ordering::SeqCst)
378+
}
379+
}
380+
381+
#[derive(Clone)]
382+
struct CallRecorder {
383+
values: Arc<Mutex<Vec<String>>>,
384+
}
385+
386+
impl CallRecorder {
387+
fn new() -> Self {
388+
Self {
389+
values: Arc::new(Mutex::new(Vec::new())),
390+
}
391+
}
392+
393+
fn record(&self, value: &str) {
394+
self.values.lock().unwrap().push(value.to_string());
395+
}
396+
397+
fn entries(&self) -> Vec<String> {
398+
self.values.lock().unwrap().clone()
399+
}
400+
}
401+
402+
#[derive(Clone)]
403+
struct TestRequest {
404+
payload: &'static str,
405+
total: CallRecorder,
406+
remote: CallRecorder,
407+
}
408+
409+
impl TestRequest {
410+
fn new(payload: &'static str, total: CallRecorder, remote: CallRecorder) -> Self {
411+
Self {
412+
payload,
413+
total,
414+
remote,
415+
}
416+
}
417+
}
418+
419+
impl BackendRequest for TestRequest {
420+
type Response = &'static str;
421+
422+
fn apply_total_tracker(&mut self, cluster: &str) {
423+
self.total.record(cluster);
424+
}
425+
426+
fn apply_remote_tracker(&mut self, cluster: &str) {
427+
self.remote.record(cluster);
428+
}
429+
}
430+
431+
fn cluster_name() -> Arc<str> {
432+
Arc::<str>::from("cluster-backend-tests".to_string())
433+
}
434+
435+
fn backend_node() -> BackendNode {
436+
BackendNode::new("127.0.0.1:7000".into())
437+
}
438+
439+
#[tokio::test(flavor = "current_thread")]
440+
async fn dispatch_sends_request_and_tracks_cluster() {
441+
let connector = Arc::new(TestConnector::default());
442+
let total = CallRecorder::new();
443+
let remote = CallRecorder::new();
444+
let request = TestRequest::new("ok", total.clone(), remote.clone());
445+
let pool = ConnectionPool::with_slots(cluster_name(), connector.clone(), 2);
446+
let node = backend_node();
447+
let rx = pool
448+
.dispatch(node.clone(), ClientId::new(), request)
449+
.await
450+
.expect("dispatch");
451+
let response = rx.await.expect("oneshot").expect("response");
452+
assert_eq!(response, "ok");
453+
assert_eq!(connector.started(), 1);
454+
assert_eq!(total.entries(), vec!["cluster-backend-tests".to_string()]);
455+
assert_eq!(remote.entries(), vec!["cluster-backend-tests".to_string()]);
456+
}
457+
458+
#[tokio::test(flavor = "current_thread")]
459+
async fn exclusive_connections_are_reused() {
460+
let connector = Arc::new(TestConnector::default());
461+
let pool = ConnectionPool::<TestRequest>::with_slots(cluster_name(), connector.clone(), 1);
462+
let node = backend_node();
463+
464+
{
465+
let _conn = pool.acquire_exclusive(&node);
466+
tokio::task::yield_now().await;
467+
assert_eq!(connector.started(), 1);
468+
}
469+
470+
{
471+
let _conn = pool.acquire_exclusive(&node);
472+
tokio::task::yield_now().await;
473+
assert_eq!(connector.started(), 1);
474+
}
475+
}
476+
}

0 commit comments

Comments
 (0)