Skip to content

Commit ce17eb7

Browse files
committed
Fixed internal communication problems, including the missing removal of the adapted newline symbol. Also made InternalSend robust against reconnections.
1 parent d369a99 commit ce17eb7

File tree

7 files changed

+136
-137
lines changed

7 files changed

+136
-137
lines changed

comm/connection.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
// ReliableConnect attempts to connect to defined remote node
1414
// as long as the error from previous attempts is possible
1515
// to be dealt with.
16-
func ReliableConnect(remoteName string, remoteAddr string, tlsConfig *tls.Config, retry int) (*tls.Conn, error) {
16+
func ReliableConnect(remoteAddr string, tlsConfig *tls.Config, retry int) (*tls.Conn, error) {
1717

1818
var err error
1919
var c *tls.Conn
@@ -33,30 +33,32 @@ func ReliableConnect(remoteName string, remoteAddr string, tlsConfig *tls.Config
3333
if err.Error() == okError {
3434
time.Sleep(time.Duration(retry) * time.Millisecond)
3535
} else {
36-
return nil, fmt.Errorf("Could not connect to port of node '%s' because of: %s\n", remoteName, err.Error())
36+
return nil, fmt.Errorf("could not connect to port of node '%s' because of: %s", remoteAddr, err.Error())
3737
}
3838
}
3939
}
4040

41-
log.Printf("Successfully connected to worker node '%s'.\n", remoteName)
42-
4341
return c, nil
4442
}
4543

46-
// ReliableSend sends text to other node specified and
47-
// tries to reconnect in case of simple disconnects.
48-
func ReliableSend(conn *tls.Conn, text string, remoteName string, remoteAddr string, tlsConfig *tls.Config, timeout int, retry int) error {
44+
// ReliableSend attempts to transmit a message between
45+
// pluto nodes. If the first attempt fails, the node will
46+
// try to reconnect and resend the message until successfully
47+
// transmitted.
48+
func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls.Config, timeout int, retry int) error {
4949

50-
var err error
51-
var replacedConn *tls.Conn
50+
// TODO: Make this routine first check for closed
51+
// pipe and after that attempt a 'ping' test
52+
// with exponential (?) backoff for write deadline
53+
// up to timeout.
5254

5355
// Set configured timeout on waiting for response.
5456
conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Millisecond))
5557

5658
// Test long-lived connection.
57-
_, err = conn.Write([]byte("> ping <\r\n"))
59+
_, err := conn.Write([]byte("> ping <\r\n"))
5860
if err != nil {
59-
return fmt.Errorf("sending ping to node '%s' failed with: %s\n", remoteName, err.Error())
61+
return fmt.Errorf("sending ping to node '%s' failed with: %s\n", remoteAddr, err.Error())
6062
}
6163

6264
// Wait for configured time to pass.
@@ -69,23 +71,25 @@ func ReliableSend(conn *tls.Conn, text string, remoteName string, remoteAddr str
6971
_, err = fmt.Fprintf(conn, "%s\r\n", text)
7072
for err != nil {
7173

72-
log.Printf("[comm.ReliableSend] Sending to node '%s' failed, trying to recover...\n", remoteName)
74+
log.Printf("[comm.ReliableSend] Sending to node '%s' failed, trying to recover...\n", remoteAddr)
7375

7476
// Define an error we can deal with.
75-
okError := fmt.Sprintf("write tcp %s->%s: write: broken pipe", conn.LocalAddr(), conn.RemoteAddr())
77+
okError := fmt.Sprintf("write tcp %s->%s: write: broken pipe", conn.LocalAddr().String(), remoteAddr)
7678

7779
if err.Error() == okError {
7880

7981
// Connection was lost. Reconnect.
80-
replacedConn, err = ReliableConnect(remoteName, remoteAddr, tlsConfig, retry)
82+
conn, err = ReliableConnect(remoteAddr, tlsConfig, retry)
8183
if err != nil {
82-
return fmt.Errorf("could not reestablish connection with '%s': %s\n", remoteName, err.Error())
84+
return fmt.Errorf("could not reestablish connection with '%s': %s", remoteAddr, err.Error())
8385
}
8486

85-
// Retry transfer.
86-
_, err = fmt.Fprintf(replacedConn, "%s\r\n", text)
87+
log.Printf("[comm.ReliableSend] Reconnected to '%s'.\n", remoteAddr)
88+
89+
// Resend message.
90+
_, err = fmt.Fprintf(conn, "%s\r\n", text)
8791
} else {
88-
return fmt.Errorf("could not reestablish connection with '%s': %s\n", remoteName, err.Error())
92+
return fmt.Errorf("could not reestablish connection with '%s': %s", remoteAddr, err.Error())
8993
}
9094
}
9195

comm/receiver.go

Lines changed: 29 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (recv *Receiver) Shutdown(downRecv chan struct{}) {
217217
// Wait for signal.
218218
<-downRecv
219219

220-
log.Printf("[comm.Shutdown] receiver: shutting down...\n")
220+
log.Printf("[comm.Shutdown] Receiver: shutting down...\n")
221221

222222
// Instruct other goroutines to shutdown.
223223
recv.shutdown <- struct{}{}
@@ -237,7 +237,7 @@ func (recv *Receiver) Shutdown(downRecv chan struct{}) {
237237
recv.socket.Close()
238238
recv.lock.Unlock()
239239

240-
log.Printf("[comm.Shutdown] receiver: done!\n")
240+
log.Printf("[comm.Shutdown] Receiver: done!\n")
241241
}
242242

243243
// IncVClockEntry waits for an incoming name of a node on
@@ -309,25 +309,13 @@ func (recv *Receiver) AcceptIncMsgs() error {
309309
// routine on exiting this function.
310310
defer recv.wg.Done()
311311

312-
// Initilize channel to signal end to
313-
// storing goroutines further below.
314-
downStoring := make(chan struct{})
315-
316-
// Count how many storing routines were spawned.
317-
downNumber := 0
318-
319312
for {
320313

321314
select {
322315

323316
// Check if a shutdown signal was sent.
324317
case <-recv.shutdown:
325318

326-
// Send shutdown signal to each storing routine.
327-
for i := 0; i < downNumber; i++ {
328-
downStoring <- struct{}{}
329-
}
330-
331319
// Close file descriptor.
332320
recv.lock.Lock()
333321
recv.writeLog.Close()
@@ -343,104 +331,64 @@ func (recv *Receiver) AcceptIncMsgs() error {
343331
return fmt.Errorf("[comm.AcceptIncMsgs] Accepting incoming sync messages at %s failed with: %s\n", recv.name, err.Error())
344332
}
345333

346-
// Dispatch into own goroutine.
347-
downNumber += 1
348-
go recv.StoreIncMsgs(conn, downStoring)
334+
go recv.StoreIncMsgs(conn)
349335
}
350336
}
351337
}
352338

353339
// StoreIncMsgs takes received message string and saves
354340
// it into incoming CRDT message log file.
355-
func (recv *Receiver) StoreIncMsgs(conn net.Conn, downStoring chan struct{}) {
341+
func (recv *Receiver) StoreIncMsgs(conn net.Conn) {
356342

357343
var err error
358344

359345
// Initial value for received message in order
360346
// to skip past the mandatory ping message.
361-
msgRaw := "> ping <\r\n"
347+
msgRaw := "> ping <"
362348

363349
// Create new buffered reader for connection.
364350
r := bufio.NewReader(conn)
365351

366-
for msgRaw == "> ping <\r\n" {
352+
for msgRaw == "> ping <" {
367353

368354
// Read string until newline character is received.
369355
msgRaw, err = r.ReadString('\n')
370356
if err != nil {
371357

372358
if err.Error() == "EOF" {
373-
374-
// Error caused by disconnect. Do not crash.
375-
log.Printf("[comm.StoreIncMsgs] Node at %s disconnected...\n", conn.RemoteAddr())
376-
377-
// Simply end this function.
378-
msgRaw = "> done <\r\n"
359+
log.Printf("[comm.StoreIncMsgs] Reading from closed connection. Ignoring.\n")
360+
return
379361
} else {
380362
log.Fatalf("[comm.StoreIncMsgs] Error while reading sync message: %s\n", err.Error())
381363
}
382364
}
383-
}
384-
385-
// Unless we do not receive the signal that continuous CRDT
386-
// message transmission is done, we accept new messages.
387-
for msgRaw != "> done <\r\n" {
388365

389-
// Lock mutex.
390-
recv.lock.Lock()
391-
392-
// Write it to message log file.
393-
_, err = recv.writeLog.WriteString(msgRaw)
394-
if err != nil {
395-
log.Fatalf("[comm.StoreIncMsgs] Writing to CRDT log file failed with: %s\n", err.Error())
396-
}
397-
398-
// Save to stable storage.
399-
err = recv.writeLog.Sync()
400-
if err != nil {
401-
log.Fatalf("[comm.StoreIncMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
402-
}
403-
404-
// Unlock mutex.
405-
recv.lock.Unlock()
406-
407-
// Indicate to applying routine that a new message
408-
// is available to process.
409-
if len(recv.msgInLog) < 1 {
410-
recv.msgInLog <- struct{}{}
411-
}
412-
413-
select {
414-
415-
case <-downStoring:
416-
417-
// Break from inifinte loop.
418-
msgRaw = "> done <\r\n"
419-
420-
default:
421-
422-
// Reset message to expected ping.
423-
msgRaw = "> ping <\r\n"
366+
// Remove trailing characters denoting line end.
367+
msgRaw = strings.TrimRight(msgRaw, "\r\n")
368+
}
424369

425-
for msgRaw == "> ping <\r\n" {
370+
// Lock mutex.
371+
recv.lock.Lock()
426372

427-
// Read next CRDT message until newline character is received.
428-
msgRaw, err = r.ReadString('\n')
429-
if err != nil {
373+
// Write it to message log file.
374+
_, err = recv.writeLog.WriteString(msgRaw)
375+
if err != nil {
376+
log.Fatalf("[comm.StoreIncMsgs] Writing to CRDT log file failed with: %s\n", err.Error())
377+
}
430378

431-
if err.Error() == "EOF" {
379+
// Save to stable storage.
380+
err = recv.writeLog.Sync()
381+
if err != nil {
382+
log.Fatalf("[comm.StoreIncMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
383+
}
432384

433-
// Error caused by disconnect. Do not crash.
434-
log.Printf("[comm.StoreIncMsgs] Node at %s disconnected...\n", conn.RemoteAddr())
385+
// Unlock mutex.
386+
recv.lock.Unlock()
435387

436-
// Simply end this function.
437-
msgRaw = "> done <\r\n"
438-
} else {
439-
log.Fatalf("[comm.StoreIncMsgs] Error while reading sync message: %s\n", err.Error())
440-
}
441-
}
442-
}
443-
}
388+
// Indicate to applying routine that a new message
389+
// is available to process.
390+
if len(recv.msgInLog) < 1 {
391+
recv.msgInLog <- struct{}{}
444392
}
445393
}
446394

comm/sender.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,16 +282,16 @@ func (sender *Sender) SendMsgs() {
282282
// Unlock mutex.
283283
sender.lock.Unlock()
284284

285-
for nodeName, nodeAddr := range sender.nodes {
285+
for _, nodeAddr := range sender.nodes {
286286

287287
// Connect to node.
288-
conn, err := ReliableConnect(nodeName, nodeAddr, sender.tlsConfig, sender.intlConnRetry)
288+
conn, err := ReliableConnect(nodeAddr, sender.tlsConfig, sender.intlConnRetry)
289289
if err != nil {
290290
log.Fatalf("[comm.SendMsgs] Failed to connect to %s: %s\n", err.Error())
291291
}
292292

293293
// Send payload reliably to other nodes.
294-
err = ReliableSend(conn, marshalledMsg, nodeName, nodeAddr, sender.tlsConfig, sender.intlConnTimeout, sender.intlConnRetry)
294+
err = ReliableSend(conn, marshalledMsg, nodeAddr, sender.tlsConfig, sender.intlConnTimeout, sender.intlConnRetry)
295295
if err != nil {
296296
log.Fatalf("[comm.SendMsgs] Failed to send: %s\n", err.Error())
297297
}

imap/connection.go

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
"crypto/tls"
1010
"path/filepath"
11+
12+
"github.com/numbleroot/pluto/comm"
1113
)
1214

1315
// Constants
@@ -35,14 +37,15 @@ type IMAPState int
3537
// a pluto node that only authenticates and proxies
3638
// IMAP connections.
3739
type Connection struct {
38-
IncConn *tls.Conn
39-
IncReader *bufio.Reader
40-
OutConn *tls.Conn
41-
OutReader *bufio.Reader
42-
OutIP string
43-
OutPort string
44-
ClientID string
45-
UserName string
40+
IncConn *tls.Conn
41+
IncReader *bufio.Reader
42+
OutConn *tls.Conn
43+
OutReader *bufio.Reader
44+
IntlTLSConfig *tls.Config
45+
IntlConnRetry int
46+
OutAddr string
47+
ClientID string
48+
UserName string
4649
}
4750

4851
// IMAPConnection contains additional elements needed
@@ -83,8 +86,8 @@ func (c *Connection) Send(inc bool, text string) error {
8386

8487
// InternalSend is used by nodes of the pluto system to
8588
// successfully transmit a message to another node or fail
86-
// definitely. This prevents further handler advancement
87-
// in case a link failed.
89+
// definitely if no reconnection is possible. This prevents
90+
// further handler advancement in case a link failed.
8891
func (c *Connection) InternalSend(inc bool, text string) error {
8992

9093
// Check which attached connection should be used.
@@ -96,13 +99,51 @@ func (c *Connection) InternalSend(inc bool, text string) error {
9699
// Test if connection is still healthy.
97100
_, err := conn.Write([]byte("> ping <\r\n"))
98101
if err != nil {
99-
return fmt.Errorf("sending ping to node '%s' failed: %s\n", conn.RemoteAddr().String(), err.Error())
102+
return fmt.Errorf("sending ping to node '%s' failed: %s", conn.RemoteAddr().String(), err.Error())
100103
}
101104

102105
// Write message to TLS connections.
103106
_, err = fmt.Fprintf(conn, "%s\r\n", text)
104107
for err != nil {
105-
return fmt.Errorf("sending message to node '%s' failed: %s\n", conn.RemoteAddr().String(), err.Error())
108+
109+
log.Printf("[imap.InternalSend] Sending to node '%s' failed, trying to recover...\n", conn.RemoteAddr())
110+
111+
// Define what IP and port of remote node look like.
112+
remoteAddr := conn.RemoteAddr().String()
113+
114+
// Define an error we can deal with.
115+
okError := fmt.Sprintf("write tcp %s->%s: write: broken pipe", conn.LocalAddr().String(), remoteAddr)
116+
117+
if err.Error() == okError {
118+
119+
// Reestablish TLS connection to remote node.
120+
conn, err = comm.ReliableConnect(remoteAddr, c.IntlTLSConfig, c.IntlConnRetry)
121+
if err != nil {
122+
return fmt.Errorf("failed to reestablish connection with '%s': %s", remoteAddr, err.Error())
123+
}
124+
125+
// Save context to connection.
126+
if inc {
127+
c.IncConn = conn
128+
c.IncReader = bufio.NewReader(conn)
129+
} else {
130+
c.OutConn = conn
131+
c.OutReader = bufio.NewReader(conn)
132+
133+
// Inform remote node about which session was active.
134+
err = c.SignalSessionStart(false)
135+
if err != nil {
136+
return fmt.Errorf("signalling session to remote node failed with: %s", err.Error())
137+
}
138+
}
139+
140+
log.Printf("[imap.InternalSend] Reconnected to '%s'.\n", remoteAddr)
141+
142+
// Resend message to remote node.
143+
_, err = fmt.Fprintf(conn, "%s\r\n", text)
144+
} else {
145+
return fmt.Errorf("failed to send message to remote node '%s': %s", remoteAddr, err.Error())
146+
}
106147
}
107148

108149
return nil
@@ -205,7 +246,7 @@ func (c *Connection) SignalSessionStart(inc bool) error {
205246
func (c *Connection) SignalSessionStartFailover(inc bool, recvClientID string, origWorker string) error {
206247

207248
// Text to send.
208-
msg := strings.Replace(recvClientID, " <", fmt.Sprintf("%s <", origWorker), -1)
249+
msg := strings.Replace(recvClientID, " <", fmt.Sprintf(" %s <", origWorker), -1)
209250

210251
// Send session information internally.
211252
err := c.InternalSend(inc, msg)
@@ -279,7 +320,7 @@ func (c *Connection) Terminate() error {
279320
func (c *Connection) Error(msg string, err error) {
280321

281322
// Log error.
282-
log.Printf("%s: %s. Terminating connection to %s\n", msg, err.Error(), c.IncConn.RemoteAddr().String())
323+
log.Printf("%s: %s. Terminating connection to %s.\n", msg, err.Error(), c.IncConn.RemoteAddr().String())
283324

284325
// Terminate connection.
285326
err = c.Terminate()

0 commit comments

Comments
 (0)