项目源码 : https://github.com/sogou/workflow
更加详细的源码注释可看 : https://github.com/chanchann/workflow_annotation
协议rfc : https://datatracker.ietf.org/doc/html/rfc1035
上次分析 http 任务时我们看到:在真正发起请求之前,会先去解析域名。入口就在 WFComplexClientTask::dispatch() 里:
// Dispatches the complex client task according to this->state.
//
// - WFT_STATE_UNDEFINED: if check_request() passes and route_result_ already
//   holds a request_object, the request is sent right away; otherwise a router
//   task is created and queued in front of this task, and this task is queued
//   again so that it is dispatched a second time afterwards.
// - WFT_STATE_SUCCESS: the case label sits INSIDE the inner `if` body, so the
//   switch jumps straight to the "send the request" statements.
// - Any other state (and the UNDEFINED path that did not return): leaves the
//   switch and calls subtask_done().
template<class REQ, class RESP, typename CTX>
void WFComplexClientTask<REQ, RESP, CTX>::dispatch()
{
    switch (this->state)
    {
    case WFT_STATE_UNDEFINED: // the state seen on the first dispatch
        LOG_TRACE("dispatch WFT_STATE_UNDEFINED");
        if (this->check_request()) // author's note: returns true here; more complex protocols such as mysql may override it
        {
            // route_result_ is a RouteManager::RouteResult;
            // author's note: its request_object is produced through DNS resolution
            if (this->route_result_.request_object) // author's note: empty on the first pass, so control skips to the router_task_ creation below
            {
    case WFT_STATE_SUCCESS: // author's note: the second dispatch arrives here directly with SUCCESS
                LOG_TRACE("dispatch WFT_STATE_SUCCESS");
                this->set_request_object(route_result_.request_object);
                // author's note: this ends up in CommRequest::dispatch (the parent of
                // WFClientTask's parent), which calls scheduler->request
                this->WFClientTask<REQ, RESP>::dispatch();
                return;
            }
            // First pass lands here: create the route (DNS resolution) task
            // and insert it ahead of this task.
            router_task_ = this->route();
            series_of(this)->push_front(this);
            series_of(this)->push_front(router_task_);
            // resulting order in the series: router_task_ --> WFComplexClientTask(this)
        }
        // no break above: control falls through into default
    default:
        break;
    }
    this->subtask_done();
}
上述逻辑已经很清晰了。接下来看其中的 route():它实际上是调用 create_router_task,并返回一个 WFRouterTask。
/**
 * Creates the router task used to resolve this task's destination.
 *
 * Binds router_callback() to this object, packs the routing inputs into a
 * WFNSParams, looks up the name-service policy for the URI host on first use
 * (an empty string is used when the host is null), and asks that policy to
 * create the router task.
 *
 * @return the router task produced by ns_policy_->create_router_task().
 */
template<class REQ, class RESP, typename CTX>
WFRouterTask *WFComplexClientTask<REQ, RESP, CTX>::route()
{
    auto&& cb = std::bind(&WFComplexClientTask::router_callback,
                          this,
                          std::placeholders::_1);
    // params only borrows from this object: uri is a reference and info points
    // into info_'s buffer.
    struct WFNSParams params = {
        .type = type_,
        .uri = uri_,
        .info = info_.c_str(),
        .fixed_addr = fixed_addr_,
        .retry_times = retry_times_,
        .tracing = &tracing_,
    };

    if (!ns_policy_) // author's note: null right after initialization
    {
        WFNameService *ns = WFGlobal::get_name_service();
        ns_policy_ = ns->get_policy(uri_.host ? uri_.host : "");
    }

    // Pass the address of the local params (the excerpt had a garbled "&params").
    return ns_policy_->create_router_task(&params, cb);
}
其中有三个比较核心的东西,一个是router_callback,一个是WFNSParams,一个是policy
// Callback bound in route() and handed to the router task.
// Copies the router task's state into this->state; when that state is
// WFT_STATE_SUCCESS, the task's result is moved into route_result_.
template<class REQ, class RESP, typename CTX>
void WFComplexClientTask<REQ, RESP, CTX>::router_callback(WFRouterTask *task)
{
    this->state = task->get_state();
    if (this->state == WFT_STATE_SUCCESS)
        route_result_ = std::move(*task->get_result());
    else if (this->state == WFT_STATE_UNDEFINED)
        ... // the remaining branches are elided in this excerpt
}
此处就做两件事,设置state(这个和前面的state结合看流程)和route_result_
/src/nameservice/WFNameService.h
// Inputs handed to WFNSPolicy::create_router_task().
// The struct does not own its data: uri is a reference and info/tracing are
// raw pointers supplied by the caller (see how route() fills it).
struct WFNSParams
{
    TransportType type;
    ParsedURI& uri;         // reference to the caller's parsed URI
    const char *info;       // C string supplied by the caller (route() passes info_.c_str())
    bool fixed_addr;
    int retry_times;        // WFDnsResolver::create_router_task picks the DNS cache level from this
    WFNSTracing *tracing;
};
/src/manager/WFGlobal.cc
// Returns the WFNameService held by the __NameServiceManager instance.
WFNameService *WFGlobal::get_name_service()
{
    __NameServiceManager *manager = __NameServiceManager::get_instance();

    return manager->get_name_service();
}
Global这里必然是管理的单例
// Holds one WFDnsResolver and one WFNameService and exposes them by pointer.
class __NameServiceManager
{
public:
    // Returns the address of a function-local static instance, so every
    // caller gets the same object.
    static __NameServiceManager *get_instance()
    {
        static __NameServiceManager kInstance;
        return &kInstance;
    }
public:
    WFDnsResolver *get_dns_resolver() { return &resolver_; }
    WFNameService *get_name_service() { return &service_; }
private:
    WFDnsResolver resolver_;
    WFNameService service_;
public:
    // The name service is constructed with the resolver's address as its
    // default policy argument.
    __NameServiceManager() : service_(&resolver_) { }
};
注意此处 __NameServiceManager() : service_(&resolver_) { }, &WFDnsResolver 就是下面的 WFNSPolicy *default_policy
// Maps names to WFNSPolicy objects, with a default policy used when a name
// has no entry. Lookups in get_policy() are guarded by a pthread rwlock.
class WFNameService
{
public:
    int add_policy(const char *name, WFNSPolicy *policy);
    WFNSPolicy *get_policy(const char *name);
    WFNSPolicy *del_policy(const char *name);
private:
    WFNSPolicy *default_policy; // returned by get_policy() when no entry matches
    struct rb_root root;        // NOTE(review): presumably the red-black tree of policy entries - confirm
    pthread_rwlock_t rwlock;
private:
    struct WFNSPolicyEntry *get_policy_entry(const char *name);
public:
    // Starts with an empty tree root and remembers the given default policy.
    WFNameService(WFNSPolicy *default_policy) :
        rwlock(PTHREAD_RWLOCK_INITIALIZER)
    {
        this->root.rb_node = NULL;
        this->default_policy = default_policy;
    }
};

// Returns the policy registered under `name`, or the default policy when
// get_policy_entry() finds no entry. The lookup runs under the read lock.
WFNSPolicy *WFNameService::get_policy(const char *name)
{
    WFNSPolicy *policy = this->default_policy;
    struct WFNSPolicyEntry *entry;
    pthread_rwlock_rdlock(&this->rwlock);
    entry = this->get_policy_entry(name);
    if (entry)
        policy = entry->policy;
    pthread_rwlock_unlock(&this->rwlock);
    return policy;
}
如果没有这个name的entry,就返回默认的policy
而我们默认的policy就为WFDnsResolver
/src/nameservice/WFDnsResolver.h
// WFNSPolicy implementation that creates WFResolverTask objects
// (see WFDnsResolver::create).
class WFDnsResolver : public WFNSPolicy
{
public:
    // Implements the pure virtual function declared in WFNSPolicy.
    virtual WFRouterTask *create_router_task(const struct WFNSParams *params,
                                             router_callback_t callback);
public:
    WFRouterTask *create(const struct WFNSParams *params, int dns_cache_level,
                         unsigned int dns_ttl_default, unsigned int dns_ttl_min,
                         const struct EndpointParams *endpoint_params,
                         router_callback_t&& callback);
private:
    WFResourcePool respool; // author's note: controls the degree of parallelism
private:
    // Forwards to respool.get() with the address of a function-local static
    // buffer pointer.
    WFConditional *get_cond(SubTask *task)
    {
        static void *buf;
        return this->respool.get(task, &buf);
    }
    // Forwards to respool.post() with NULL.
    void post_cond() { this->respool.post(NULL); }
public:
    WFDnsResolver();
    friend class WFResolverTask;
};

// Abstract base for name-service policies. Subclasses must implement
// create_router_task(); success()/failed() have default implementations that
// notify RouteManager.
class WFNSPolicy
{
public:
    virtual WFRouterTask *create_router_task(const struct WFNSParams *params,
                                             router_callback_t callback) = 0;
    // Default: reports the target as available using result->cookie.
    // `tracing` is not used by this default implementation.
    virtual void success(RouteManager::RouteResult *result,
                         WFNSTracing *tracing,
                         CommTarget *target)
    {
        RouteManager::notify_available(result->cookie, target);
    }
    // Default: reports the target as unavailable, but only when target is
    // non-null. `tracing` is not used by this default implementation.
    virtual void failed(RouteManager::RouteResult *result,
                        WFNSTracing *tracing,
                        CommTarget *target)
    {
        if (target)
            RouteManager::notify_unavailable(result->cookie, target);
    }
public:
    virtual ~WFNSPolicy() { }
};
一个比较重要的就是create_router_task这个纯虚函数需要被实现
这里涉及到了 RouteManager 和 WFNSTracing,它们的作用先放到后面再看。
我们在 route() 的最后一步调用了 ns_policy_->create_router_task(&params, cb);
// Creates the router task for `params` using the global settings.
// The DNS TTL values and endpoint parameters come from
// WFGlobal::get_global_settings(). The cache level is DNS_CACHE_LEVEL_2 when
// params->retry_times is 0 and DNS_CACHE_LEVEL_1 otherwise.
WFRouterTask *WFDnsResolver::create_router_task(const struct WFNSParams *params,
                                                router_callback_t callback)
{
    const auto *global = WFGlobal::get_global_settings();
    int cache_level;

    if (params->retry_times == 0)
        cache_level = DNS_CACHE_LEVEL_2;
    else
        cache_level = DNS_CACHE_LEVEL_1;

    return create(params, cache_level,
                  global->dns_ttl_default, global->dns_ttl_min,
                  &global->endpoint_params, std::move(callback));
}
其中EndpointParams
// Per-endpoint connection settings passed along to the resolver task.
// NOTE(review): units of the timeout fields are not shown in this excerpt - confirm.
struct EndpointParams
{
    size_t max_connections;
    int connect_timeout;
    int response_timeout;
    int ssl_connect_timeout;
    bool use_tls_sni;
};

// Allocates a WFResolverTask with `new`, forwarding every argument and moving
// the callback into it. Who deletes the task is not shown in this excerpt.
WFRouterTask *
WFDnsResolver::create(const struct WFNSParams *params, int dns_cache_level,
                      unsigned int dns_ttl_default, unsigned int dns_ttl_min,
                      const struct EndpointParams *endpoint_params,
                      router_callback_t&& callback)
{
    return new WFResolverTask(params, dns_cache_level, dns_ttl_default,
                              dns_ttl_min, endpoint_params,
                              std::move(callback));
}
!!!这里实际上是new了一个WFResolverTask
其实WFDnsResolver相当于WFResolverTask的工厂
// Router task created by WFDnsResolver::create(). Declares the dispatch()/done()
// overrides and several DNS callbacks; their bodies are not part of this excerpt.
class WFResolverTask : public WFRouterTask
{
public:
    // Moves the callback into the WFRouterTask base; the rest of the member
    // initialization is elided in this excerpt.
    WFResolverTask(const struct WFNSParams *params, int dns_cache_level,
                   unsigned int dns_ttl_default, unsigned int dns_ttl_min,
                   const struct EndpointParams *endpoint_params,
                   router_callback_t&& cb) :
        WFRouterTask(std::move(cb))
    {
        ...初始化
    }
private:
    virtual void dispatch();
    virtual SubTask *done();
    void thread_dns_callback(ThreadDnsTask *dns_task);
    void dns_single_callback(WFDnsTask *dns_task);
    static void dns_partial_callback(WFDnsTask *dns_task);
    void dns_parallel_callback(const ParallelWork *pwork);
    void dns_callback_internal(DnsOutput *dns_task,
                               unsigned int ttl_default,
                               unsigned int ttl_min);
private:
    TransportType type_;
    std::string host_;
    std::string info_;
    unsigned short port_;
    bool first_addr_only_;
    bool insert_dns_;
    int dns_cache_level_;
    unsigned int dns_ttl_default_;
    unsigned int dns_ttl_min_;
    struct EndpointParams endpoint_params_; // stored by value, not as a pointer
};
一个task最为核心的当然是dispatch
解析在此
https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/other_02_dns_opt.md