Skip to content

Commit 929ebc1

Browse files
committed
服务器框架支持客户端连接平均分配的功能
增加了 master_dispatch 服务模板,由其接收外来客户端连接,同时将连接平均分配给后端的服务子进程
1 parent 26e96fc commit 929ebc1

File tree

80 files changed

+3502
-144
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+3502
-144
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#include "stdafx.h"
2+
#include "ManagerTimer.h"
3+
#include "ClientManager.h"
4+
#include "ServerManager.h"
5+
#include "ServerConnection.h"
6+
#include "ClientConnection.h"
7+
8+
ClientConnection::ClientConnection(acl::aio_socket_stream* conn, int ttl)
9+
: IConnection(conn)
10+
{
11+
struct timeval now;
12+
13+
gettimeofday(&now, NULL);
14+
expire_ = ((acl_uint64) now.tv_sec + ttl) * 1000000
15+
+ ((acl_uint64) now.tv_usec);
16+
}
17+
18+
ClientConnection::~ClientConnection()
19+
{
20+
conn_->close();
21+
}
22+
23+
void ClientConnection::run()
24+
{
25+
// 必须先将套接置为阻塞状态,否则接收者调用读时会立刻返回 -1
26+
acl_non_blocking(conn_->sock_handle(), ACL_BLOCKING);
27+
28+
// 调用描述字发送过程将客户端套接字传给服务端
29+
if (ManagerTimer::transfer(this) == false)
30+
// 如果传输描述字失败,则加入待处理队列,由定时器
31+
// 进行处理
32+
ClientManager::get_instance().set(this);
33+
else
34+
{
35+
// 尝试从集合中删除
36+
ClientManager::get_instance().del(this);
37+
delete this;
38+
}
39+
}
40+
41+
bool ClientConnection::expired() const
42+
{
43+
struct timeval now;
44+
45+
gettimeofday(&now, NULL);
46+
long long present = ((acl_uint64) now.tv_sec) * 1000000
47+
+ ((acl_uint64) now.tv_usec);
48+
49+
return present >= expire_ ? true : false;
50+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
#include "IConnection.h"
3+
4+
class ClientConnection : public IConnection
5+
{
6+
public:
7+
ClientConnection(acl::aio_socket_stream* conn, int ttl);
8+
~ClientConnection();
9+
10+
bool expired() const;
11+
12+
protected:
13+
// 基类纯虚函数
14+
void run();
15+
16+
private:
17+
long long int expire_;
18+
};
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include "stdafx.h"
2+
#include "ClientConnection.h"
3+
#include "ClientManager.h"
4+
5+
void ClientManager::set(ClientConnection* conn)
6+
{
7+
std::vector<ClientConnection*>::iterator it = conns_.begin();
8+
for (; it != conns_.end(); ++it)
9+
{
10+
if ((*it) == conn)
11+
{
12+
logger_warn("duplicate ClientConnection!");
13+
return;
14+
}
15+
}
16+
17+
conns_.push_back(conn);
18+
}
19+
20+
void ClientManager::del(ClientConnection* conn)
21+
{
22+
std::vector<ClientConnection*>::iterator it = conns_.begin();
23+
for (; it != conns_.end(); ++it)
24+
{
25+
if ((*it) == conn)
26+
{
27+
conns_.erase(it);
28+
break;
29+
}
30+
}
31+
}
32+
33+
ClientConnection* ClientManager::pop()
34+
{
35+
std::vector<ClientConnection*>::iterator it = conns_.begin();
36+
if (it == conns_.end())
37+
return NULL;
38+
39+
ClientConnection* conn = *it;
40+
conns_.erase(it);
41+
return conn;
42+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#pragma once
2+
#include <vector>
3+
4+
class ClientConnection;
5+
6+
/**
7+
* 单例类,用来管理客户端连接对象
8+
*/
9+
class ClientManager : public acl::singleton<ClientManager>
10+
{
11+
public:
12+
ClientManager() {}
13+
~ClientManager() {}
14+
15+
/**
16+
* 添加客户端连接对象,不能重复添加相同的连接对象,
17+
* 否则,内部直接 fatal
18+
* @param conn {ClientConnection*} 非空对象
19+
*/
20+
void set(ClientConnection* conn);
21+
22+
/**
23+
* 删除客户端对象
24+
* @param conn {ClientConnection*} 非空对象
25+
*/
26+
void del(ClientConnection* conn);
27+
28+
/**
29+
* 从连接对象集合中弹出一个连接对象,并从集合中删除
30+
* @return {ClientConnection*} 如果返回空,则说明没有连接对象
31+
*/
32+
ClientConnection* pop();
33+
34+
size_t length() const
35+
{
36+
return conns_.size();
37+
}
38+
39+
private:
40+
// 存储客户端连接对象的数组集合
41+
std::vector<ClientConnection*> conns_;
42+
};
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#include "stdafx.h"
2+
#include "IConnection.h"
3+
4+
int IConnection::sock_handle() const
5+
{
6+
return conn_->sock_handle();
7+
}
8+
9+
const char* IConnection::get_peer(bool full /* = true */) const
10+
{
11+
return conn_->get_peer(full);
12+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#pragma once
2+
3+
// 纯虚类,用来处理来自于客户端及服务端的连接
4+
class IConnection
5+
{
6+
public:
7+
IConnection(acl::aio_socket_stream* conn) : conn_(conn) {}
8+
virtual ~IConnection() {}
9+
10+
/**
11+
* 纯虚函数,子类必须实现
12+
*/
13+
virtual void run() = 0;
14+
15+
/**
16+
* 获得连接对象的 socket 描述符
17+
* @return {int}
18+
*/
19+
int sock_handle() const;
20+
21+
/**
22+
* 获得连接对象的地址
23+
* @return {const char*}
24+
*/
25+
const char* get_peer(bool full = true) const;
26+
27+
protected:
28+
acl::aio_socket_stream* conn_;
29+
};
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
include ./Makefile.in
2+
PROG = master_dispatch
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
CC = g++
2+
3+
CFLAGS = -c -g -W -Wall -Wcast-qual -Wcast-align \
4+
-Wno-long-long \
5+
-Wpointer-arith -Werror -Wshadow -O3 \
6+
-D_REENTRANT -D_POSIX_PTHREAD_SEMANTICS -D_USE_FAST_MACRO
7+
8+
###########################################################
9+
#Check system:
10+
# Linux, SunOS, Solaris, BSD variants, AIX, HP-UX
11+
SYSLIB = -lpthread -lcrypt -lz
12+
CHECKSYSRES = @echo "Unknow system type!";exit 1
13+
UNIXNAME = $(shell uname -sm)
14+
OSTYPE = $(shell uname -p)
15+
RPATH = linux64
16+
17+
ifeq ($(CC),)
18+
CC = gcc
19+
endif
20+
21+
# For FreeBSD
22+
ifeq ($(findstring FreeBSD, $(UNIXNAME)), FreeBSD)
23+
ifeq ($(findstring gcc, $(CC)), gcc)
24+
CFLAGS += -Wstrict-prototypes
25+
endif
26+
CFLAGS += -DFREEBSD -D_REENTRANT
27+
SYSLIB = -lcrypt -lpthread
28+
endif
29+
30+
#Path for Linux
31+
ifeq ($(findstring Linux, $(UNIXNAME)), Linux)
32+
ifeq ($CC, "gcc")
33+
CFLAGS += -Wstrict-prototypes
34+
endif
35+
ifeq ($(findstring i686, $(OSTYPE)), i686)
36+
RPATH = linux32
37+
endif
38+
ifeq ($(findstring x86_64, $(OSTYPE)), x86_64)
39+
RPATH = linux64
40+
endif
41+
CFLAGS += -DLINUX2 -D_REENTRANT
42+
endif
43+
44+
#Path for SunOS
45+
ifeq ($(findstring SunOS, $(UNIXNAME)), SunOS)
46+
ifeq ($(findstring 86, $(UNIXNAME)), 86)
47+
SYSLIB += -lsocket -lnsl -lrt
48+
endif
49+
ifeq ($(findstring sun4u, $(UNIXNAME)), sun4u)
50+
SYSLIB += -lsocket -lnsl -lrt
51+
endif
52+
ifeq ($CC, "gcc")
53+
CFLAGS += -Wstrict-prototypes
54+
endif
55+
CFLAGS += -DSUNOS5 -D_REENTRANT
56+
endif
57+
58+
#Path for HP-UX
59+
ifeq ($(findstring HP-UX, $(UNIXNAME)), HP-UX)
60+
ifeq ($CC, "gcc")
61+
CFLAGS += -Wstrict-prototypes
62+
endif
63+
CFLAGS += -DHP_UX -DHPUX11
64+
PLAT_NAME=hp-ux
65+
endif
66+
67+
#Find system type.
68+
ifneq ($(SYSPATH),)
69+
CHECKSYSRES = @echo "System is $(shell uname -sm)"
70+
endif
71+
###########################################################
72+
73+
CFLAGS += -I../../../lib_acl/include -I../../../lib_protocol/include -I../../../lib_acl_cpp/include
74+
EXTLIBS =
75+
LDFLAGS = -L../../../lib_acl_cpp/lib -l_acl_cpp -L../../../lib_protocol/lib -l_protocol -L../../../lib_acl/lib -l_acl \
76+
$(EXTLIBS) $(SYSLIB)
77+
78+
COMPILE = $(CC) $(CFLAGS)
79+
LINK = $(CC) $(OBJ) $(LDFLAGS)
80+
###########################################################
81+
OBJ_PATH = .
82+
83+
#Project's objs
84+
SRC = $(wildcard *.cpp)
85+
OBJ = $(patsubst %.cpp, $(OBJ_PATH)/%.o, $(notdir $(SRC)))
86+
87+
$(OBJ_PATH)/%.o: %.cpp
88+
$(COMPILE) $< -o $@
89+
90+
.PHONY = all clean
91+
all: RM $(OBJ)
92+
$(LINK) -o $(PROG)
93+
@echo ""
94+
@echo "All ok! Output:$(PROG)"
95+
@echo ""
96+
RM:
97+
rm -f $(PROG)
98+
clean:
99+
rm -f $(PROG)
100+
rm -f $(OBJ)
101+
install:
102+
cp $(PROG) ../../../dist/master/libexec/$(RPATH)/
103+
cp $(PROG).cf ../../../dist/master/conf/service/
104+
###########################################################
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#include "stdafx.h"
2+
#include "ClientManager.h"
3+
#include "ServerManager.h"
4+
#include "ClientConnection.h"
5+
#include "ServerConnection.h"
6+
#include "ManagerTimer.h"
7+
8+
void ManagerTimer::destroy()
9+
{
10+
delete this;
11+
}
12+
13+
bool ManagerTimer::transfer(ClientConnection* client)
14+
{
15+
ServerConnection* server;
16+
const char* peer;
17+
char buf[256];
18+
int ret;
19+
20+
// 从服务端连接管理对象中取得连接数最小的一个
21+
// 服务端对象,并将所给客户端连接传递给它,
22+
// 一直到成功或所有传输都失败为止
23+
24+
while (true)
25+
{
26+
server = ServerManager::get_instance().min();
27+
if (server == NULL)
28+
return false;
29+
30+
peer = client->get_peer();
31+
if (peer == NULL)
32+
peer = "unkonwn";
33+
memset(buf, 0, sizeof(buf));
34+
snprintf(buf, sizeof(buf), "%s", peer);
35+
36+
// 将客户端连接传递给服务端,如果失败,则尝试下一个
37+
// 服务端,同时将失败的服务端从服务端管理集合中删除
38+
ret = acl_write_fd(server->sock_handle(), buf,
39+
strlen(buf), client->sock_handle());
40+
if (ret == -1)
41+
{
42+
ServerManager::get_instance().del(server);
43+
server->close();
44+
}
45+
else
46+
server->inc_nconns();
47+
48+
return true;
49+
}
50+
}
51+
52+
void ManagerTimer::timer_callback(unsigned int)
53+
{
54+
ClientConnection* client;
55+
56+
// 从客户端管理对象弹出所有延迟待处理的客户端连接对象
57+
// 并传递给服务端,如果传递失败,则再次置入客户端管理
58+
// 对象,由下次定时器再次尝试处理
59+
60+
logger("total client: %d, total server: %d",
61+
(int) ClientManager::get_instance().length(),
62+
(int) ServerManager::get_instance().length());
63+
64+
while (true)
65+
{
66+
client = ClientManager::get_instance().pop();
67+
if (client == NULL)
68+
break;
69+
70+
if (transfer(client) == true)
71+
{
72+
ClientManager::get_instance().del(client);
73+
delete client;
74+
continue;
75+
}
76+
77+
// 如果在规定的时间内依然没有服务端准备接收连接,
78+
// 则直接删除该对象
79+
if (client->expired())
80+
{
81+
logger_warn("no server side, client(%s) expired!",
82+
client->get_peer());
83+
delete client;
84+
}
85+
else
86+
ClientManager::get_instance().set(client);
87+
break;
88+
}
89+
}

0 commit comments

Comments
 (0)