Skip to content

Commit e8b636f

Browse files
committed
feat:支持负载均衡
1 parent 9d0d5ac commit e8b636f

File tree

18 files changed

+556
-93
lines changed

18 files changed

+556
-93
lines changed

src/core/config/consumer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub struct ServiceRouterPluginConfig {
4545
#[derive(Deserialize, Debug)]
4646
#[serde(rename_all = "camelCase", deny_unknown_fields)]
4747
pub struct LoadBalancerConfig {
48+
pub default_policy: String,
4849
pub plugins: Option<Vec<String>>,
4950
}
5051

src/core/context.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::core::engine::Engine;
2121
use crate::core::model::error::{ErrorCode, PolarisError};
2222

2323
pub struct SDKContext {
24+
pub conf: Arc<Configuration>,
2425
engine: Arc<Engine>,
2526
}
2627

@@ -53,12 +54,14 @@ impl SDKContext {
5354
// create_by_configuration
5455
pub fn create_by_configuration(cfg: Configuration) -> Result<SDKContext, PolarisError> {
5556
let start_time = std::time::Instant::now();
56-
let ret = Engine::new(cfg);
57+
let cfg = Arc::new(cfg);
58+
let ret = Engine::new(cfg.clone());
5759
tracing::info!("create engine cost: {:?}", start_time.elapsed());
5860
if ret.is_err() {
5961
return Err(ret.err().unwrap());
6062
}
6163
return Ok(Self {
64+
conf: cfg,
6265
engine: Arc::new(ret.ok().unwrap()),
6366
});
6467
}

src/core/engine.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::HashMap;
1717
use std::sync::Arc;
1818

1919
use tokio::runtime::{Builder, Runtime};
20+
use tokio::sync::RwLock;
2021

2122
use crate::config::req::{
2223
CreateConfigFileRequest, GetConfigFileRequest, PublishConfigFileRequest,
@@ -35,8 +36,10 @@ use crate::discovery::req::{
3536
};
3637

3738
use super::model::config::ConfigFile;
39+
use super::model::naming::ServiceInstances;
3840
use super::plugin::cache::{Filter, ResourceCache, ResourceListener};
3941
use super::plugin::connector::Connector;
42+
use super::plugin::loadbalance::LoadBalancer;
4043
use super::plugin::location::{LocationProvider, LocationSupplier};
4144

4245
pub struct Engine
@@ -47,10 +50,11 @@ where
4750
local_cache: Arc<Box<dyn ResourceCache>>,
4851
server_connector: Arc<Box<dyn Connector>>,
4952
location_provider: Arc<LocationProvider>,
53+
load_balancer: Arc<RwLock<HashMap<String, Arc<Box<dyn LoadBalancer>>>>>,
5054
}
5155

5256
impl Engine {
53-
pub fn new(conf: Configuration) -> Result<Self, PolarisError> {
57+
pub fn new(arc_conf: Arc<Configuration>) -> Result<Self, PolarisError> {
5458
let runtime = Arc::new(
5559
Builder::new_multi_thread()
5660
.enable_all()
@@ -60,7 +64,6 @@ impl Engine {
6064
.unwrap(),
6165
);
6266

63-
let arc_conf = Arc::new(conf);
6467
let client_id = crate::core::plugin::plugins::acquire_client_id(arc_conf.clone());
6568

6669
// 初始化 extensions
@@ -101,6 +104,7 @@ impl Engine {
101104
local_cache: Arc::new(local_cache),
102105
server_connector: server_connector,
103106
location_provider: Arc::new(location_provider),
107+
load_balancer: Arc::new(RwLock::new(extension.load_loadbalancers())),
104108
})
105109
}
106110

@@ -220,17 +224,12 @@ impl Engine {
220224

221225
let svc_ins = ret.unwrap();
222226

223-
if only_available {
224-
Ok(InstancesResponse {
225-
service_info: svc_ins.service,
226-
instances: svc_ins.available_instances,
227-
})
228-
} else {
229-
Ok(InstancesResponse {
230-
service_info: svc_ins.service,
231-
instances: svc_ins.instances,
232-
})
233-
}
227+
Ok(InstancesResponse {
228+
instances: ServiceInstances::new(
229+
svc_ins.get_service_info(),
230+
svc_ins.list_instances(only_available).await,
231+
),
232+
})
234233
}
235234

236235
/// get_service_rule 获取服务规则
@@ -340,6 +339,11 @@ impl Engine {
340339
};
341340
}
342341

342+
pub async fn lookup_loadbalancer(&self, name: &str) -> Option<Arc<Box<dyn LoadBalancer>>> {
343+
let lb = self.load_balancer.read().await;
344+
lb.get(name).map(|lb| lb.clone())
345+
}
346+
343347
/// register_resource_listener 注册资源监听器
344348
pub async fn register_resource_listener(&self, listener: Arc<dyn ResourceListener>) {
345349
self.local_cache.register_resource_listener(listener).await;

src/core/model/cache.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,8 @@ pub struct ServiceInstancesCacheItem {
326326
initialized: Arc<AtomicBool>,
327327
pub svc_info: Service,
328328
pub value: Arc<RwLock<Vec<Instance>>>,
329+
pub available_instances: Arc<RwLock<Vec<Instance>>>,
330+
pub total_weight: u64,
329331
pub revision: String,
330332
}
331333

@@ -344,11 +346,16 @@ impl ServiceInstancesCacheItem {
344346
initialized: Arc::new(AtomicBool::new(false)),
345347
svc_info: Service::default(),
346348
value: Arc::new(RwLock::new(Vec::new())),
349+
available_instances: Arc::new(RwLock::new(Vec::new())),
350+
total_weight: 0,
347351
revision: String::new(),
348352
}
349353
}
350354

351-
pub async fn list_instances(&self) -> Vec<Instance> {
355+
pub async fn list_instances(&self, only_available: bool) -> Vec<Instance> {
356+
if only_available {
357+
return self.available_instances.read().await.clone();
358+
}
352359
self.value.read().await.clone()
353360
}
354361

@@ -375,6 +382,8 @@ impl Clone for ServiceInstancesCacheItem {
375382
initialized: self.initialized.clone(),
376383
svc_info: self.svc_info.clone(),
377384
value: self.value.clone(),
385+
available_instances: self.available_instances.clone(),
386+
total_weight: self.total_weight,
378387
revision: self.revision.clone(),
379388
}
380389
}

src/core/model/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ where
7878

7979
impl Display for PolarisError {
8080
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81-
write!(f, "{:?}", self)
81+
write!(f, "err_code={:?} err_msg={}", self.err_code, self.err_msg)
8282
}
8383
}
8484

src/core/model/naming.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,37 @@ pub struct ServiceInfo {
4545
pub revision: String,
4646
}
4747

48-
#[derive(Default)]
48+
#[derive(Default, Clone, Debug)]
4949
pub struct ServiceInstances {
5050
pub service: ServiceInfo,
5151
pub instances: Vec<Instance>,
52-
pub available_instances: Vec<Instance>,
52+
pub total_weight: u64,
53+
}
54+
55+
impl ServiceInstances {
56+
pub fn get_cache_key(&self) -> String {
57+
format!(
58+
"Instance-namespace={}-service={}",
59+
self.service.namespace, self.service.name
60+
)
61+
}
62+
63+
pub fn new(svc_info: ServiceInfo, all_ins: Vec<Instance>) -> Self {
64+
let mut total_weight: u64 = 0;
65+
for (_, val) in all_ins.iter().enumerate() {
66+
total_weight += val.weight as u64;
67+
}
68+
69+
Self {
70+
service: svc_info,
71+
instances: all_ins,
72+
total_weight,
73+
}
74+
}
75+
76+
pub fn get_total_weight(&self) -> u64 {
77+
self.total_weight
78+
}
5379
}
5480

5581
#[derive(Default, Debug, Clone)]

src/core/plugin/cache.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
// specific language governing permissions and limitations under the License.
1515

1616
use crate::core::config::config::Configuration;
17-
use crate::core::model::cache::{EventType, ResourceEventKey, ServerEvent};
17+
use crate::core::model::cache::{
18+
EventType, ResourceEventKey, ServerEvent, ServiceInstancesCacheItem,
19+
};
1820
use crate::core::model::config::{ConfigFile, ConfigGroup};
1921
use crate::core::model::error::PolarisError;
2022
use crate::core::model::naming::{ServiceInstances, ServiceRule, Services};
@@ -69,7 +71,7 @@ pub trait ResourceCache: Plugin {
6971
async fn load_service_instances(
7072
&self,
7173
filter: Filter,
72-
) -> Result<ServiceInstances, PolarisError>;
74+
) -> Result<ServiceInstancesCacheItem, PolarisError>;
7375
// 加载配置文件
7476
async fn load_config_file(&self, filter: Filter) -> Result<ConfigFile, PolarisError>;
7577
// 加载配置文件组
@@ -113,7 +115,7 @@ impl ResourceCache for NoopResourceCache {
113115
async fn load_service_instances(
114116
&self,
115117
filter: Filter,
116-
) -> Result<ServiceInstances, PolarisError> {
118+
) -> Result<ServiceInstancesCacheItem, PolarisError> {
117119
todo!()
118120
}
119121

src/core/plugin/loadbalance.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// specific language governing permissions and limitations under the License.
1515

1616
use crate::core::model::{
17+
error::PolarisError,
1718
loadbalance::Criteria,
1819
naming::{Instance, ServiceInstances},
1920
};
@@ -26,5 +27,9 @@ where
2627
Self: Plugin,
2728
{
2829
/// choose_instance 选择一个实例
29-
fn choose_instance(&self, criteria: Criteria, instances: ServiceInstances) -> Option<Instance>;
30+
fn choose_instance(
31+
&self,
32+
criteria: Criteria,
33+
instances: ServiceInstances,
34+
) -> Result<Instance, PolarisError>;
3035
}

src/core/plugin/plugins.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ use crate::core::plugin::router::ServiceRouter;
2323
use crate::plugins::cache::memory::memory::MemoryCache;
2424
use crate::plugins::connector::grpc::connector::GrpcConnector;
2525
use crate::plugins::filter::configcrypto::crypto::ConfigFileCryptoFilter;
26+
use crate::plugins::loadbalance::random::random::WeightRandomLoadbalancer;
27+
use crate::plugins::loadbalance::ringhash::ringhash::ConsistentHashLoadBalancer;
28+
use crate::plugins::loadbalance::roundrobin::roundrobin::WeightedRoundRobinBalancer;
2629
use std::collections::HashMap;
2730
use std::fmt::Display;
2831
use std::hash::Hash;
@@ -35,6 +38,7 @@ use tokio::runtime::Runtime;
3538
use super::cache::InitResourceCacheOption;
3639
use super::connector::InitConnectorOption;
3740
use super::filter::DiscoverFilter;
41+
use super::loadbalance::LoadBalancer;
3842

3943
static SEQ: AtomicU64 = AtomicU64::new(1);
4044

@@ -173,6 +177,15 @@ impl Extensions {
173177
self.config_filters = Some(Arc::new(filters));
174178
Ok(())
175179
}
180+
181+
pub fn load_loadbalancers(&mut self) -> HashMap<String, Arc<Box<dyn LoadBalancer>>> {
182+
let mut loadbalancers = HashMap::<String, Arc<Box<dyn LoadBalancer>>>::new();
183+
for (name, supplier) in self.plugin_container.load_balancers.iter() {
184+
let lb = supplier();
185+
loadbalancers.insert(name.clone(), Arc::new(lb));
186+
}
187+
loadbalancers
188+
}
176189
}
177190

178191
pub struct PluginContainer {
@@ -181,6 +194,7 @@ pub struct PluginContainer {
181194
caches: HashMap<String, fn(InitResourceCacheOption) -> Box<dyn ResourceCache>>,
182195
discover_filters:
183196
HashMap<String, fn(serde_yaml::Value) -> Result<Box<dyn DiscoverFilter>, PolarisError>>,
197+
load_balancers: HashMap<String, fn() -> Box<dyn LoadBalancer>>,
184198
}
185199

186200
impl Default for PluginContainer {
@@ -190,6 +204,7 @@ impl Default for PluginContainer {
190204
routers: Default::default(),
191205
caches: Default::default(),
192206
discover_filters: Default::default(),
207+
load_balancers: Default::default(),
193208
};
194209

195210
return c;
@@ -201,6 +216,7 @@ impl PluginContainer {
201216
self.register_resource_cache();
202217
self.register_connector();
203218
self.register_discover_filter();
219+
self.register_load_balancer();
204220
}
205221

206222
fn register_connector(&mut self) {
@@ -227,6 +243,18 @@ impl PluginContainer {
227243
}
228244
}
229245

246+
fn register_load_balancer(&mut self) {
247+
let vec = vec![
248+
WeightRandomLoadbalancer::builder,
249+
ConsistentHashLoadBalancer::builder,
250+
WeightedRoundRobinBalancer::builder,
251+
];
252+
for c in vec {
253+
let (supplier, name) = c();
254+
self.load_balancers.insert(name, supplier);
255+
}
256+
}
257+
230258
fn get_connector_supplier(
231259
&self,
232260
name: &str,

src/discovery/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ where
9595
async fn get_one_instance(
9696
&self,
9797
req: GetOneInstanceRequest,
98-
) -> Result<InstancesResponse, PolarisError>;
98+
) -> Result<InstanceResponse, PolarisError>;
9999

100100
/// get_health_instance 拉取健康实例
101101
async fn get_health_instance(

0 commit comments

Comments
 (0)