-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathclient.go
More file actions
349 lines (326 loc) · 9.71 KB
/
client.go
File metadata and controls
349 lines (326 loc) · 9.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
package main
import (
"fmt"
"io"
"log"
"net"
"runtime"
"strconv"
"strings"
"time"
arbor "github.com/arborchat/arbor-go"
"github.com/arborchat/muscadine/archive"
"github.com/arborchat/muscadine/session"
"github.com/arborchat/muscadine/types"
uuid "github.com/nu7hatch/gouuid"
)
const timeout = 30 * time.Second
// Connector is the type of function that connects to a server over
// a given transport.
type Connector func(address string) (io.ReadWriteCloser, error)
// TCPDial makes an unencrypted TCP connection to the given address
func TCPDial(address string) (io.ReadWriteCloser, error) {
return net.Dial("tcp", address)
}
// NetClient manages the connection to a server. It provides methods to configure
// event handlers and to connect and disconnect from the server. It also embeds
// the functionality of an Archive and Composer.
type NetClient struct {
*archive.Manager
Composer
*Notifier
address string
arbor.ReadWriteCloser
connectFunc Connector
*session.List
session.Session
disconnectHandler func(types.Connection)
receiveHandler func(*arbor.ChatMessage)
stopSending chan struct{}
stopReceiving chan struct{}
// pingServer is used to request that we attempt to force a response from the server.
// This allows us to guard against a stale connection.
pingServer chan struct{}
}
// NewNetClient creates a NetClient configured to communicate with the server at the
// given address and to use the provided archive to store the history.
func NewNetClient(address, username string, history *archive.Manager) (*NetClient, error) {
if address == "" {
return nil, fmt.Errorf("Illegal address: \"%s\"", address)
} else if username == "" {
return nil, fmt.Errorf("Illegal username: \"%s\"", username)
} else if history == nil {
return nil, fmt.Errorf("Illegal archive: %v", history)
}
composerOut := make(chan *arbor.ProtocolMessage)
stopSending := make(chan struct{})
stopReceiving := make(chan struct{})
sessionID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("Couldn't generate session id: %s", err)
}
nc := &NetClient{
address: address,
Manager: history,
connectFunc: TCPDial,
Composer: Composer{username: username, sendChan: composerOut},
stopSending: stopSending,
stopReceiving: stopReceiving,
List: session.NewList(),
Session: session.Session{ID: sessionID.String()},
pingServer: make(chan struct{}),
}
return nc, nil
}
// SetConnector changes the function used to connect to a server address. This is
// useful both for testing purposes and to change the transport mechanism of the
// io.ReadWriteCloser. To avoid race conditions, change this before calling
// Connect() for the first time.
func (nc *NetClient) SetConnector(connector Connector) {
nc.connectFunc = connector
}
// OnDisconnect sets the handler for disconnections. This should be done before
// calling Connect() for the first time to avoid race conditions.
func (nc *NetClient) OnDisconnect(handler func(types.Connection)) {
nc.disconnectHandler = handler
}
// OnReceive sets the handler for when ChatMessages are received. This should be done before
// calling Connect() for the first time to avoid race conditions.
func (nc *NetClient) OnReceive(handler func(*arbor.ChatMessage)) {
nc.receiveHandler = handler
}
// Connect resolves the address of the NetClient and attempts to establish a connection.
func (nc *NetClient) Connect() error {
conn, err := nc.connectFunc(nc.address)
if err != nil {
return err
}
nc.ReadWriteCloser, err = arbor.NewProtocolReadWriter(conn)
if err != nil {
return err
}
go nc.send()
go nc.receive()
return nil
}
// Disconnect stops all communication with the server and closes the connection. It invokes
// the handler set by OnDisconnect, if there is one.
func (nc *NetClient) Disconnect() error {
err := nc.ReadWriteCloser.Close()
if nc.disconnectHandler != nil {
go nc.disconnectHandler(nc)
}
go func() {
nc.stopSending <- struct{}{}
nc.stopReceiving <- struct{}{}
}()
return err
}
// send reads messages from the Composer and sends them to the server.
func (nc *NetClient) send() {
errored := false
for {
select {
case protoMessage := <-nc.Composer.sendChan:
err := nc.ReadWriteCloser.Write(protoMessage)
if !errored && err != nil {
errored = true
log.Println("Error writing to server:", err)
go nc.Disconnect()
} else if errored {
continue
}
case <-nc.pingServer:
// query for the root message
root, _ := nc.Archive.Root()
go nc.Composer.Query(root)
go nc.Composer.AskWho()
case <-nc.stopSending:
return
}
}
}
// readChannel spawns its own goroutine to read from the NetClient's connection.
// You can stop the goroutine by closing the channel that it returns.
func (nc *NetClient) readChannel() chan struct {
*arbor.ProtocolMessage
error
} {
out := make(chan struct {
*arbor.ProtocolMessage
error
})
// read messages continuously and send results back on a channel
go func() {
defer func() {
// ensure send on closed channel doesn't cause panic
if err := recover(); err != nil {
if _, ok := err.(runtime.Error); !ok {
// silently cancel runtime errors, but allow other errors
// to propagate.
panic(err)
}
}
}()
for {
m := new(arbor.ProtocolMessage)
err := nc.ReadWriteCloser.Read(m)
out <- struct {
*arbor.ProtocolMessage
error
}{m, err}
}
}()
return out
}
// handleMessage processes an Arbor ProtocolMessage. The actions
// taken vary by the type of ProtocolMessage.
func (nc *NetClient) handleMessage(m *arbor.ProtocolMessage) {
switch m.Type {
case arbor.NewMessageType:
if !nc.Archive.Has(m.UUID) {
if nc.receiveHandler != nil {
nc.receiveHandler(m.ChatMessage)
// ask Notifier to handle the message
nc.Notifier.Handle(nc, m.ChatMessage)
}
if m.Parent != "" && !nc.Archive.Has(m.Parent) {
nc.Query(m.Parent)
}
}
case arbor.WelcomeType:
if !nc.Has(m.Root) {
nc.Query(m.Root)
}
for _, recent := range m.Recent {
if !nc.Has(recent) {
nc.Query(recent)
}
}
case arbor.MetaType:
nc.HandleMeta(m.Meta)
}
}
// SessionID returns the unique identifier for this session.
func (nc *NetClient) SessionID() string {
return nc.Session.ID
}
// parsePresence processes "presence/here" META values into their constituent parts.
// These values take the form "username\nsessionID\ntimestamp", where username is the user
// who is advertising their presence, sessionID is the unique identifier for their session,
// and timestamp is the UNIX epoch time at which they announced their presence.
func parsePresence(value string) (username, sessionID string, timestamp time.Time, err error) {
parts := strings.Split(value, "\n")
if len(parts) < 3 {
err = fmt.Errorf("invalid presence/here message: %s", value)
return
}
username = parts[0]
if username == "" {
err = fmt.Errorf("Username cannot be the empty string")
return
}
sessionID = parts[1]
if sessionID == "" {
err = fmt.Errorf("SessionID cannot be the empty string")
return
}
timeString := parts[2]
timeInt, err := strconv.Atoi(timeString)
if err != nil {
err = fmt.Errorf("Error decoding timestamp in presence/here message: %s", value)
return
}
timestamp = time.Unix(int64(timeInt), 0)
return
}
// HandleMeta implements META message protocol extension handlers.
func (nc *NetClient) HandleMeta(meta map[string]string) {
for key, value := range meta {
switch key {
case "presence/who":
nc.Composer.AnnounceHere(nc.Session.ID)
case "presence/here":
username, sessionID, timestamp, err := parsePresence(value)
if err != nil {
log.Println("error parsing presence/here message", err)
continue
}
if username == nc.username && sessionID == nc.Session.ID {
// don't track our own session
continue
}
err = nc.List.Track(username, session.Session{ID: sessionID, LastSeen: timestamp})
if err != nil {
log.Println("Error updating session", err)
continue
}
log.Printf("Tracking session (id=%s) for user %s\n", sessionID, username)
case "presence/leave":
username, sessionID, _, err := parsePresence(value)
if err != nil {
log.Println("error parsing presence/leave message", err)
continue
}
if username == nc.username && sessionID == nc.Session.ID {
// don't remove our own session
continue
}
err = nc.List.Remove(username, sessionID)
if err != nil {
log.Println("Error removing session", err)
continue
}
log.Printf("Removed session (id=%s) for user %s\n", sessionID, username)
default:
log.Println("Unknown meta key:", key)
}
}
}
// recieve monitors for new messages and for connection staleness.
// If the connection with the server gets too stale, receive will close
// it automatically.
func (nc *NetClient) receive() {
errored := false
tick := time.NewTimer(timeout)
defer tick.Stop()
ticks := 0
out := nc.readChannel()
defer close(out)
for {
select {
case <-nc.stopReceiving:
return
case <-tick.C:
ticks++
if ticks == 1 {
// we haven't heard from the server in 30 seconds,
// try to interact.
nc.pingServer <- struct{}{}
log.Println("No server contact in 30 seconds, pinging...")
} else if ticks > 1 {
// we haven't heard from the server in a minute,
// we're probably disconnected.
go nc.Disconnect()
log.Println("No server contact in 60 seconds, disconnecting")
}
case readMsg := <-out:
// reset our ticker to wait until 30 seconds from when we
// received this message.
tick.Reset(timeout)
ticks = 0
// check for errors
m := readMsg.ProtocolMessage
err := readMsg.error
if !errored && err != nil {
errored = true
log.Println("Error reading from server:", err)
go nc.Disconnect()
} else if errored {
continue
}
// process the message
nc.handleMessage(m)
}
}
}