Skip to content

Commit b89afc6

Browse files
committed
Huge change: Moved from long-lived, reused TLS connections inside the pluto system to on-demand ones established per client connection. This should allow for proper concurrent access and should also tremendously increase pluto's overall resilience.
1 parent ae28958 commit b89afc6

18 files changed

+855
-1040
lines changed

auth/postgres.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,9 @@ func (p *PostgresAuthenticator) AuthenticatePlain(username string, password stri
106106
// Check what type of error we received.
107107
if err == pgx.ErrNoRows {
108108
return -1, "", fmt.Errorf("username not found in users table or password wrong")
109-
} else {
110-
return -1, "", fmt.Errorf("error while trying to locate user: %s", err.Error())
111109
}
110+
111+
return -1, "", fmt.Errorf("error while trying to locate user: %s", err.Error())
112112
}
113113

114114
// Build the deterministic client-specific session identifier.

comm/connection.go

Lines changed: 10 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package comm
22

33
import (
4-
"bufio"
54
"fmt"
65
"log"
7-
"strings"
86
"time"
97

108
"crypto/tls"
@@ -15,56 +13,50 @@ import (
1513
// ReliableConnect attempts to connect to defined remote node
1614
// as long as the error from previous attempts is possible
1715
// to be dealt with.
18-
func ReliableConnect(name string, remoteName string, remoteIP string, remotePort string, tlsConfig *tls.Config, retry int) (*tls.Conn, error) {
16+
func ReliableConnect(remoteName string, remoteAddr string, tlsConfig *tls.Config, retry int) (*tls.Conn, error) {
1917

2018
var err error
2119
var c *tls.Conn
2220

23-
// Define address we are trying to connect to.
24-
nodeAddr := fmt.Sprintf("%s:%s", remoteIP, remotePort)
25-
2621
// Define what an error that we can deal with looks like.
27-
okError := fmt.Sprintf("dial tcp %s: getsockopt: connection refused", nodeAddr)
22+
okError := fmt.Sprintf("dial tcp %s: getsockopt: connection refused", remoteAddr)
2823

2924
// Initially, set error string to the one we can deal with.
3025
err = fmt.Errorf(okError)
3126

3227
for err != nil {
3328

3429
// Attempt to connect to worker node.
35-
c, err = tls.Dial("tcp", nodeAddr, tlsConfig)
30+
c, err = tls.Dial("tcp", remoteAddr, tlsConfig)
3631
if err != nil {
3732

3833
if err.Error() == okError {
3934
time.Sleep(time.Duration(retry) * time.Millisecond)
4035
} else {
41-
return nil, fmt.Errorf("%s: Could not connect to port of node '%s' because of: %s\n", name, remoteName, err.Error())
36+
return nil, fmt.Errorf("Could not connect to port of node '%s' because of: %s\n", remoteName, err.Error())
4237
}
4338
}
4439
}
4540

46-
log.Printf("%s: Successfully connected to worker node '%s'.\n", name, remoteName)
41+
log.Printf("Successfully connected to worker node '%s'.\n", remoteName)
4742

4843
return c, nil
4944
}
5045

5146
// ReliableSend sends text to other node specified and
5247
// tries to reconnect in case of simple disconnects.
53-
func ReliableSend(conn *tls.Conn, text string, name string, remoteName string, remoteIP string, remotePort string, tlsConfig *tls.Config, timeout int, retry int) (*tls.Conn, bool, error) {
48+
func ReliableSend(conn *tls.Conn, text string, remoteName string, remoteAddr string, tlsConfig *tls.Config, timeout int, retry int) error {
5449

5550
var err error
5651
var replacedConn *tls.Conn
5752

58-
// Track if we replaced the connection.
59-
replaced := false
60-
6153
// Set configured timeout on waiting for response.
6254
conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Millisecond))
6355

6456
// Test long-lived connection.
6557
_, err = conn.Write([]byte("> ping <\r\n"))
6658
if err != nil {
67-
return nil, false, fmt.Errorf("sending ping to node '%s' failed with: %s\n", remoteName, err.Error())
59+
return fmt.Errorf("sending ping to node '%s' failed with: %s\n", remoteName, err.Error())
6860
}
6961

7062
// Wait for configured time to pass.
@@ -85,75 +77,17 @@ func ReliableSend(conn *tls.Conn, text string, name string, remoteName string, r
8577
if err.Error() == okError {
8678

8779
// Connection was lost. Reconnect.
88-
replacedConn, err = ReliableConnect(name, remoteName, remoteIP, remotePort, tlsConfig, retry)
80+
replacedConn, err = ReliableConnect(remoteName, remoteAddr, tlsConfig, retry)
8981
if err != nil {
90-
return nil, false, fmt.Errorf("could not reestablish connection with '%s': %s\n", remoteName, err.Error())
82+
return fmt.Errorf("could not reestablish connection with '%s': %s\n", remoteName, err.Error())
9183
}
9284

93-
// Indicate we replaced connection.
94-
replaced = true
95-
96-
// Wait configured time before attempting next transfer.
97-
time.Sleep(time.Duration(retry) * time.Millisecond)
98-
9985
// Retry transfer.
10086
_, err = fmt.Fprintf(replacedConn, "%s\r\n", text)
10187
} else {
102-
return nil, false, fmt.Errorf("could not reestablish connection with '%s': %s\n", remoteName, err.Error())
88+
return fmt.Errorf("could not reestablish connection with '%s': %s\n", remoteName, err.Error())
10389
}
10490
}
10591

106-
if replaced {
107-
return replacedConn, replaced, nil
108-
}
109-
110-
return conn, replaced, nil
111-
}
112-
113-
// InternalSend is used by nodes of the pluto system to
114-
// successfully transmit a message to another node or
115-
// fail definitely. This prevents further handler advancement
116-
// in case a link failed.
117-
func InternalSend(conn *tls.Conn, text string, name string, remoteName string) error {
118-
119-
// Test long-lived connection.
120-
_, err := conn.Write([]byte("> ping <\r\n"))
121-
if err != nil {
122-
return fmt.Errorf("%s: sending ping to node '%s' failed: %s\n", name, remoteName, err.Error())
123-
}
124-
125-
// Write message to TLS connections.
126-
_, err = fmt.Fprintf(conn, "%s\r\n", text)
127-
for err != nil {
128-
return fmt.Errorf("%s: sending message to node '%s' failed: %s\n", name, remoteName, err.Error())
129-
}
130-
13192
return nil
13293
}
133-
134-
// InternalReceive is used by nodes in the pluto system
135-
// receive an incoming message and filter out all prior
136-
// received ping message.
137-
func InternalReceive(reader *bufio.Reader) (string, error) {
138-
139-
var err error
140-
141-
// Initial value for received message in order
142-
// to skip past the mandatory ping message.
143-
text := "> ping <\r\n"
144-
145-
for text == "> ping <\r\n" {
146-
147-
text, err = reader.ReadString('\n')
148-
if err != nil {
149-
break
150-
}
151-
}
152-
153-
// If an error happened, return it.
154-
if err != nil {
155-
return "", err
156-
}
157-
158-
return strings.TrimRight(text, "\r\n"), nil
159-
}

comm/receiver-sender_test.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -112,25 +112,13 @@ func TestSenderReceiver(t *testing.T) {
112112
// Wait shortly for goroutines to have started.
113113
time.Sleep(200 * time.Millisecond)
114114

115-
// Connect via TLS from worker-1 to storage.
116-
cToN2, err := tls.Dial("tcp", fmt.Sprintf("%s:%s", testEnv.Config.Storage.IP, testEnv.Config.Storage.SyncPort), internalTLSConfigN1)
117-
if err != nil {
118-
t.Fatalf("[comm_test.TestSenderReceiver] Expected to be able to connect from worker-1 to storage but received: %s\n", err.Error())
119-
}
120-
121115
// Create map of connections for worker-1.
122-
connsN1 := make(map[string]*tls.Conn)
123-
connsN1[n2] = cToN2
124-
125-
// Connect via TLS from storage to worker-1.
126-
cToN1, err := tls.Dial("tcp", fmt.Sprintf("%s:%s", testEnv.Config.Workers[n1].IP, testEnv.Config.Workers[n1].SyncPort), internalTLSConfigN2)
127-
if err != nil {
128-
t.Fatalf("[comm_test.TestSenderReceiver] Expected to be able to connect from storage to worker-1 but received: %s\n", err.Error())
129-
}
116+
connsN1 := make(map[string]string)
117+
connsN1[n2] = fmt.Sprintf("%s:%s", testEnv.Config.Storage.IP, testEnv.Config.Storage.SyncPort)
130118

131119
// Create map of connections for storage.
132-
connsN2 := make(map[string]*tls.Conn)
133-
connsN2[n1] = cToN1
120+
connsN2 := make(map[string]string)
121+
connsN2[n1] = fmt.Sprintf("%s:%s", testEnv.Config.Workers[n1].IP, testEnv.Config.Workers[n1].SyncPort)
134122

135123
// Initialize sending interface for worker-1.
136124
chan1, err := comm.InitSender(n1, "../test-comm-sending-worker-1.log", internalTLSConfigN1, testEnv.Config.IntlConnTimeout, testEnv.Config.IntlConnRetry, chanIncN1, chanUpdN1, n1DownSender, connsN1)

comm/sender.go

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Sender struct {
2828
updLog *os.File
2929
incVClock chan string
3030
updVClock chan map[string]int
31-
nodes map[string]*tls.Conn
31+
nodes map[string]string
3232
wg *sync.WaitGroup
3333
shutdown chan struct{}
3434
}
@@ -40,7 +40,7 @@ type Sender struct {
4040
// with. It returns a channel local processes can put
4141
// CRDT changes into, so that those changes will be
4242
// communicated to connected nodes.
43-
func InitSender(name string, logFilePath string, tlsConfig *tls.Config, timeout int, retry int, incVClock chan string, updVClock chan map[string]int, downSender chan struct{}, nodes map[string]*tls.Conn) (chan string, error) {
43+
func InitSender(name string, logFilePath string, tlsConfig *tls.Config, timeout int, retry int, incVClock chan string, updVClock chan map[string]int, downSender chan struct{}, nodes map[string]string) (chan string, error) {
4444

4545
// Create and initialize what we need for
4646
// a CRDT sender routine.
@@ -282,25 +282,19 @@ func (sender *Sender) SendMsgs() {
282282
// Unlock mutex.
283283
sender.lock.Unlock()
284284

285-
for i, conn := range sender.nodes {
285+
for nodeName, nodeAddr := range sender.nodes {
286286

287-
// Extract address to reconnect to.
288-
addrParts := strings.Split(conn.RemoteAddr().String(), ":")
287+
// Connect to node.
288+
conn, err := ReliableConnect(nodeName, nodeAddr, sender.tlsConfig, sender.intlConnRetry)
289+
if err != nil {
290+
log.Fatalf("[comm.SendMsgs] Failed to connect to %s: %s\n", err.Error())
291+
}
289292

290293
// Send payload reliably to other nodes.
291-
conn, replaced, err := ReliableSend(conn, marshalledMsg, sender.name, i, addrParts[0], addrParts[1], sender.tlsConfig, sender.intlConnTimeout, sender.intlConnRetry)
294+
err = ReliableSend(conn, marshalledMsg, nodeName, nodeAddr, sender.tlsConfig, sender.intlConnTimeout, sender.intlConnRetry)
292295
if err != nil {
293296
log.Fatalf("[comm.SendMsgs] Failed to send: %s\n", err.Error())
294297
}
295-
296-
if replaced {
297-
298-
// If connection had to be re-established, lock
299-
// sender and exchange old connection with new one.
300-
sender.lock.Lock()
301-
sender.nodes[i] = conn
302-
sender.lock.Unlock()
303-
}
304298
}
305299

306300
// Lock mutex.

crypto/config.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ func NewPublicTLSConfig(certPath string, keyPath string) (*tls.Config, error) {
2222
// Good parts of them were taken from the excellent post:
2323
// "Achieving a Perfect SSL Labs Score with Go":
2424
// https://blog.bracelab.com/achieving-perfect-ssl-labs-score-with-go
25+
// With further optimizations for speed and security from here:
26+
// "So you want to expose Go on the Internet"
27+
// https://blog.gopheracademy.com/advent-2016/exposing-go-on-the-internet/
2528
config := &tls.Config{
2629
Certificates: make([]tls.Certificate, 1),
2730
InsecureSkipVerify: false,
2831
MinVersion: tls.VersionTLS12,
29-
CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
32+
CurvePreferences: []tls.CurveID{tls.CurveP256},
3033
PreferServerCipherSuites: true,
3134
CipherSuites: []uint16{
3235
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
33-
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
34-
tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
35-
tls.TLS_RSA_WITH_AES_256_CBC_SHA,
3636
},
3737
}
3838

@@ -62,20 +62,21 @@ func NewInternalTLSConfig(certPath string, keyPath string, rootCertPath string)
6262
// Good parts of them were taken from the excellent post:
6363
// "Achieving a Perfect SSL Labs Score with Go":
6464
// https://blog.bracelab.com/achieving-perfect-ssl-labs-score-with-go
65+
// With further optimizations for speed and security from here:
66+
// "So you want to expose Go on the Internet"
67+
// https://blog.gopheracademy.com/advent-2016/exposing-go-on-the-internet/
6568
config := &tls.Config{
6669
RootCAs: x509.NewCertPool(),
6770
ClientCAs: x509.NewCertPool(),
6871
ClientAuth: tls.RequireAndVerifyClientCert,
6972
Certificates: make([]tls.Certificate, 1),
7073
InsecureSkipVerify: false,
7174
MinVersion: tls.VersionTLS12,
72-
CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
75+
CurvePreferences: []tls.CurveID{tls.CurveP256},
7376
PreferServerCipherSuites: true,
77+
SessionTicketsDisabled: false,
7478
CipherSuites: []uint16{
7579
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
76-
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
77-
tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
78-
tls.TLS_RSA_WITH_AES_256_CBC_SHA,
7980
},
8081
}
8182

0 commit comments

Comments
 (0)