Skip to content

Commit 02509db

Browse files
authored
feat(policy): Add http protocol configuration (#13721)
This adds `"http"` and `"kubernetes.io/h2c"` as a valid values for service port `appProtocol`. Signed-off-by: Scott Fleener <[email protected]>
1 parent e47862b commit 02509db

File tree

9 files changed

+373
-23
lines changed

9 files changed

+373
-23
lines changed

policy-controller/core/src/outbound.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub type RouteSet<T> = HashMap<GroupKindNamespaceName, T>;
4444

4545
#[derive(Debug, Clone, PartialEq)]
4646
pub enum AppProtocol {
47+
Http1,
48+
Http2,
4749
Opaque,
4850
Unknown(Arc<str>),
4951
}
@@ -53,6 +55,8 @@ impl FromStr for AppProtocol {
5355

5456
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
5557
let protocol = match s.to_ascii_lowercase().as_str() {
58+
"http" => AppProtocol::Http1,
59+
"kubernetes.io/h2c" => AppProtocol::Http2,
5660
"linkerd.io/tcp" | "linkerd.io/opaque" => AppProtocol::Opaque,
5761
protocol => AppProtocol::Unknown(Arc::from(protocol)),
5862
};

policy-controller/grpc/src/outbound.rs

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -372,37 +372,64 @@ fn to_proto(
372372
) -> outbound::OutboundPolicy {
373373
let backend: outbound::Backend = default_backend(&policy, original_dst);
374374

375+
let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual {
376+
kind: Some(match accrual {
377+
linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive {
378+
max_failures,
379+
backoff,
380+
} => outbound::failure_accrual::Kind::ConsecutiveFailures(
381+
outbound::failure_accrual::ConsecutiveFailures {
382+
max_failures,
383+
backoff: Some(outbound::ExponentialBackoff {
384+
min_backoff: convert_duration("min_backoff", backoff.min_penalty),
385+
max_backoff: convert_duration("max_backoff", backoff.max_penalty),
386+
jitter_ratio: backoff.jitter,
387+
}),
388+
},
389+
),
390+
}),
391+
});
392+
393+
let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();
394+
375395
let kind = match &policy.app_protocol {
376396
Some(AppProtocol::Opaque) => {
377397
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
378398
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
379399
})
380400
}
401+
Some(AppProtocol::Http1) => {
402+
http_routes.sort_by(timestamp_then_name);
403+
http::http1_only_protocol(
404+
backend,
405+
http_routes.into_iter(),
406+
accrual,
407+
policy.http_retry.clone(),
408+
policy.timeouts.clone(),
409+
allow_l5d_request_headers,
410+
&policy.parent_info,
411+
original_dst,
412+
)
413+
}
414+
Some(AppProtocol::Http2) => {
415+
http_routes.sort_by(timestamp_then_name);
416+
http::http2_only_protocol(
417+
backend,
418+
http_routes.into_iter(),
419+
accrual,
420+
policy.http_retry.clone(),
421+
policy.timeouts.clone(),
422+
allow_l5d_request_headers,
423+
&policy.parent_info,
424+
original_dst,
425+
)
426+
}
381427
None | Some(AppProtocol::Unknown(_)) => {
382428
if let Some(AppProtocol::Unknown(protocol)) = &policy.app_protocol {
383429
tracing::debug!(resource = ?policy.parent_info, port = policy.port.get(), "Unknown appProtocol \"{protocol}\"");
384430
}
385431

386-
let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual {
387-
kind: Some(match accrual {
388-
linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive {
389-
max_failures,
390-
backoff,
391-
} => outbound::failure_accrual::Kind::ConsecutiveFailures(
392-
outbound::failure_accrual::ConsecutiveFailures {
393-
max_failures,
394-
backoff: Some(outbound::ExponentialBackoff {
395-
min_backoff: convert_duration("min_backoff", backoff.min_penalty),
396-
max_backoff: convert_duration("max_backoff", backoff.max_penalty),
397-
jitter_ratio: backoff.jitter,
398-
}),
399-
},
400-
),
401-
}),
402-
});
403-
404432
let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
405-
let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();
406433
let mut tls_routes = policy.tls_routes.clone().into_iter().collect::<Vec<_>>();
407434
let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();
408435

policy-controller/grpc/src/outbound/http.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,103 @@ pub(crate) fn protocol(
8484
})
8585
}
8686

87+
#[allow(clippy::too_many_arguments)]
88+
pub(crate) fn http1_only_protocol(
89+
default_backend: outbound::Backend,
90+
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
91+
accrual: Option<outbound::FailureAccrual>,
92+
service_retry: Option<RouteRetry<HttpRetryCondition>>,
93+
service_timeouts: RouteTimeouts,
94+
allow_l5d_request_headers: bool,
95+
parent_info: &ParentInfo,
96+
original_dst: Option<SocketAddr>,
97+
) -> outbound::proxy_protocol::Kind {
98+
outbound::proxy_protocol::Kind::Http1(outbound::proxy_protocol::Http1 {
99+
routes: base_http_routes(
100+
default_backend,
101+
routes,
102+
service_retry,
103+
service_timeouts,
104+
allow_l5d_request_headers,
105+
parent_info,
106+
original_dst,
107+
),
108+
failure_accrual: accrual,
109+
})
110+
}
111+
112+
#[allow(clippy::too_many_arguments)]
113+
pub(crate) fn http2_only_protocol(
114+
default_backend: outbound::Backend,
115+
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
116+
accrual: Option<outbound::FailureAccrual>,
117+
service_retry: Option<RouteRetry<HttpRetryCondition>>,
118+
service_timeouts: RouteTimeouts,
119+
allow_l5d_request_headers: bool,
120+
parent_info: &ParentInfo,
121+
original_dst: Option<SocketAddr>,
122+
) -> outbound::proxy_protocol::Kind {
123+
outbound::proxy_protocol::Kind::Http2(outbound::proxy_protocol::Http2 {
124+
routes: base_http_routes(
125+
default_backend,
126+
routes,
127+
service_retry,
128+
service_timeouts,
129+
allow_l5d_request_headers,
130+
parent_info,
131+
original_dst,
132+
),
133+
failure_accrual: accrual,
134+
})
135+
}
136+
137+
fn base_http_routes(
138+
default_backend: outbound::Backend,
139+
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
140+
service_retry: Option<RouteRetry<HttpRetryCondition>>,
141+
service_timeouts: RouteTimeouts,
142+
allow_l5d_request_headers: bool,
143+
parent_info: &ParentInfo,
144+
original_dst: Option<SocketAddr>,
145+
) -> Vec<outbound::HttpRoute> {
146+
let mut routes = routes
147+
.map(|(gknn, route)| {
148+
convert_outbound_route(
149+
gknn,
150+
route,
151+
default_backend.clone(),
152+
service_retry.clone(),
153+
service_timeouts.clone(),
154+
allow_l5d_request_headers,
155+
parent_info,
156+
original_dst,
157+
)
158+
})
159+
.collect::<Vec<_>>();
160+
161+
match parent_info {
162+
ParentInfo::Service { .. } => {
163+
if routes.is_empty() {
164+
routes.push(default_outbound_service_route(
165+
default_backend,
166+
service_retry.clone(),
167+
service_timeouts.clone(),
168+
));
169+
}
170+
}
171+
ParentInfo::EgressNetwork { traffic_policy, .. } => {
172+
routes.push(default_outbound_egress_route(
173+
default_backend,
174+
service_retry.clone(),
175+
service_timeouts.clone(),
176+
traffic_policy,
177+
));
178+
}
179+
}
180+
181+
routes
182+
}
183+
87184
#[allow(clippy::too_many_arguments)]
88185
fn convert_outbound_route(
89186
gknn: GroupKindNamespaceName,

policy-test/src/outbound_api.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,46 @@ where
5858
}
5959
}
6060

61+
#[track_caller]
62+
pub fn http1_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::HttpRoute] {
63+
let kind = config
64+
.protocol
65+
.as_ref()
66+
.expect("must have proxy protocol")
67+
.kind
68+
.as_ref()
69+
.expect("must have kind");
70+
if let grpc::outbound::proxy_protocol::Kind::Http1(grpc::outbound::proxy_protocol::Http1 {
71+
routes,
72+
failure_accrual: _,
73+
}) = kind
74+
{
75+
routes
76+
} else {
77+
panic!("proxy protocol must be Grpc; actually got:\n{kind:#?}")
78+
}
79+
}
80+
81+
#[track_caller]
82+
pub fn http2_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::HttpRoute] {
83+
let kind = config
84+
.protocol
85+
.as_ref()
86+
.expect("must have proxy protocol")
87+
.kind
88+
.as_ref()
89+
.expect("must have kind");
90+
if let grpc::outbound::proxy_protocol::Kind::Http2(grpc::outbound::proxy_protocol::Http2 {
91+
routes,
92+
failure_accrual: _,
93+
}) = kind
94+
{
95+
routes
96+
} else {
97+
panic!("proxy protocol must be Grpc; actually got:\n{kind:#?}")
98+
}
99+
}
100+
61101
#[track_caller]
62102
pub fn grpc_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::GrpcRoute] {
63103
let kind = config

policy-test/tests/outbound_api_app_protocol.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use linkerd_policy_controller_k8s_api::gateway;
44
use linkerd_policy_test::{
55
assert_resource_meta, create,
66
outbound_api::{
7-
assert_route_is_default, assert_singleton, retry_watch_outbound_policy, tcp_routes,
7+
assert_route_is_default, assert_singleton, http1_routes, http2_routes,
8+
retry_watch_outbound_policy, tcp_routes,
89
},
910
test_route::TestParent,
1011
with_temp_ns,
@@ -45,3 +46,75 @@ async fn opaque_parent() {
4546

4647
test::<k8s::Service>().await;
4748
}
49+
50+
#[tokio::test(flavor = "current_thread")]
51+
async fn http1_parent() {
52+
async fn test<P: TestParent>() {
53+
tracing::debug!(
54+
parent = %P::kind(&P::DynamicType::default()),
55+
);
56+
with_temp_ns(|client, ns| async move {
57+
let port = 4191;
58+
// Create a parent with no routes.
59+
// let parent = P::create_parent(&client.clone(), &ns).await;
60+
let parent = create(
61+
&client,
62+
P::make_parent_with_protocol(&ns, Some("http".to_string())),
63+
)
64+
.await;
65+
66+
let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
67+
let config = rx
68+
.next()
69+
.await
70+
.expect("watch must not fail")
71+
.expect("watch must return an initial config");
72+
tracing::trace!(?config);
73+
74+
assert_resource_meta(&config.metadata, parent.obj_ref(), port);
75+
76+
let routes = http1_routes(&config);
77+
let route = assert_singleton(routes);
78+
assert_route_is_default::<gateway::HTTPRoute>(route, &parent.obj_ref(), port);
79+
})
80+
.await;
81+
}
82+
83+
test::<k8s::Service>().await;
84+
}
85+
86+
#[tokio::test(flavor = "current_thread")]
87+
async fn http2_parent() {
88+
async fn test<P: TestParent>() {
89+
tracing::debug!(
90+
parent = %P::kind(&P::DynamicType::default()),
91+
);
92+
with_temp_ns(|client, ns| async move {
93+
let port = 4191;
94+
// Create a parent with no routes.
95+
// let parent = P::create_parent(&client.clone(), &ns).await;
96+
let parent = create(
97+
&client,
98+
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
99+
)
100+
.await;
101+
102+
let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
103+
let config = rx
104+
.next()
105+
.await
106+
.expect("watch must not fail")
107+
.expect("watch must return an initial config");
108+
tracing::trace!(?config);
109+
110+
assert_resource_meta(&config.metadata, parent.obj_ref(), port);
111+
112+
let routes = http2_routes(&config);
113+
let route = assert_singleton(routes);
114+
assert_route_is_default::<gateway::HTTPRoute>(route, &parent.obj_ref(), port);
115+
})
116+
.await;
117+
}
118+
119+
test::<k8s::Service>().await;
120+
}

0 commit comments

Comments
 (0)