Skip to content

Commit 3dc9840

Browse files
committed
grpc: add testing utilities for LB policy tests
1 parent a7bb502 commit 3dc9840

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to
7+
* deal in the Software without restriction, including without limitation the
8+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9+
* sell copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21+
* IN THE SOFTWARE.
22+
*
23+
*/
24+
25+
use crate::client::{
26+
load_balancing::{
27+
ChannelController, ExternalSubchannel, ForwardingSubchannel, LbState, Subchannel,
28+
WorkScheduler,
29+
},
30+
name_resolution::Address,
31+
};
32+
use crate::service::{Message, Request, Response, Service};
33+
use std::{
34+
fmt::Display,
35+
hash::{Hash, Hasher},
36+
ops::Add,
37+
sync::Arc,
38+
};
39+
use tokio::{
40+
sync::{mpsc, Notify},
41+
task::AbortHandle,
42+
};
43+
44+
pub(crate) struct EmptyMessage {}
45+
impl Message for EmptyMessage {}
46+
pub(crate) fn new_request() -> Request {
47+
Request::new(Box::pin(tokio_stream::once(
48+
Box::new(EmptyMessage {}) as Box<dyn Message>
49+
)))
50+
}
51+
52+
// A test subchannel that forwards connect calls to a channel.
53+
// This allows tests to verify when a subchannel is asked to connect.
54+
pub(crate) struct TestSubchannel {
55+
address: Address,
56+
tx_connect: mpsc::UnboundedSender<TestEvent>,
57+
}
58+
59+
impl TestSubchannel {
60+
fn new(address: Address, tx_connect: mpsc::UnboundedSender<TestEvent>) -> Self {
61+
Self {
62+
address,
63+
tx_connect,
64+
}
65+
}
66+
}
67+
68+
impl ForwardingSubchannel for TestSubchannel {
69+
fn delegate(&self) -> Arc<dyn Subchannel> {
70+
panic!("unsupported operation on a test subchannel");
71+
}
72+
73+
fn address(&self) -> Address {
74+
self.address.clone()
75+
}
76+
77+
fn connect(&self) {
78+
println!("connect called for subchannel {}", self.address);
79+
self.tx_connect
80+
.send(TestEvent::Connect(self.address.clone()))
81+
.unwrap();
82+
}
83+
}
84+
85+
impl Hash for TestSubchannel {
86+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
87+
self.address.hash(state);
88+
}
89+
}
90+
91+
impl PartialEq for TestSubchannel {
92+
fn eq(&self, other: &Self) -> bool {
93+
self.address == other.address
94+
}
95+
}
96+
impl Eq for TestSubchannel {}
97+
98+
pub(crate) enum TestEvent {
99+
NewSubchannel(Address, Arc<dyn Subchannel>),
100+
UpdatePicker(LbState),
101+
RequestResolution,
102+
Connect(Address),
103+
ScheduleWork,
104+
}
105+
106+
impl Display for TestEvent {
107+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108+
match self {
109+
Self::NewSubchannel(addr, _) => write!(f, "NewSubchannel({})", addr),
110+
Self::UpdatePicker(state) => write!(f, "UpdatePicker({})", state.connectivity_state),
111+
Self::RequestResolution => write!(f, "RequestResolution"),
112+
Self::Connect(addr) => write!(f, "Connect({})", addr.address.to_string()),
113+
Self::ScheduleWork => write!(f, "ScheduleWork"),
114+
}
115+
}
116+
}
117+
118+
// A test channel controller that forwards calls to a channel. This allows
119+
// tests to verify when a channel controller is asked to create subchannels or
120+
// update the picker.
121+
pub(crate) struct TestChannelController {
122+
pub tx_events: mpsc::UnboundedSender<TestEvent>,
123+
}
124+
125+
impl ChannelController for TestChannelController {
126+
fn new_subchannel(&mut self, address: &Address) -> Arc<dyn Subchannel> {
127+
println!("new_subchannel called for address {}", address);
128+
let notify = Arc::new(Notify::new());
129+
let subchannel: Arc<dyn Subchannel> =
130+
Arc::new(TestSubchannel::new(address.clone(), self.tx_events.clone()));
131+
self.tx_events
132+
.send(TestEvent::NewSubchannel(
133+
address.clone(),
134+
subchannel.clone(),
135+
))
136+
.unwrap();
137+
subchannel
138+
}
139+
fn update_picker(&mut self, update: LbState) {
140+
println!("picker_update called with {}", update.connectivity_state);
141+
self.tx_events
142+
.send(TestEvent::UpdatePicker(update))
143+
.unwrap();
144+
}
145+
fn request_resolution(&mut self) {
146+
self.tx_events.send(TestEvent::RequestResolution).unwrap();
147+
}
148+
}
149+
150+
pub(crate) struct TestWorkScheduler {
151+
pub tx_events: mpsc::UnboundedSender<TestEvent>,
152+
}
153+
154+
impl WorkScheduler for TestWorkScheduler {
155+
fn schedule_work(&self) {
156+
self.tx_events.send(TestEvent::ScheduleWork).unwrap();
157+
}
158+
}

0 commit comments

Comments
 (0)