Skip to content

Commit 445e7dc

Browse files
authored
introduce tests for isolated services (#655)
This branch introduces a new style of tests to the proxy codebase, and an initial implementation of new test support code for writing tests in this style. Rather than running an entire proxy alongside a simulated control plane, and binding actual network socket, as the present integration tests do, the new tests run individual proxy components in isolation, using simpler mock implementations of components like name resolution and IO. This approach has a few advantages. It should reduce flakiness significantly, since we don't perform any IO and don't need to synchronize events between multiple threads running test support servers/controllers (the new mocks all just synchronously provide values immediately). We can run everything in a single thread using Tokio's basic scheduler. Additionally, since we are testing individual components in isolation, these tests can live within the crate for the part of the proxy being tested. This means we have access to more internal state to make assertions on, rather than having to make assertions on side effects like metrics. I've added a new `linkerd2-app-test` crate that adds some initial mocks for this kind of testing, including a simple mock resolver and a mock connector using `tokio-test`'s mock IO. I've also added a simple "hello world" test for the outbound TCP stack written in this style. The intention is to use the new test support code for testing the recent changes adding TCP mTLS and load-balancing. In order to make this code more testable, it was necessary to move a few things around. In particular, the outbound TCP balancer stack is now constructed in a separate method, rather than in `Config::build_server`, so that it can be tested without requiring all the dependencies of the full HTTP server stack. A few other similar changes were also necessary. Signed-off-by: Eliza Weisman <[email protected]>
1 parent 12011fb commit 445e7dc

File tree

15 files changed

+606
-76
lines changed

15 files changed

+606
-76
lines changed

Cargo.lock

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,7 @@ dependencies = [
906906
"hyper",
907907
"linkerd2-app",
908908
"linkerd2-app-core",
909+
"linkerd2-app-test",
909910
"linkerd2-metrics",
910911
"linkerd2-proxy-api",
911912
"quickcheck",
@@ -932,6 +933,7 @@ dependencies = [
932933
"http 0.2.1",
933934
"indexmap",
934935
"linkerd2-app-core",
936+
"linkerd2-app-test",
935937
"linkerd2-identity",
936938
"linkerd2-retry",
937939
"pin-project",
@@ -949,6 +951,28 @@ dependencies = [
949951
"tokio",
950952
]
951953

954+
[[package]]
955+
name = "linkerd2-app-test"
956+
version = "0.1.0"
957+
dependencies = [
958+
"futures 0.3.5",
959+
"h2 0.2.6",
960+
"http 0.2.1",
961+
"http-body",
962+
"hyper",
963+
"linkerd2-app-core",
964+
"linkerd2-proxy-api-resolve",
965+
"linkerd2-proxy-core",
966+
"linkerd2-proxy-transport",
967+
"regex 0.1.80",
968+
"tokio",
969+
"tokio-test",
970+
"tower",
971+
"tracing",
972+
"tracing-futures",
973+
"tracing-subscriber",
974+
]
975+
952976
[[package]]
953977
name = "linkerd2-buffer"
954978
version = "0.1.0"
@@ -1663,12 +1687,24 @@ dependencies = [
16631687
"kernel32-sys",
16641688
"libc",
16651689
"log",
1666-
"miow",
1690+
"miow 0.2.1",
16671691
"net2",
16681692
"slab",
16691693
"winapi 0.2.8",
16701694
]
16711695

1696+
[[package]]
1697+
name = "mio-named-pipes"
1698+
version = "0.1.7"
1699+
source = "registry+https://github.com/rust-lang/crates.io-index"
1700+
checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656"
1701+
dependencies = [
1702+
"log",
1703+
"mio",
1704+
"miow 0.3.5",
1705+
"winapi 0.3.8",
1706+
]
1707+
16721708
[[package]]
16731709
name = "mio-uds"
16741710
version = "0.6.7"
@@ -1692,6 +1728,16 @@ dependencies = [
16921728
"ws2_32-sys",
16931729
]
16941730

1731+
[[package]]
1732+
name = "miow"
1733+
version = "0.3.5"
1734+
source = "registry+https://github.com/rust-lang/crates.io-index"
1735+
checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e"
1736+
dependencies = [
1737+
"socket2",
1738+
"winapi 0.3.8",
1739+
]
1740+
16951741
[[package]]
16961742
name = "multimap"
16971743
version = "0.8.1"
@@ -2508,6 +2554,7 @@ dependencies = [
25082554
"libc",
25092555
"memchr 2.3.3",
25102556
"mio",
2557+
"mio-named-pipes",
25112558
"mio-uds",
25122559
"num_cpus",
25132560
"parking_lot",

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ members = [
99
"linkerd/app/integration",
1010
"linkerd/app/outbound",
1111
"linkerd/app/profiling",
12+
"linkerd/app/test",
1213
"linkerd/app",
1314
"linkerd/cache",
1415
"linkerd/buffer",

linkerd/app/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ pub use linkerd2_drain as drain;
1818
pub use linkerd2_error::{Error, Never, Recover};
1919
pub use linkerd2_exp_backoff as exp_backoff;
2020
pub use linkerd2_http_metrics as http_metrics;
21-
pub use linkerd2_metrics as metrics;
2221
pub use linkerd2_opencensus as opencensus;
2322
pub use linkerd2_reconnect as reconnect;
2423
pub use linkerd2_request_filter as request_filter;
@@ -37,6 +36,7 @@ pub mod dst;
3736
pub mod errors;
3837
pub mod handle_time;
3938
pub mod metric_labels;
39+
pub mod metrics;
4040
pub mod proxy;
4141
pub mod retry;
4242
pub mod serve;

linkerd/app/src/metrics.rs renamed to linkerd/app/core/src/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
pub use linkerd2_app_core::{
1+
pub use crate::{
22
classify::Class,
33
errors, handle_time, http_metrics as metrics,
44
metric_labels::{ControlLabels, EndpointLabels, RouteLabels},
5-
metrics::FmtMetrics,
65
opencensus, proxy, stack_metrics, telemetry, transport, ControlHttpMetrics, ProxyMetrics,
76
};
7+
pub use linkerd2_metrics::*;
88
use std::time::{Duration, SystemTime};
99

1010
pub struct Metrics {

linkerd/app/integration/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ linkerd2-app = { path = "..", features = ["mock-orig-dst"] }
2828
linkerd2-app-core = { path = "../core", features = ["mock-orig-dst"] }
2929
linkerd2-metrics = { path = "../../metrics", features = ["test_util"] }
3030
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.13", features = ["arbitrary"] }
31+
linkerd2-app-test = { path = "../test" }
3132
regex = "0.1"
3233
socket2 = "0.3.12"
3334
quickcheck = { version = "0.9", default-features = false }

linkerd/app/integration/src/lib.rs

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ pub use futures::{future, FutureExt, TryFuture, TryFutureExt};
1212

1313
pub use http::{HeaderMap, Request, Response, StatusCode};
1414
pub use http_body::Body as HttpBody;
15-
pub use linkerd2_app as app;
16-
pub use linkerd2_app_core::drain;
15+
pub use linkerd2_app::{
16+
self as app,
17+
core::{drain, Addr},
18+
};
19+
pub use linkerd2_app_test::*;
1720
use socket2::Socket;
1821
pub use std::collections::HashMap;
1922
use std::fmt;
@@ -39,31 +42,6 @@ pub const DEFAULT_TEST_PATIENCE: Duration = Duration::from_millis(15);
3942

4043
pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
4144

42-
/// By default, disable logging in modules that are expected to error in tests.
43-
const DEFAULT_LOG: &'static str = "error,\
44-
linkerd2_proxy_http=off,\
45-
linkerd2_proxy_transport=off";
46-
47-
pub fn trace_subscriber() -> (Dispatch, app::core::trace::Handle) {
48-
use std::env;
49-
let log_level = env::var("LINKERD2_PROXY_LOG")
50-
.or_else(|_| env::var("RUST_LOG"))
51-
.unwrap_or_else(|_| DEFAULT_LOG.to_owned());
52-
env::set_var("RUST_LOG", &log_level);
53-
env::set_var("LINKERD2_PROXY_LOG", &log_level);
54-
let log_format = env::var("LINKERD2_PROXY_LOG_FORMAT").unwrap_or_else(|_| "PLAIN".to_string());
55-
env::set_var("LINKERD2_PROXY_LOG_FORMAT", &log_format);
56-
// This may fail, since the global log compat layer may have been
57-
// initialized by another test.
58-
let _ = app::core::trace::init_log_compat();
59-
app::core::trace::with_filter_and_format(&log_level, &log_format)
60-
}
61-
62-
pub fn trace_init() -> tracing::dispatcher::DefaultGuard {
63-
let (d, _) = trace_subscriber();
64-
tracing::dispatcher::set_default(&d)
65-
}
66-
6745
/// Retry an assertion up to a specified number of times, waiting
6846
/// `RUST_TEST_PATIENCE_MS` between retries.
6947
///

linkerd/app/outbound/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ features = [
3131

3232
[dev-dependencies]
3333
quickcheck = { version = "0.9", default-features = false }
34+
linkerd2-app-test = { path = "../test" }
35+
tokio = { version = "0.2", features = ["full", "macros"]}

linkerd/app/outbound/src/lib.rs

Lines changed: 91 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use tracing::{info, info_span};
3535
pub mod endpoint;
3636
mod prevent_loop;
3737
mod require_identity_on_endpoint;
38+
#[cfg(test)]
39+
mod tests;
3840

3941
use self::prevent_loop::PreventLoop;
4042
use self::require_identity_on_endpoint::MakeRequireIdentityLayer;
@@ -357,6 +359,82 @@ impl Config {
357359
.into_inner()
358360
}
359361

362+
/// Constructs a TCP load balancer.
363+
pub fn build_tcp_balance<C, E, I>(
364+
&self,
365+
tcp_connect: &C,
366+
resolve: E,
367+
prevent_loop: PreventLoop,
368+
metrics: &ProxyMetrics,
369+
) -> impl tower::Service<
370+
SocketAddr,
371+
Error = Error,
372+
Future = impl Unpin + Send + 'static,
373+
Response = impl tower::Service<
374+
I,
375+
Response = (),
376+
Future = impl Unpin + Send + 'static,
377+
Error = Error,
378+
> + Unpin
379+
+ Clone
380+
+ Send
381+
+ 'static,
382+
> + Unpin
383+
+ Clone
384+
+ Send
385+
+ Sync
386+
+ 'static
387+
where
388+
C: tower::Service<TcpEndpoint, Error = Error> + Unpin + Clone + Send + Sync + 'static,
389+
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
390+
C::Future: Unpin + Send,
391+
E: Resolve<Addr, Endpoint = proxy::api_resolve::Metadata> + Unpin + Clone + Send + 'static,
392+
E::Future: Unpin + Send,
393+
E::Resolution: Unpin + Send,
394+
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static,
395+
{
396+
let ProxyConfig {
397+
dispatch_timeout,
398+
cache_max_idle_age,
399+
buffer_capacity,
400+
..
401+
} = self.proxy;
402+
svc::stack(tcp_connect.clone())
403+
.push_make_thunk()
404+
.instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity))
405+
.push(admit::AdmitLayer::new(prevent_loop))
406+
.check_make_service::<TcpEndpoint, ()>()
407+
.push(discover::resolve(map_endpoint::Resolve::new(
408+
endpoint::FromMetadata,
409+
resolve,
410+
)))
411+
.push(discover::buffer(1_000, cache_max_idle_age))
412+
.push_map_target(Addr::from)
413+
.push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
414+
.push_fallback_with_predicate(
415+
svc::stack(tcp_connect.clone())
416+
.push_make_thunk()
417+
.push(admit::AdmitLayer::new(prevent_loop))
418+
.push_map_target(TcpEndpoint::from)
419+
.instrument(|_: &SocketAddr| info_span!("forward")),
420+
is_discovery_rejected,
421+
)
422+
.into_new_service()
423+
.check_new_service::<SocketAddr, ()>()
424+
.cache(
425+
svc::layers().push_on_response(
426+
svc::layers()
427+
.push_failfast(dispatch_timeout)
428+
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
429+
.push(metrics.stack.layer(stack_labels("tcp"))),
430+
),
431+
)
432+
.spawn_buffer(buffer_capacity)
433+
.check_make_service::<SocketAddr, ()>()
434+
.push(svc::layer::mk(tcp::Forward::new))
435+
.instrument(|a: &SocketAddr| info_span!("tcp", dst = %a))
436+
}
437+
360438
pub async fn build_server<E, R, C, H, S>(
361439
self,
362440
listen_addr: std::net::SocketAddr,
@@ -398,17 +476,21 @@ impl Config {
398476
{
399477
let ProxyConfig {
400478
server: ServerConfig { h2_settings, .. },
401-
disable_protocol_detection_for_ports: skip_detect,
479+
disable_protocol_detection_for_ports: ref skip_detect,
402480
dispatch_timeout,
403481
max_in_flight_requests,
404482
detect_protocol_timeout,
405-
cache_max_idle_age,
406-
buffer_capacity,
407483
..
408484
} = self.proxy;
409485
let canonicalize_timeout = self.canonicalize_timeout;
410486
let prevent_loop = PreventLoop::from(listen_addr.port());
411487

488+
// Load balances TCP streams that cannot be decoded as HTTP.
489+
let tcp_balance =
490+
svc::stack(self.build_tcp_balance(&tcp_connect, resolve, prevent_loop, &metrics))
491+
.push_map_target(|a: listen::Addrs| a.target_addr())
492+
.into_inner();
493+
412494
let http_admit_request = svc::layers()
413495
// Limits the number of in-flight requests.
414496
.push_concurrency_limit(max_in_flight_requests)
@@ -448,43 +530,6 @@ impl Config {
448530
.into_inner()
449531
.into_make_service();
450532

451-
// Load balances TCP streams that cannot be decoded as HTTP.
452-
let tcp_balance = svc::stack(tcp_connect.clone())
453-
.push_make_thunk()
454-
.instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity))
455-
.push(admit::AdmitLayer::new(prevent_loop))
456-
.check_make_service::<TcpEndpoint, ()>()
457-
.push(discover::resolve(map_endpoint::Resolve::new(
458-
endpoint::FromMetadata,
459-
resolve,
460-
)))
461-
.push(discover::buffer(1_000, cache_max_idle_age))
462-
.push_map_target(Addr::from)
463-
.push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
464-
.push_fallback_with_predicate(
465-
svc::stack(tcp_connect.clone())
466-
.push_make_thunk()
467-
.push(admit::AdmitLayer::new(prevent_loop))
468-
.push_map_target(TcpEndpoint::from)
469-
.instrument(|_: &SocketAddr| info_span!("forward")),
470-
is_discovery_rejected,
471-
)
472-
.into_new_service()
473-
.check_new_service::<SocketAddr, ()>()
474-
.cache(
475-
svc::layers().push_on_response(
476-
svc::layers()
477-
.push_failfast(dispatch_timeout)
478-
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
479-
.push(metrics.stack.layer(stack_labels("tcp"))),
480-
),
481-
)
482-
.spawn_buffer(buffer_capacity)
483-
.check_make_service::<SocketAddr, ()>()
484-
.push(svc::layer::mk(tcp::Forward::new))
485-
.instrument(|a: &SocketAddr| info_span!("tcp", dst = %a))
486-
.push_map_target(|a: listen::Addrs| a.target_addr());
487-
488533
let http = http::DetectHttp::new(
489534
h2_settings,
490535
detect_protocol_timeout,
@@ -499,8 +544,12 @@ impl Config {
499544
.push(admit::AdmitLayer::new(prevent_loop))
500545
.push_map_target(TcpEndpoint::from);
501546

502-
let accept = svc::stack(svc::stack::MakeSwitch::new(skip_detect, http, tcp_forward))
503-
.push(metrics.transport.layer_accept(TransportLabels));
547+
let accept = svc::stack(svc::stack::MakeSwitch::new(
548+
skip_detect.clone(),
549+
http,
550+
tcp_forward,
551+
))
552+
.push(metrics.transport.layer_accept(TransportLabels));
504553

505554
info!(addr = %listen_addr, "Serving");
506555
serve::serve(listen, accept, drain.signal()).await

0 commit comments

Comments
 (0)