Skip to content

Commit 7467010

Browse files
committed
refactor:熔断器流程调整
1 parent b041046 commit 7467010

File tree

12 files changed

+214
-33
lines changed

12 files changed

+214
-33
lines changed

examples/discover.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
1717

1818
use polaris_rust::{core::{
1919
context::SDKContext,
20-
model::{error::PolarisError, naming::Location},
20+
model::{error::PolarisError, loadbalance::Criteria, naming::Location, router::RouteInfo},
2121
}, discovery::{
2222
api::{new_consumer_api_by_context, new_provider_api_by_context, ConsumerAPI, ProviderAPI},
2323
req::{
24-
GetAllInstanceRequest, InstanceDeregisterRequest, InstanceRegisterRequest,
25-
WatchInstanceRequest,
24+
GetAllInstanceRequest, GetOneInstanceRequest, InstanceDeregisterRequest, InstanceRegisterRequest, WatchInstanceRequest
2625
},
2726
}, error, info};
2827
use tracing::level_filters::LevelFilter;
@@ -152,6 +151,30 @@ async fn main() -> Result<(), PolarisError> {
152151
}
153152
}
154153

154+
// 执行路由以及负载均衡能力
155+
let mut route_info = RouteInfo::default();
156+
157+
let ret = consumer.get_one_instance(GetOneInstanceRequest{
158+
flow_id: uuid::Uuid::new_v4().to_string(),
159+
timeout: Duration::from_secs(10),
160+
namespace: "rust-demo".to_string(),
161+
service: "polaris-rust-provider".to_string(),
162+
criteria: Criteria{
163+
policy: "random".to_string(),
164+
hash_key: "".to_string(),
165+
},
166+
route_info: route_info,
167+
}).await;
168+
169+
match ret {
170+
Err(err) => {
171+
tracing::error!("get one instance fail: {}", err.to_string());
172+
}
173+
Ok(instance) => {
174+
tracing::info!("get one instance: {:?}", instance);
175+
}
176+
}
177+
155178
for _ in 0..120 {
156179
std::thread::sleep(Duration::from_secs(1));
157180
}

src/circuitbreaker/api.rs

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,19 @@
1313
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
1414
// specific language governing permissions and limitations under the License.
1515

16-
use std::sync::Arc;
16+
use std::{sync::Arc, time::Duration};
1717

1818
use crate::core::{
1919
flow::CircuitBreakerFlow,
2020
model::{
21-
circuitbreaker::{CallAbortedError, CheckResult, Resource, ResourceStat},
21+
circuitbreaker::{CallAbortedError, CheckResult, MethodResource, Resource, ResourceStat, RetStatus, ServiceResource},
2222
error::PolarisError,
2323
},
2424
};
2525

2626
use super::req::{RequestContext, ResponseContext};
2727

28+
/// CircuitBreakerAPI .
2829
#[async_trait::async_trait]
2930
pub trait CircuitBreakerAPI
3031
where
@@ -41,8 +42,11 @@ where
4142
) -> Result<Arc<InvokeHandler>, PolarisError>;
4243
}
4344

45+
/// InvokeHandler .
4446
pub struct InvokeHandler {
47+
// req_ctx: 请求上下文
4548
req_ctx: RequestContext,
49+
// flow: 熔断器流程
4650
flow: Arc<CircuitBreakerFlow>,
4751
}
4852

@@ -53,14 +57,87 @@ impl InvokeHandler {
5357

5458
/// acquire_permission 检查当前请求是否可放通
5559
async fn acquire_permission(&self) -> Result<(), CallAbortedError> {
56-
Ok(())
60+
let svc_res = ServiceResource::new_waith_caller(
61+
self.req_ctx.caller_service.clone(),
62+
self.req_ctx.callee_service.clone());
63+
64+
match self.flow.check_resource(Resource::ServiceResource(svc_res)).await {
65+
Ok(ret) => {
66+
if ret.pass {
67+
Ok(())
68+
} else {
69+
Err(CallAbortedError::new(ret.rule_name, ret.fallback_info))
70+
}
71+
},
72+
Err(e) => {
73+
// 内部异常,不触发垄断,但是需要记录
74+
crate::error!("[circuitbreaker][invoke] check resource failed: {:?}", e);
75+
Ok(())
76+
},
77+
}
5778
}
5879

5980
async fn on_success(&self, rsp: ResponseContext) -> Result<(), PolarisError> {
60-
Ok(())
81+
let cost = rsp.duration.clone();
82+
let mut code = -1 as i32;
83+
let mut status = RetStatus::RetSuccess;
84+
85+
if let Some(r) = &self.req_ctx.result_to_code {
86+
code = r.on_success(rsp.result.unwrap());
87+
}
88+
if let Some(e) = rsp.error {
89+
let ret = e.downcast::<CallAbortedError>();
90+
if ret.is_ok() {
91+
status = RetStatus::RetReject;
92+
}
93+
}
94+
self.common_report(cost, code, status).await
6195
}
6296

6397
async fn on_error(&self, rsp: ResponseContext) -> Result<(), PolarisError> {
64-
Ok(())
98+
let cost = rsp.duration.clone();
99+
let mut code = 0 as i32;
100+
let mut status = RetStatus::RetUnknown;
101+
102+
if let Some(r) = &self.req_ctx.result_to_code {
103+
code = r.on_success(rsp.result.unwrap());
104+
}
105+
self.common_report(cost, code, status).await
106+
}
107+
108+
async fn common_report(&self, cost: Duration, code: i32, status: RetStatus) -> Result<(), PolarisError> {
109+
let stat = ResourceStat {
110+
resource: Resource::ServiceResource(ServiceResource::new_waith_caller(
111+
self.req_ctx.caller_service.clone(),
112+
self.req_ctx.callee_service.clone())),
113+
ret_code: code.to_string(),
114+
delay: cost,
115+
status: status.clone(),
116+
};
117+
118+
let ret = self.flow.report_stat(stat).await;
119+
if ret.is_err() {
120+
crate::error!("[circuitbreaker][invoke] report stat failed");
121+
return ret;
122+
}
123+
124+
if self.req_ctx.path.is_empty() {
125+
return Ok(());
126+
}
127+
128+
// 补充一个接口级别的数据上报
129+
let stat = ResourceStat {
130+
resource: Resource::MethodResource(MethodResource::new_waith_caller(
131+
self.req_ctx.caller_service.clone(),
132+
self.req_ctx.callee_service.clone(),
133+
self.req_ctx.protocol.clone(),
134+
self.req_ctx.method.clone(),
135+
self.req_ctx.path.clone(),
136+
)),
137+
ret_code: code.to_string(),
138+
delay: cost,
139+
status,
140+
};
141+
self.flow.report_stat(stat).await
65142
}
66143
}

src/circuitbreaker/req.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ pub trait ResultToErrorCode
2121
where
2222
Self: Send + Sync,
2323
{
24-
fn on_success(&self, ret: dyn Any) -> i32;
25-
fn on_error(&self, err: dyn Any) -> i32;
24+
fn on_success(&self, ret: Box<dyn Any>) -> i32;
25+
fn on_error(&self, err: Box<dyn Any>) -> i32;
2626
}
2727

2828
pub struct RequestContext {
@@ -31,7 +31,7 @@ pub struct RequestContext {
3131
pub protocol: String,
3232
pub method: String,
3333
pub path: String,
34-
pub result_to_code: Box<dyn ResultToErrorCode>,
34+
pub result_to_code: Option<Box<dyn ResultToErrorCode>>,
3535
}
3636

3737
pub struct ResponseContext {

src/core/context.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
// specific language governing permissions and limitations under the License.
1515

1616
use std::sync::Arc;
17-
use tracing::log::{log, logger};
1817
use crate::core::config::config::{load_default, Configuration};
1918
use crate::core::engine::Engine;
20-
use crate::core::logger;
2119
use crate::core::model::error::{ErrorCode, PolarisError};
2220
use crate::info;
2321

src/core/engine.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,10 @@ use tokio::runtime::{Builder, Runtime};
2121
use super::flow::{CircuitBreakerFlow, ClientFlow, RouterFlow};
2222
use super::model::config::{ConfigFile, ConfigGroup};
2323
use super::model::naming::{ServiceContractRequest, ServiceInstances};
24-
use super::model::router::RouteInfo;
2524
use super::model::ClientContext;
2625
use super::plugin::cache::{Filter, ResourceCache, ResourceListener};
2726
use super::plugin::connector::Connector;
28-
use super::plugin::loadbalance::LoadBalancer;
2927
use super::plugin::location::{LocationProvider, LocationSupplier};
30-
use super::plugin::router::RouteContext;
3128
use crate::config::req::{
3229
CreateConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest, PublishConfigFileRequest,
3330
UpdateConfigFileRequest, UpsertAndPublishConfigFileRequest,

src/core/flow.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl CircuitBreakerFlow {
144144
}
145145
}
146146

147+
/// RouterFlow 路由流程
147148
pub struct RouterFlow {
148149
load_balancer: Arc<RwLock<HashMap<String, Arc<Box<dyn LoadBalancer>>>>>,
149150
extensions: Arc<Extensions>,
@@ -230,6 +231,15 @@ impl RouterFlow {
230231
}
231232
}
232233

233-
pub struct ReatelimitFlow {
234+
/// RatelimitFlow 限流流程
235+
pub struct RatelimitFlow {
234236
extensions: Arc<Extensions>,
235237
}
238+
239+
impl RatelimitFlow {
240+
pub fn new(extensions: Arc<Extensions>) -> Self {
241+
Self {
242+
extensions,
243+
}
244+
}
245+
}

src/core/model/circuitbreaker.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
use std::{
1717
fmt::{self, format, Display},
1818
iter::Map,
19-
str,
19+
str, time::Duration,
2020
};
2121

22-
use super::error::{ErrorCode, PolarisError};
22+
use super::{error::{ErrorCode, PolarisError}, naming::ServiceKey};
2323

2424
#[derive(Debug, PartialEq, Eq, Clone)]
2525
pub enum Status {
@@ -43,6 +43,7 @@ pub struct CircuitBreakerStatus {
4343
pub destroy: bool,
4444
}
4545

46+
#[derive(Debug, PartialEq, Eq, Clone)]
4647
pub enum RetStatus {
4748
RetUnknown,
4849
RetSuccess,
@@ -55,7 +56,7 @@ pub enum RetStatus {
5556
pub struct ResourceStat {
5657
pub resource: Resource,
5758
pub ret_code: String,
58-
pub delay: u32,
59+
pub delay: Duration,
5960
pub status: RetStatus,
6061
}
6162

@@ -65,13 +66,44 @@ pub enum Resource {
6566
InstanceResource(InstanceResource),
6667
}
6768

68-
pub struct ServiceResource {}
69+
/// ServiceResource 服务资源
70+
pub struct ServiceResource {
71+
pub callee: ServiceKey,
72+
pub caller: Option<ServiceKey>,
73+
}
74+
75+
impl ServiceResource {
76+
77+
pub fn new(callee: ServiceKey) -> Self {
78+
ServiceResource { caller: None, callee }
79+
}
6980

70-
impl ServiceResource {}
81+
pub fn new_waith_caller(caller: ServiceKey, callee: ServiceKey) -> Self {
82+
ServiceResource { caller: Some(caller), callee }
83+
}
7184

72-
pub struct MethodResource {}
85+
}
7386

74-
impl MethodResource {}
87+
/// MethodResource 方法资源
88+
pub struct MethodResource {
89+
pub callee: ServiceKey,
90+
pub caller: Option<ServiceKey>,
91+
pub protocol: String,
92+
pub method: String,
93+
pub path: String,
94+
}
95+
96+
impl MethodResource {
97+
98+
pub fn new(callee: ServiceKey, protocol: String, method: String, path: String) -> Self {
99+
MethodResource { caller: None, callee, protocol, method, path }
100+
}
101+
102+
pub fn new_waith_caller(caller: ServiceKey, callee: ServiceKey, protocol: String, method: String, path: String) -> Self {
103+
MethodResource { caller: Some(caller), callee, protocol, method, path }
104+
}
105+
106+
}
75107

76108
pub struct InstanceResource {}
77109

src/discovery/api.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ where
122122
async fn report_service_call(&self, req: ServiceCallResult);
123123
}
124124

125+
/// new_lossless_api 创建优雅上下线客户端实例
125126
pub(crate) fn new_lossless_api() -> Result<impl LosslessAPI, PolarisError> {
126127
let context_ret = SDKContext::default();
127128
if context_ret.is_err() {
@@ -131,12 +132,14 @@ pub(crate) fn new_lossless_api() -> Result<impl LosslessAPI, PolarisError> {
131132
Ok(DefaultLosslessAPI::new(context_ret.unwrap()))
132133
}
133134

135+
/// new_lossless_api_by_context 创建优雅上下线客户端实例
134136
pub(crate) fn new_lossless_api_by_context(
135137
context: SDKContext,
136138
) -> Result<Arc<dyn LosslessAPI>, PolarisError> {
137139
Ok(Arc::new(DefaultLosslessAPI::new(context)))
138140
}
139141

142+
/// LosslessAPI 负责优雅上下线客户端的生命周期管理
140143
pub(crate) trait LosslessAPI {
141144
fn set_action_provider(
142145
&self,

src/discovery/default.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use tokio::task::JoinHandle;
2222

2323
use crate::core::context::SDKContext;
2424
use crate::core::model::error::PolarisError;
25-
use crate::core::model::naming::ServiceInstancesChangeEvent;
25+
use crate::core::model::naming::{ServiceInstancesChangeEvent, ServiceKey};
2626
use crate::core::plugin::cache::ResourceListener;
2727
use crate::discovery::api::{ConsumerAPI, LosslessAPI, ProviderAPI};
2828
use crate::discovery::req::{
@@ -176,6 +176,13 @@ impl ConsumerAPI for DefaultConsumerAPI {
176176
)
177177
.await;
178178

179+
// 重新设置被调服务数据信息
180+
let mut route_info = req.route_info;
181+
route_info.callee = ServiceKey{
182+
namespace: req.namespace.clone(),
183+
name: req.service.clone(),
184+
};
185+
179186
match rsp {
180187
Ok(rsp) => {
181188
let instances = rsp.instances;
@@ -186,7 +193,7 @@ impl ConsumerAPI for DefaultConsumerAPI {
186193
.router_api
187194
.router(ProcessRouteRequest {
188195
service_instances: instances,
189-
route_info: req.route_info,
196+
route_info: route_info,
190197
})
191198
.await;
192199

0 commit comments

Comments
 (0)