Skip to content

Commit a5f096b

Browse files
committed
fix service instances expired failed
1 parent 1ec4434 commit a5f096b

File tree

6 files changed

+72
-26
lines changed

6 files changed

+72
-26
lines changed

include/polaris/plugin.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,10 @@ class LocalRegistry : public Plugin {
285285
virtual ReturnCode UpdateSetCircuitBreakerData(
286286
const ServiceKey& service_key, const CircuitBreakUnhealthySetsData& unhealthy_sets) = 0;
287287

288+
virtual ReturnCode GetCircuitBreakerInstances(const ServiceKey& service_key,
289+
ServiceData*& service_data,
290+
std::vector<Instance*>& open_instances) = 0;
291+
288292
/// @brief 更新服务实例状态,properties存放的是状态值,当前支持2个key
289293
///
290294
/// 1. ReadyToServe: 故障熔断标识,true or false

polaris/cache/rcu_map.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class RcuMap {
7878
~RcuMap();
7979

8080
/// @brief 根据Key获取指向Value的指针,key不存在返回NULL
81-
Value* Get(const Key& key);
81+
Value* Get(const Key& key, bool update_access_time = true);
8282

8383
/// @brief 更新Key对应的Value
8484
/// 如果key对应的value已存在,则将旧的value加入待释放列表,内部线程会延迟一定时间释放
@@ -166,20 +166,24 @@ RcuMap<Key, Value>::~RcuMap() {
166166
}
167167

168168
template <typename Key, typename Value>
169-
Value* RcuMap<Key, Value>::Get(const Key& key) {
169+
Value* RcuMap<Key, Value>::Get(const Key& key, bool update_access_time) {
170170
// 查询read map,获取结果
171171
Value* read_result = NULL;
172172
InnerMap* current_read = read_map_;
173173
typename InnerMap::iterator it = current_read->find(key);
174174
if (it != current_read->end()) { // MapValue包含的value指针在整个过程中是可能改变的
175-
it->second->used_time_ = Time::GetCurrentTimeMs();
176-
read_result = it->second->value_;
175+
if (update_access_time) {
176+
it->second->used_time_ = Time::GetCurrentTimeMs();
177+
}
178+
read_result = it->second->value_;
177179
} else {
178180
// 从read map未读到数据,则加锁进行后续操作
179181
sync::MutexGuard mutex_guard(dirty_lock_);
180182
if ((it = dirty_map_->find(key)) != dirty_map_->end()) {
181-
it->second->used_time_ = Time::GetCurrentTimeMs();
182-
read_result = it->second->value_;
183+
if (update_access_time) {
184+
it->second->used_time_ = Time::GetCurrentTimeMs();
185+
}
186+
read_result = it->second->value_;
183187
if (read_map_ == current_read) {
184188
miss_time_++; // 记录read map读失败,dirty map读成功次数
185189
}

polaris/plugin/health_checker/health_checker.cpp

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -138,38 +138,32 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b
138138
}
139139

140140
ServiceData* service_data = NULL;
141-
local_registry_->GetServiceDataWithRef(service_key_, kServiceDataInstances, service_data);
142-
if (service_data == NULL) {
141+
std::vector<Instance*> health_check_instances;
142+
if (local_registry_->GetCircuitBreakerInstances(service_key_, service_data,
143+
health_check_instances) != kReturnOk) {
143144
return kReturnOk;
144145
}
145-
Service* service = service_data->GetService();
146+
146147
ServiceInstances service_instances(service_data);
147148
std::map<std::string, Instance*>& instance_map = service_instances.GetInstances();
148-
std::set<std::string> target_health_check_instances;
149149

150150
if (when_ == HealthCheckerConfig::kChainWhenAlways) {
151+
health_check_instances.clear();
151152
// 健康检查设置为always, 则探测所有非隔离实例
152153
for (std::map<std::string, Instance*>::iterator instance_iter = instance_map.begin();
153154
instance_iter != instance_map.end(); ++instance_iter) {
154155
if (!instance_iter->second->isIsolate()) {
155-
target_health_check_instances.insert(instance_iter->first);
156+
health_check_instances.push_back(instance_iter->second);
156157
}
157158
}
158-
} else if (when_ == HealthCheckerConfig::kChainWhenOnRecover) {
159-
// 健康检查设置为on_recover, 则探测半开实例
160-
target_health_check_instances = service->GetCircuitBreakerOpenInstances();
159+
} else if (when_ != HealthCheckerConfig::kChainWhenOnRecover) {
160+
// 健康检查设置不为on_recover, 则探测半开实例
161+
health_check_instances.clear();
161162
}
162-
for (std::set<std::string>::iterator it = target_health_check_instances.begin();
163-
it != target_health_check_instances.end(); ++it) {
164-
const std::string& instance_id = *it;
165-
std::map<std::string, Instance*>::iterator iter = instance_map.find(instance_id);
166-
if (iter == instance_map.end()) {
167-
POLARIS_LOG(LOG_INFO, "The health checker of service[%s/%s] getting instance[%s] failed",
168-
service_key_.namespace_.c_str(), service_key_.name_.c_str(), instance_id.c_str());
169-
continue;
170-
}
163+
164+
for (std::size_t i = 0; i < health_check_instances.size(); ++i) {
171165
bool is_detect_success = false;
172-
Instance* instance = iter->second;
166+
Instance* instance = health_check_instances[i];
173167
for (std::size_t i = 0; i < health_checker_list_.size(); ++i) {
174168
HealthChecker*& detector = health_checker_list_[i];
175169
DetectResult detector_result;
@@ -195,15 +189,16 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b
195189
// 探活插件成功,则将熔断实例置为半开状态,其他实例状态不变
196190
// 探活插件失败,则将健康实例置为熔断状态,其他实例状态不变
197191
if (is_detect_success) {
198-
circuit_breaker_chain.TranslateStatus(instance_id, kCircuitBreakerOpen,
192+
circuit_breaker_chain.TranslateStatus(instance->GetId(), kCircuitBreakerOpen,
199193
kCircuitBreakerHalfOpen);
200194
POLARIS_LOG(LOG_INFO,
201195
"service[%s/%s] getting instance[%s-%s:%d] detectoring success, change to "
202196
"half-open status",
203197
service_key_.namespace_.c_str(), service_key_.name_.c_str(),
204198
instance->GetId().c_str(), instance->GetHost().c_str(), instance->GetPort());
205199
} else {
206-
circuit_breaker_chain.TranslateStatus(instance_id, kCircuitBreakerClose, kCircuitBreakerOpen);
200+
circuit_breaker_chain.TranslateStatus(instance->GetId(), kCircuitBreakerClose,
201+
kCircuitBreakerOpen);
207202
POLARIS_LOG(LOG_INFO,
208203
"service[%s/%s] getting instance[%s-%s:%d] detectoring failed, change to "
209204
"open status",

polaris/plugin/local_registry/local_registry.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time,
175175
pthread_rwlock_unlock(&notify_rwlock_);
176176
if (service_data_type == kServiceDataInstances) { // 清除实例数据时对应的服务级别插件也删除
177177
context_impl->DeleteServiceContext(expired_services[i]);
178+
DeleteServiceInLock(expired_services[i]);
178179
}
179180
}
180181
}
@@ -308,6 +309,7 @@ ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key,
308309
}
309310
if (service_data == NULL) { // Server Connector反注册Handler触发更新为NULL
310311
if (data_type == kServiceDataInstances) { // 删除服务实例数据时,同时删除服务
312+
context_impl->DeleteServiceContext(service_key);
311313
DeleteServiceInLock(service_key);
312314
}
313315
context_impl->GetServiceRecord()->ServiceDataDelete(service_key,
@@ -389,4 +391,37 @@ ReturnCode InMemoryRegistry::UpdateSetCircuitBreakerData(
389391
return service->WriteCircuitBreakerUnhealthySets(unhealthy_sets);
390392
}
391393

394+
ReturnCode InMemoryRegistry::GetCircuitBreakerInstances(const ServiceKey& service_key,
395+
ServiceData*& service_data,
396+
std::vector<Instance*>& open_instances) {
397+
service_data = service_instances_data_.Get(service_key, false);
398+
if (service_data == NULL) {
399+
return kReturnServiceNotFound;
400+
}
401+
if (service_data->GetDataStatus() < kDataIsSyncing) {
402+
service_data->DecrementRef();
403+
return kReturnServiceNotFound;
404+
}
405+
Service* service = service_data->GetService();
406+
ServiceInstances service_instances(service_data);
407+
std::map<std::string, Instance*>& instance_map = service_instances.GetInstances();
408+
std::set<std::string> circuit_breaker_open_instance = service->GetCircuitBreakerOpenInstances();
409+
for (std::set<std::string>::iterator it = circuit_breaker_open_instance.begin();
410+
it != circuit_breaker_open_instance.end(); ++it) {
411+
const std::string& instance_id = *it;
412+
std::map<std::string, Instance*>::iterator iter = instance_map.find(instance_id);
413+
if (iter == instance_map.end()) {
414+
POLARIS_LOG(LOG_INFO, "The outlier detector of service[%s/%s] getting instance[%s] failed",
415+
service_key.namespace_.c_str(), service_key.name_.c_str(), instance_id.c_str());
416+
continue;
417+
}
418+
open_instances.push_back(iter->second);
419+
}
420+
if (open_instances.empty()) {
421+
return kReturnInstanceNotFound;
422+
}
423+
service_data->IncrementRef();
424+
return kReturnOk;
425+
}
426+
392427
} // namespace polaris

polaris/plugin/local_registry/local_registry.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ class InMemoryRegistry : public LocalRegistry {
9595
virtual ReturnCode UpdateSetCircuitBreakerData(
9696
const ServiceKey& service_key, const CircuitBreakUnhealthySetsData& unhealthy_sets);
9797

98+
virtual ReturnCode GetCircuitBreakerInstances(const ServiceKey& service_key,
99+
ServiceData*& service_data,
100+
std::vector<Instance*>& open_instances);
101+
98102
virtual ReturnCode UpdateDynamicWeight(const ServiceKey& service_key,
99103
const DynamicWeightData& dynamic_weight_data);
100104

test/mock/mock_local_registry.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ class MockLocalRegistry : public LocalRegistry {
5959
ReturnCode(const ServiceKey &service_key,
6060
const CircuitBreakUnhealthySetsData &cb_unhealthy_set_data));
6161

62+
MOCK_METHOD3(GetCircuitBreakerInstances,
63+
ReturnCode(const ServiceKey &service_key, ServiceData *&service_data,
64+
std::vector<Instance *> &open_instances));
65+
6266
MOCK_METHOD2(UpdateDynamicWeight, ReturnCode(const ServiceKey &service_key,
6367
const DynamicWeightData &dynamic_weight_data));
6468

0 commit comments

Comments
 (0)