forked from MQEnergy/go-websocket
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient_hub.go
More file actions
133 lines (120 loc) · 3.66 KB
/
client_hub.go
File metadata and controls
133 lines (120 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package go_websocket
import (
"errors"
"sync"
)
type Hub struct {
Clients map[*Client]bool // 全部客户端列表 {*Client1: bool, *Client2: bool...}
SystemClients map[string][]*Client // 全部系统列表 {"systemId1": []*Clients{*Client1, *Client2...}, "systemId2": []*Clients{*Client1, *Client2...}}
GroupClients map[string][]*Client // 全部群组列表 {"groupName": []*Clients{*Client1, *Client2...}}
ClientRegister chan *Client // 客户端连接处理
ClientUnregister chan *Client // 客户端断开连接处理
ClientLock sync.RWMutex // 客户端列表读写锁
Broadcast chan []byte // 来自客户端的入站消息
GroupBroadcast chan map[string][]byte // 来自群组的入站消息 {groupName:[]byte}
}
// NewHub 实例化
func NewHub() *Hub {
return &Hub{
Clients: make(map[*Client]bool),
GroupClients: make(map[string][]*Client, 1000),
SystemClients: make(map[string][]*Client, 1000),
ClientRegister: make(chan *Client),
ClientUnregister: make(chan *Client),
Broadcast: make(chan []byte),
GroupBroadcast: make(chan map[string][]byte, 1000),
}
}
// Run
func (m *Hub) Run() {
for {
select {
case client := <-m.ClientRegister:
m.handleClientRegister(client)
case client := <-m.ClientUnregister:
m.handleClientUnregister(client)
close(client.Send)
case message := <-m.Broadcast:
for client := range m.Clients {
select {
case client.Send <- message:
default:
close(client.Send)
m.handleClientUnregister(client)
}
}
case groups := <-m.GroupBroadcast:
m.GroupBroadcastHandle(groups)
}
}
}
// handleClientRegister 客户端连接处理
func (m *Hub) handleClientRegister(client *Client) {
m.ClientLock.Lock()
m.SystemClients[client.SystemId] = append(m.SystemClients[client.SystemId], client)
if client.GroupId != "" {
m.GroupClients[client.GroupId] = append(m.GroupClients[client.GroupId], client)
}
m.Clients[client] = true
m.ClientLock.Unlock()
}
// handleClientUnregister 客户端断开连接处理
func (m *Hub) handleClientUnregister(client *Client) {
m.ClientLock.Lock()
if _, ok := m.Clients[client]; ok {
delete(m.Clients, client)
}
for index, _client := range m.SystemClients[client.SystemId] {
if _client.ClientId == client.ClientId {
m.SystemClients[client.SystemId] = append(m.SystemClients[client.SystemId][:index], m.SystemClients[client.SystemId][index+1:]...)
break
}
}
m.ClientLock.Unlock()
}
// GroupBroadcastHandle 群组消息通道处理
func (m *Hub) GroupBroadcastHandle(groups map[string][]byte) {
for gname, message := range groups {
clients, err := m.GetGroupClients(gname)
if err != nil {
m.RemoveGroup(gname)
break
}
for _, client := range clients {
select {
case client.Send <- message:
default:
close(client.Send)
m.handleClientUnregister(client)
}
}
}
}
// SetClientToGroups 添加客户端到分组
func (m *Hub) SetClientToGroups(groupName string, client *Client) bool {
clients, ok := m.GroupClients[groupName]
if !ok {
return false
}
for _, _client := range clients {
if _client.ClientId == client.ClientId {
return false
}
}
m.ClientLock.Lock()
m.GroupClients[groupName] = append(m.GroupClients[groupName], client)
m.ClientLock.Unlock()
return true
}
// GetGroupClients 获取群组的客户端列表
func (m *Hub) GetGroupClients(name string) ([]*Client, error) {
clients, ok := m.GroupClients[name]
if !ok {
return []*Client{}, errors.New("group name is not exist")
}
return clients, nil
}
// RemoveGroup 删除group和群组中的client
func (m *Hub) RemoveGroup(name string) {
delete(m.GroupClients, name)
}