Skip to content

Commit 79f1c6e

Browse files
committed
Revert "Run message processing tasks in parallel (#878)"
This reverts commit bb8a64f.
1 parent cc4e87d commit 79f1c6e

File tree

9 files changed

+103
-139
lines changed

9 files changed

+103
-139
lines changed

common/src/typed_socket/mod.rs

Lines changed: 23 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -26,35 +26,12 @@ pub struct TypedSocket<T: ChannelMessage> {
2626
}
2727

2828
#[derive(Clone)]
29-
pub struct TypedSocketSender<T: ChannelMessage> {
30-
sender: Sender<SocketAction<T>>,
29+
pub struct TypedSocketSender<A> {
30+
inner_send:
31+
Arc<dyn Fn(SocketAction<A>) -> Result<(), TypedSocketError> + 'static + Send + Sync>,
3132
}
3233

33-
#[derive(Clone)]
34-
pub struct WrappedTypedSocketSender<K> {
35-
send: Arc<dyn Fn(K) -> Result<(), TypedSocketError> + 'static + Send + Sync>,
36-
}
37-
38-
impl<K> WrappedTypedSocketSender<K> {
39-
pub fn new<T: ChannelMessage, F>(sender: Sender<SocketAction<T>>, transform: F) -> Self
40-
where
41-
F: (Fn(K) -> T) + 'static + Send + Sync,
42-
{
43-
Self {
44-
send: Arc::new(move |message| {
45-
sender
46-
.try_send(SocketAction::Send(transform(message)))
47-
.map_err(TypedSocketError::from)
48-
}),
49-
}
50-
}
51-
52-
pub fn send(&self, message: K) -> Result<(), TypedSocketError> {
53-
(self.send)(message)
54-
}
55-
}
56-
57-
impl<T: ChannelMessage> Debug for TypedSocketSender<T> {
34+
impl<T> Debug for TypedSocketSender<T> {
5835
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
5936
f.write_str("typed socket sender")
6037
}
@@ -77,24 +54,16 @@ impl<A> From<TrySendError<A>> for TypedSocketError {
7754
}
7855
}
7956

80-
impl<T: ChannelMessage> TypedSocketSender<T> {
81-
pub fn send(&self, message: T) -> Result<(), TypedSocketError> {
82-
self.sender.try_send(SocketAction::Send(message))?;
57+
impl<A: Debug> TypedSocketSender<A> {
58+
pub fn send(&self, message: A) -> Result<(), TypedSocketError> {
59+
(self.inner_send)(SocketAction::Send(message))?;
8360
Ok(())
8461
}
8562

8663
pub fn close(&mut self) -> Result<(), TypedSocketError> {
87-
self.sender.try_send(SocketAction::Close)?;
64+
(self.inner_send)(SocketAction::Close)?;
8865
Ok(())
8966
}
90-
91-
/// Wrap the sender with a transform function.
92-
pub fn wrap<K, F>(&self, transform: F) -> WrappedTypedSocketSender<K>
93-
where
94-
F: (Fn(K) -> T) + 'static + Send + Sync,
95-
{
96-
WrappedTypedSocketSender::new(self.sender.clone(), transform)
97-
}
9867
}
9968

10069
impl<T: ChannelMessage> TypedSocket<T> {
@@ -109,10 +78,22 @@ impl<T: ChannelMessage> TypedSocket<T> {
10978
self.recv.recv().await
11079
}
11180

112-
pub fn sender(&self) -> TypedSocketSender<T> {
81+
pub fn sender<A, F>(&self, transform: F) -> TypedSocketSender<A>
82+
where
83+
F: (Fn(A) -> T) + 'static + Send + Sync,
84+
{
11385
let sender = self.send.clone();
114-
115-
TypedSocketSender { sender }
86+
let inner_send = move |message: SocketAction<A>| {
87+
let message = match message {
88+
SocketAction::Close => SocketAction::Close,
89+
SocketAction::Send(message) => SocketAction::Send(transform(message)),
90+
};
91+
sender.try_send(message).map_err(|e| e.into())
92+
};
93+
94+
TypedSocketSender {
95+
inner_send: Arc::new(inner_send),
96+
}
11697
}
11798

11899
pub async fn close(&mut self) {

plane/plane-tests/tests/cert_manager.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::sync::Arc;
1111
mod common;
1212

1313
#[plane_test]
14-
#[ignore = "Doesn't work"]
1514
async fn cert_manager_does_refresh(env: TestEnvironment) {
1615
let controller = env.controller().await;
1716

@@ -57,7 +56,6 @@ async fn cert_manager_does_refresh(env: TestEnvironment) {
5756
}
5857

5958
#[plane_test(500)]
60-
#[ignore = "Doesn't work"]
6159
async fn cert_manager_does_refresh_eab(env: TestEnvironment) {
6260
let certs_dir = env.scratch_dir.join("certs");
6361

plane/plane-tests/tests/proxy_cors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,6 @@ async fn proxy_valid_request_has_cors_headers(env: TestEnvironment) {
126126
.unwrap()
127127
.to_str()
128128
.unwrap(),
129-
"*, Authorization"
129+
"authorization, accept, content-type"
130130
);
131131
}

plane/src/controller/drone.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use plane_common::{
99
ApiErrorKind, BackendAction, BackendActionMessage, Heartbeat, KeyDeadlines,
1010
MessageFromDrone, MessageToDrone, RenewKeyResponse,
1111
},
12-
typed_socket::{server::new_server, TypedSocketSender},
12+
typed_socket::{server::new_server, TypedSocket},
1313
types::{
1414
backend_state::TerminationReason, ClusterName, DronePoolName, NodeId, TerminationKind,
1515
},
@@ -41,7 +41,7 @@ pub async fn handle_message_from_drone(
4141
msg: MessageFromDrone,
4242
drone_id: NodeId,
4343
controller: &Controller,
44-
sender: TypedSocketSender<MessageToDrone>,
44+
sender: &mut TypedSocket<MessageToDrone>,
4545
) -> anyhow::Result<()> {
4646
match msg {
4747
MessageFromDrone::BackendMetrics(metrics_msg) => {
@@ -146,7 +146,7 @@ pub async fn sweep_loop(db: PlaneDatabase, drone_id: NodeId) {
146146

147147
pub async fn process_pending_actions(
148148
db: &PlaneDatabase,
149-
socket: TypedSocketSender<MessageToDrone>,
149+
socket: &mut TypedSocket<MessageToDrone>,
150150
drone_id: &NodeId,
151151
) -> Result<(), anyhow::Error> {
152152
let mut count = 0;
@@ -200,7 +200,7 @@ pub async fn drone_socket_inner(
200200
let mut backend_actions: Subscription<BackendActionMessage> =
201201
controller.db.subscribe_with_key(&drone_id.to_string());
202202

203-
process_pending_actions(&controller.db, socket.sender(), &drone_id).await?;
203+
process_pending_actions(&controller.db, &mut socket, &drone_id).await?;
204204

205205
let mut interval = tokio::time::interval(Duration::from_secs(5));
206206
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -212,13 +212,7 @@ pub async fn drone_socket_inner(
212212
loop {
213213
tokio::select! {
214214
_ = interval.tick() => {
215-
let sender = socket.sender();
216-
let db = controller.db.clone();
217-
tokio::spawn(async move {
218-
if let Err(err) = process_pending_actions(&db, sender, &drone_id).await {
219-
tracing::error!(?err, "Error processing pending actions");
220-
}
221-
});
215+
process_pending_actions(&controller.db, &mut socket, &drone_id).await?;
222216
}
223217
_ = log_interval.tick() => {
224218
let (outgoing, incoming) = socket.channel_depths();
@@ -269,14 +263,9 @@ pub async fn drone_socket_inner(
269263
*message_counts.entry("backend_metrics").or_insert(0) += 1;
270264
}
271265
}
272-
273-
let sender = socket.sender();
274-
let controller = controller.clone();
275-
tokio::spawn(async move {
276-
if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await {
277-
tracing::error!(?err, "Error handling message from drone");
278-
}
279-
});
266+
if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, &mut socket).await {
267+
tracing::error!(?err, "Error handling message from drone");
268+
}
280269
}
281270
None => {
282271
tracing::info!("Drone socket closed");

plane/src/controller/proxy.rs

Lines changed: 57 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use plane_common::{
1414
ApiErrorKind, CertManagerRequest, CertManagerResponse, MessageFromProxy, MessageToProxy,
1515
RouteInfoRequest, RouteInfoResponse,
1616
},
17-
typed_socket::{server::new_server, TypedSocketSender},
17+
typed_socket::{server::new_server, TypedSocket},
1818
types::{BackendState, BearerToken, ClusterName, NodeId},
1919
};
2020
use std::{
@@ -28,7 +28,7 @@ use valuable::Valuable;
2828
pub async fn handle_route_info_request(
2929
token: BearerToken,
3030
controller: &Controller,
31-
socket: TypedSocketSender<MessageToProxy>,
31+
socket: &mut TypedSocket<MessageToProxy>,
3232
) -> anyhow::Result<()> {
3333
match controller.db.backend().route_info_for_token(&token).await {
3434
// When a proxy requests a route, either:
@@ -79,58 +79,65 @@ pub async fn handle_route_info_request(
7979
}
8080
}
8181

82-
let socket = socket.wrap(MessageToProxy::RouteInfoResponse);
83-
84-
loop {
85-
// Note: this timeout is arbitrary to avoid a memory leak. Under normal system operation, the critical
86-
// timeout will be that of the backend failing to start. We use a large timeout to avoid it becoming
87-
// the critical timeout when the system is functioning.
88-
let result = match tokio::time::timeout(
89-
std::time::Duration::from_secs(30 * 60 /* 30 minutes */),
90-
sub.next(),
91-
)
92-
.await
93-
{
94-
Ok(Some(result)) => result,
95-
Ok(None) => {
96-
tracing::error!("Event subscription closed!");
97-
break;
98-
}
99-
Err(_) => {
100-
tracing::error!("Timeout waiting for backend state");
101-
break;
102-
}
103-
};
82+
let socket = socket.sender(MessageToProxy::RouteInfoResponse);
83+
tokio::spawn(async move {
84+
loop {
85+
// Note: this timeout is arbitrary to avoid a memory leak. Under normal system operation, the critical
86+
// timeout will be that of the backend failing to start. We use a large timeout to avoid it becoming
87+
// the critical timeout when the system is functioning.
88+
let result = match tokio::time::timeout(
89+
std::time::Duration::from_secs(30 * 60 /* 30 minutes */),
90+
sub.next(),
91+
)
92+
.await
93+
{
94+
Ok(Some(result)) => result,
95+
Ok(None) => {
96+
tracing::error!("Event subscription closed!");
97+
break;
98+
}
99+
Err(_) => {
100+
tracing::error!("Timeout waiting for backend state");
101+
break;
102+
}
103+
};
104104

105-
let Notification { payload, .. } = result;
105+
let Notification { payload, .. } = result;
106106

107-
match payload {
108-
BackendState::Ready { address } => {
109-
let route_info = partial_route_info.set_address(address);
110-
let response = RouteInfoResponse {
111-
token,
112-
route_info: Some(route_info),
113-
};
114-
if let Err(err) = socket.send(response) {
115-
tracing::error!(?err, "Error sending route info response to proxy.");
107+
match payload {
108+
BackendState::Ready { address } => {
109+
let route_info = partial_route_info.set_address(address);
110+
let response = RouteInfoResponse {
111+
token,
112+
route_info: Some(route_info),
113+
};
114+
if let Err(err) = socket.send(response) {
115+
tracing::error!(
116+
?err,
117+
"Error sending route info response to proxy."
118+
);
119+
}
120+
break;
116121
}
117-
break;
118-
}
119-
BackendState::Terminated { .. }
120-
| BackendState::Terminating { .. }
121-
| BackendState::HardTerminating { .. } => {
122-
let response = RouteInfoResponse {
123-
token,
124-
route_info: None,
125-
};
126-
if let Err(err) = socket.send(response) {
127-
tracing::error!(?err, "Error sending route info response to proxy.");
122+
BackendState::Terminated { .. }
123+
| BackendState::Terminating { .. }
124+
| BackendState::HardTerminating { .. } => {
125+
let response = RouteInfoResponse {
126+
token,
127+
route_info: None,
128+
};
129+
if let Err(err) = socket.send(response) {
130+
tracing::error!(
131+
?err,
132+
"Error sending route info response to proxy."
133+
);
134+
}
135+
break;
128136
}
129-
break;
137+
_ => {}
130138
}
131-
_ => {}
132139
}
133-
}
140+
});
134141
}
135142
Ok(RouteInfoResult::NotFound) => {
136143
let response = RouteInfoResponse {
@@ -152,7 +159,7 @@ pub async fn handle_route_info_request(
152159
pub async fn handle_message_from_proxy(
153160
message: MessageFromProxy,
154161
controller: &Controller,
155-
socket: TypedSocketSender<MessageToProxy>,
162+
socket: &mut TypedSocket<MessageToProxy>,
156163
cluster: &ClusterName,
157164
node_id: NodeId,
158165
) -> anyhow::Result<()> {
@@ -294,15 +301,7 @@ pub async fn proxy_socket_inner(
294301
*message_counts.entry("cert_manager_request").or_insert(0) += 1;
295302
}
296303
}
297-
298-
let sender = socket.sender();
299-
let controller = controller.clone();
300-
let cluster = cluster.clone();
301-
tokio::spawn(async move {
302-
if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await {
303-
tracing::error!(?err, "Error handling message from proxy");
304-
}
305-
});
304+
handle_message_from_proxy(message, &controller, &mut socket, &cluster, node_guard.id).await?
306305
}
307306
None => {
308307
tracing::info!("Proxy socket closed");

plane/src/drone/heartbeat.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use crate::heartbeat_consts::HEARTBEAT_INTERVAL;
22
use chrono::Utc;
3-
use plane_common::{
4-
log_types::LoggableTime, protocol::Heartbeat, typed_socket::WrappedTypedSocketSender,
5-
};
3+
use plane_common::{log_types::LoggableTime, protocol::Heartbeat, typed_socket::TypedSocketSender};
64
use tokio::task::JoinHandle;
75

86
/// A background task that sends heartbeats to the server.
@@ -11,7 +9,7 @@ pub struct HeartbeatLoop {
119
}
1210

1311
impl HeartbeatLoop {
14-
pub fn start(sender: WrappedTypedSocketSender<Heartbeat>) -> Self {
12+
pub fn start(sender: TypedSocketSender<Heartbeat>) -> Self {
1513
let handle = tokio::spawn(async move {
1614
loop {
1715
let local_time = LoggableTime(Utc::now());

plane/src/drone/key_manager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use plane_common::{
55
log_types::LoggableTime,
66
names::BackendName,
77
protocol::{AcquiredKey, BackendAction, KeyDeadlines, RenewKeyRequest},
8-
typed_socket::WrappedTypedSocketSender,
8+
typed_socket::TypedSocketSender,
99
types::{backend_state::TerminationReason, TerminationKind},
1010
};
1111
use std::{collections::HashMap, sync::Arc, time::Duration};
@@ -20,13 +20,13 @@ pub struct KeyManager {
2020
/// and terminating the backend if the key cannot be renewed.
2121
handles: HashMap<BackendName, (AcquiredKey, GuardHandle)>,
2222

23-
sender: Option<WrappedTypedSocketSender<RenewKeyRequest>>,
23+
sender: Option<TypedSocketSender<RenewKeyRequest>>,
2424
}
2525

2626
async fn renew_key_loop(
2727
key: AcquiredKey,
2828
backend: BackendName,
29-
sender: Option<WrappedTypedSocketSender<RenewKeyRequest>>,
29+
sender: Option<TypedSocketSender<RenewKeyRequest>>,
3030
executor: Arc<Executor>,
3131
) {
3232
loop {
@@ -120,7 +120,7 @@ impl KeyManager {
120120
}
121121
}
122122

123-
pub fn set_sender(&mut self, sender: WrappedTypedSocketSender<RenewKeyRequest>) {
123+
pub fn set_sender(&mut self, sender: TypedSocketSender<RenewKeyRequest>) {
124124
self.sender.replace(sender);
125125

126126
for (backend, (acquired_key, handle)) in self.handles.iter_mut() {

0 commit comments

Comments
 (0)