-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemcache.txt
More file actions
103 lines (83 loc) · 5.41 KB
/
memcache.txt
File metadata and controls
103 lines (83 loc) · 5.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
main:
main_base = event_init(); libevent 初始化
conn_init();
conns = calloc(max_fds, sizeof(conn *))
这个结构体放着 sfd, 和 LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
struct event event; 和 short ev_flags;
memcached_thread_init(settings.num_threads, main_base);
401 typedef struct {
402 pthread_t thread_id; /* unique ID of this thread */
403 struct event_base *base; /* libevent handle this thread uses */
404 struct event notify_event; /* listen event for notify pipe */
405 int notify_receive_fd; /* receiving end of notify pipe */
406 int notify_send_fd; /* sending end of notify pipe */
407 struct thread_stats stats; /* Stats generated by this thread */
408 struct conn_queue *new_conn_queue; /* queue of new connections to handle */
409 cache_t *suffix_cache; /* suffix cache */
410 } LIBEVENT_THREAD;
设置主线程的main_base:
776 dispatcher_thread.base = main_base;
777 dispatcher_thread.thread_id = pthread_self();
为每个线程建立管道:
779 for (i = 0; i < nthreads; i++) {
780 int fds[2];
781 if (pipe(fds)) {
782 perror("Can't create notify pipe");
783 exit(1);
784 }
785
786 threads[i].notify_receive_fd = fds[0];
787 threads[i].notify_send_fd = fds[1];
788
789 setup_thread(&threads[i]);
790 /* Reserve three fds for the libevent base, and two for the pipe */
791 stats.reserved_fds += 5;
792 }
setup_thread 函数:
陆续调用 event_init, event_set, event_base_set, event_add
初始化 me->new_conn_queue = malloc(sizeof(struct conn_queue)); 连接队列 当收到主线程的任务信息, 主线程会提前把数据往队列里塞
create_worker(worker_libevent, &threads[i]);
调用 worker_libevent的四个线程
event_base_loop(me->base, 0);
此时 四个线程分别有自己的event loop 分别监听一个fd(pipe建立的)
其中 event_set的回调函数是 -------------------------thread_libevent_process-----------, 当有event到达时, 会自动调用这个函数。
调用 server_sockets
调用 server_socket
socket, bind, listen,setsockopt(SO_KEEPALIVE SO_LINGER TCP_NODELAYSO_REUSEADDR)
4582 if (!(listen_conn_add = conn_new(sfd, conn_listening,
4583 EV_READ | EV_PERSIST, 1,
4584 transport, main_base))) {
新建 conn_new, c->state状态为conn_listening
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
event_add(&c->event, 0);
将建立的新fd注册到libevent中去, --------------------回调函数是 event_handler------------------------
------------------------------- listen_conn -------------------是监听队列
启动完成
*********************************************************************************
当有新连接, libevent调用 event_handler 函数, event_handler调用drive_machine 函数
drive_machine:
主线程的event状态一直是 c->state = conn_listening
drivemachine先accept4 得到新句柄, 调用 dispatch_conn_new 把任务交给子线程
dispatch_conn_new先得到一个CQ_ITEM, 这个item是一次性分配了很多, 为了减少内存碎片。
然后采用round-robin选取一个线程, 将CQ_ITEM塞入到该线程的队列中
最后主线程往该线程的pipe管道写入了一个字符“c”, 将状态改为conn_new_cmd
break
紧接着 子线程0的thread_libevent_process触发 收到了管道信号, 读取了一个字符'c'
从new_conn_queue队列中取出一个item, 再次调用 conn_new 函数 建立新链接。
--------------------回调函数是 event_handler------------------------
*********************************************************************************
键入新命令 set a 0 0 4
线程调用 reset_cmd_handler, 初始化命令, conn_set_state(c, conn_waiting) 将状态改成 conn_waiting
循环没停止, 直接 conn_set_state(c, conn_read), 停止了。
注意, 这次转变, 线程并没有调用 read() 来读取socket, 因为采用的是水平触发, 所以libevent会不停的触发。
下一次drive_machine被触发, 调用 try_read_network函数, 该函数准备耗内存, 并且从socket中读取尽量很多的字符。
当读取成功时(返回 READ_DATA_RECEIVED), 调用 conn_set_state(c, conn_parse_cmd); 循环继续
try_read_command, memcached试着读取解析命令, 将set a 0 0 4\r\n 变成set a 0 0 4\0\n 然后调用 process_command
process_command 调用 process_update_command, set命令需要value域, 所以将状态变成 nread conn_set_state(c, conn_nread);
res = read(c->sfd, c->ritem, c->rlbytes); 因为 需要输入的字符串长度是 4+strlen("\r\n") 所以c->rlbytes是6
然后 c->rlbytes -= res; c->rlbytes 变为0 循环继续
调用 complete_nread 调用 complete_nread_ascii, 接着调用 out_string(c, "STORED");
在out_string中, conn_set_state(c, conn_write); 状态变为 conn_write
调用 transmit,最后发出系统调用sendmsg