Skip to content

Commit c325c6b

Browse files
committed
[Server] add fixes to cluster on closing a connection
1 parent 098d2d4 commit c325c6b

File tree

7 files changed

+36
-31
lines changed

7 files changed

+36
-31
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ Unitdb supports Get, Put, Delete operations. It also supports encryption, batch
3636
Samples are available in the examples directory for reference.
3737

3838
## Clustering
39-
To bring up the Unitdb cluster start 2 or more nodes, 3 nodes or more nodes are recommended for fault tolerance.
39+
To bring up the Unitdb cluster start 2 or more nodes. For fault tolerance 3 nodes or more are recommended.
4040

4141
```
4242
> ./bin/unitdb -listen=:6060 -grpc_listen=:6080 -cluster_self=one -db_path=/tmp/unitdb/node1

server/.gitignore

Lines changed: 0 additions & 12 deletions
This file was deleted.

server/internal/cluster.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ type ClusterSess struct {
8383
// IP address of the client. For long polling this is the IP of the last poll
8484
RemoteAddr string
8585

86+
// protocol - NONE (unset), RPC, GRPC, GRPC_WEB, WEBSOCK
87+
Proto lp.Proto
8688
// Connection ID
8789
ConnID uid.LID
8890

@@ -305,11 +307,13 @@ func (c *Cluster) Master(msg *ClusterReq, rejected *bool) error {
305307
}
306308

307309
log.Info("cluster.Master", "new connection request"+string(msg.Conn.ConnID))
308-
conn = Globals.Service.newRpcConn(node, msg.Conn.ConnID, msg.Conn.ClientID)
310+
conn = Globals.Service.newRpcConn(node, msg.Conn.Proto, msg.Conn.ConnID, msg.Conn.ClientID)
309311
go conn.rpcWriteLoop()
310312
}
311313
// Update session params which may have changed since the last call.
314+
conn.proto = msg.Conn.Proto
312315
conn.connid = msg.Conn.ConnID
316+
conn.clientid = msg.Conn.ClientID
313317

314318
switch msg.Type {
315319
case message.SUBSCRIBE:
@@ -410,6 +414,7 @@ func (c *Cluster) routeToContract(msg lp.Packet, topic *security.Topic, msgType
410414
Message: m,
411415
Conn: &ClusterSess{
412416
//RemoteAddr: conn.(),
417+
Proto: conn.proto,
413418
ConnID: conn.connid,
414419
ClientID: conn.clientid}})
415420
}
@@ -525,7 +530,10 @@ func (c *_Conn) rpcWriteLoop() {
525530
// channel closed
526531
return
527532
}
528-
m, err := lp.Encode(c.proto, msg)
533+
if c.adp == nil {
534+
return
535+
}
536+
m, err := lp.Encode(c.adp, msg)
529537
if err != nil {
530538
log.Error("conn.writeRpc", err.Error())
531539
return

server/internal/conn.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ import (
3939
type _Conn struct {
4040
sync.Mutex
4141
tracked uint32 // Whether the connection was already tracked or not.
42-
// protocol - NONE (unset), RPC, GRPC, WEBSOCK, CLUSTER
43-
proto lp.ProtoAdapter
42+
// protocol - NONE (unset), RPC, GRPC, GRPC_WEB, WEBSOCK, CLUSTER
43+
proto lp.Proto
44+
adp lp.ProtoAdapter
4445
socket net.Conn
4546
// send chan []byte
4647
send chan lp.Packet
@@ -65,16 +66,16 @@ type _Conn struct {
6566
}
6667

6768
func (s *_Service) newConn(t net.Conn, proto lp.Proto) *_Conn {
68-
var lineProto lp.ProtoAdapter
69+
var adp lp.ProtoAdapter
6970
switch proto {
7071
case lp.GRPC:
71-
lineProto = &grpc.LineProto{}
72+
adp = &grpc.LineProto{}
7273
case lp.GRPC_WEB:
73-
lineProto = &grpc.LineProto{}
74+
adp = &grpc.LineProto{}
7475
}
7576

7677
c := &_Conn{
77-
proto: lineProto,
78+
adp: adp,
7879
socket: t,
7980
MessageIds: message.NewMessageIds(),
8081
send: make(chan lp.Packet, 1), // buffered
@@ -96,8 +97,16 @@ func (s *_Service) newConn(t net.Conn, proto lp.Proto) *_Conn {
9697
}
9798

9899
// newRpcConn a new connection in cluster
99-
func (s *_Service) newRpcConn(conn interface{}, connid uid.LID, clientid uid.ID) *_Conn {
100+
func (s *_Service) newRpcConn(conn interface{}, proto lp.Proto, connid uid.LID, clientid uid.ID) *_Conn {
101+
var adp lp.ProtoAdapter
102+
switch proto {
103+
case lp.GRPC:
104+
adp = &grpc.LineProto{}
105+
case lp.GRPC_WEB:
106+
adp = &grpc.LineProto{}
107+
}
100108
c := &_Conn{
109+
adp: adp,
101110
connid: connid,
102111
clientid: clientid,
103112
MessageIds: message.NewMessageIds(),
@@ -307,7 +316,7 @@ func (c *_Conn) storeInbound(m lp.Packet) {
307316
blockId := uint64(c.clientid.Contract())
308317
k := uint64(c.inboundID(m.Info().MessageID))<<32 + blockId
309318
fmt.Println("inbound: type, key, qos", m.Type(), k, m.Info().Qos)
310-
store.Log.PersistInbound(c.proto, k, m)
319+
store.Log.PersistInbound(c.adp, k, m)
311320
}
312321
}
313322

@@ -316,7 +325,7 @@ func (c *_Conn) storeOutbound(m lp.Packet) {
316325
blockId := uint64(c.clientid.Contract())
317326
k := uint64(c.inboundID(m.Info().MessageID))<<32 + blockId
318327
fmt.Println("inbound: type, key, qos", m.Type(), k, m.Info().Qos)
319-
store.Log.PersistOutbound(c.proto, k, m)
328+
store.Log.PersistOutbound(c.adp, k, m)
320329
}
321330
}
322331

server/internal/hdl_conn.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (c *_Conn) readLoop() error {
5252
c.socket.SetDeadline(time.Now().Add(time.Second * 120))
5353

5454
// Decode an incoming packet
55-
pkt, err := lp.ReadPacket(c.proto, reader)
55+
pkt, err := lp.ReadPacket(c.adp, reader)
5656
if err != nil {
5757
fmt.Println("readPacket: err", err)
5858
return err
@@ -205,7 +205,7 @@ func (c *_Conn) writeLoop(ctx context.Context) {
205205
// Channel closed.
206206
return
207207
}
208-
m, err := lp.Encode(c.proto, msg)
208+
m, err := lp.Encode(c.adp, msg)
209209
if err != nil {
210210
log.Error("conn.writeLoop", err.Error())
211211
return
@@ -216,7 +216,7 @@ func (c *_Conn) writeLoop(ctx context.Context) {
216216
// Channel closed.
217217
return
218218
}
219-
m, err := lp.Encode(c.proto, msg)
219+
m, err := lp.Encode(c.adp, msg)
220220
if err != nil {
221221
log.Error("conn.writeLoop", err.Error())
222222
return
@@ -396,7 +396,7 @@ func (c *_Conn) resume() {
396396
// contract is used as blockId and key prefix
397397
keys := store.Log.Keys()
398398
for _, k := range keys {
399-
msg := store.Log.Get(c.proto, k)
399+
msg := store.Log.Get(c.adp, k)
400400
if msg == nil {
401401
continue
402402
}

server/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func main() {
4242
var listenOn = flag.String("listen", "", "Override address and port to listen on for HTTP(S) clients.")
4343
var listenGrpcOn = flag.String("grpc_listen", "", "Override address and port to listen on for GRPC clients.")
4444
var clusterSelf = flag.String("cluster_self", "", "Override the name of the current cluster node")
45-
var dbPath = flag.String("db_path", "tmp/unitdb", "Override the db path.")
45+
var dbPath = flag.String("db_path", "/tmp/unitdb", "Override the db path.")
4646
var varzPath = flag.String("varz", "/varz", "Expose runtime stats at the given endpoint, e.g. /varz. Disabled if not set")
4747
flag.Parse()
4848

@@ -70,7 +70,7 @@ func main() {
7070
}
7171

7272
// Set up gRPC server, if one is configured
73-
if *listenGrpcOn == "" {
73+
if *listenGrpcOn != "" {
7474
cfg.GrpcListen = *listenGrpcOn
7575
}
7676

server/unitdb.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
"cluster_config": {
4242
// Name of this node. Can be assigned from the command line.
4343
// Empty string disables clustering.
44-
"self": "",
44+
"self": "one",
4545

4646
// List of available nodes.
4747
"nodes": [

0 commit comments

Comments
 (0)