Skip to content

Commit 8d5c9f9

Browse files
easwarscjqzhao
authored andcommitted
grpc: add testing utilities for LB policy tests (hyperium#2380)
1 parent deb1650 commit 8d5c9f9

File tree

2 files changed

+149
-0
lines changed

2 files changed

+149
-0
lines changed

grpc/src/client/load_balancing/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ use crate::client::{
5353

5454
pub mod child_manager;
5555
pub mod pick_first;
56+
#[cfg(test)]
57+
pub mod test_utils;
5658

5759
pub(crate) mod registry;
5860
use super::{service_config::LbConfig, subchannel::SubchannelStateWatcher};
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to
7+
* deal in the Software without restriction, including without limitation the
8+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9+
* sell copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21+
* IN THE SOFTWARE.
22+
*
23+
*/
24+
25+
use crate::client::load_balancing::{
26+
ChannelController, ExternalSubchannel, ForwardingSubchannel, LbState, Subchannel, WorkScheduler,
27+
};
28+
use crate::client::name_resolution::Address;
29+
use crate::service::{Message, Request, Response, Service};
30+
use std::hash::{Hash, Hasher};
31+
use std::{fmt::Debug, ops::Add, sync::Arc};
32+
use tokio::sync::{mpsc, Notify};
33+
use tokio::task::AbortHandle;
34+
35+
pub(crate) struct EmptyMessage {}
36+
impl Message for EmptyMessage {}
37+
pub(crate) fn new_request() -> Request {
38+
Request::new(Box::pin(tokio_stream::once(
39+
Box::new(EmptyMessage {}) as Box<dyn Message>
40+
)))
41+
}
42+
43+
// A test subchannel that forwards connect calls to a channel.
44+
// This allows tests to verify when a subchannel is asked to connect.
45+
pub(crate) struct TestSubchannel {
46+
address: Address,
47+
tx_connect: mpsc::UnboundedSender<TestEvent>,
48+
}
49+
50+
impl TestSubchannel {
51+
fn new(address: Address, tx_connect: mpsc::UnboundedSender<TestEvent>) -> Self {
52+
Self {
53+
address,
54+
tx_connect,
55+
}
56+
}
57+
}
58+
59+
impl ForwardingSubchannel for TestSubchannel {
60+
fn delegate(&self) -> Arc<dyn Subchannel> {
61+
panic!("unsupported operation on a test subchannel");
62+
}
63+
64+
fn address(&self) -> Address {
65+
self.address.clone()
66+
}
67+
68+
fn connect(&self) {
69+
println!("connect called for subchannel {}", self.address);
70+
self.tx_connect
71+
.send(TestEvent::Connect(self.address.clone()))
72+
.unwrap();
73+
}
74+
}
75+
76+
impl Hash for TestSubchannel {
77+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
78+
self.address.hash(state);
79+
}
80+
}
81+
82+
impl PartialEq for TestSubchannel {
83+
fn eq(&self, other: &Self) -> bool {
84+
std::ptr::eq(self, other)
85+
}
86+
}
87+
impl Eq for TestSubchannel {}
88+
89+
pub(crate) enum TestEvent {
90+
NewSubchannel(Arc<dyn Subchannel>),
91+
UpdatePicker(LbState),
92+
RequestResolution,
93+
Connect(Address),
94+
ScheduleWork,
95+
}
96+
97+
// TODO(easwars): Remove this and instead derive Debug.
98+
impl Debug for TestEvent {
99+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100+
match self {
101+
Self::NewSubchannel(sc) => write!(f, "NewSubchannel({})", sc.address()),
102+
Self::UpdatePicker(state) => write!(f, "UpdatePicker({})", state.connectivity_state),
103+
Self::RequestResolution => write!(f, "RequestResolution"),
104+
Self::Connect(addr) => write!(f, "Connect({})", addr.address.to_string()),
105+
Self::ScheduleWork => write!(f, "ScheduleWork"),
106+
}
107+
}
108+
}
109+
110+
/// A test channel controller that forwards calls to a channel. This allows
111+
/// tests to verify when a channel controller is asked to create subchannels or
112+
/// update the picker.
113+
pub(crate) struct TestChannelController {
114+
pub(crate) tx_events: mpsc::UnboundedSender<TestEvent>,
115+
}
116+
117+
impl ChannelController for TestChannelController {
118+
fn new_subchannel(&mut self, address: &Address) -> Arc<dyn Subchannel> {
119+
println!("new_subchannel called for address {}", address);
120+
let notify = Arc::new(Notify::new());
121+
let subchannel: Arc<dyn Subchannel> =
122+
Arc::new(TestSubchannel::new(address.clone(), self.tx_events.clone()));
123+
self.tx_events
124+
.send(TestEvent::NewSubchannel(subchannel.clone()))
125+
.unwrap();
126+
subchannel
127+
}
128+
fn update_picker(&mut self, update: LbState) {
129+
println!("picker_update called with {}", update.connectivity_state);
130+
self.tx_events
131+
.send(TestEvent::UpdatePicker(update))
132+
.unwrap();
133+
}
134+
fn request_resolution(&mut self) {
135+
self.tx_events.send(TestEvent::RequestResolution).unwrap();
136+
}
137+
}
138+
139+
pub(crate) struct TestWorkScheduler {
140+
pub(crate) tx_events: mpsc::UnboundedSender<TestEvent>,
141+
}
142+
143+
impl WorkScheduler for TestWorkScheduler {
144+
fn schedule_work(&self) {
145+
self.tx_events.send(TestEvent::ScheduleWork).unwrap();
146+
}
147+
}

0 commit comments

Comments
 (0)