Skip to content

Commit d3aad37

Browse files
committed
[server] add suport for more transport protocols
1 parent c325c6b commit d3aad37

File tree

16 files changed

+178
-126
lines changed

16 files changed

+178
-126
lines changed

examples/pubsub/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ var f unitdb.MessageHandler = func(client unitdb.Client, msg unitdb.Message) {
1616
}
1717

1818
func main() {
19+
// Create a new client to open a network connection using the protocol indicated in the URL.
1920
client, err := unitdb.NewClient(
21+
//"tcp://localhost:6060",
22+
// "ws://localhost:6080",
2023
"grpc://localhost:6080",
2124
"UCBFDONCNJLaKMCAIeJBaOVfbAXUZHNPLDKKLDKLHZHKYIZLCDPQ",
2225
unitdb.WithInsecure(),

server/internal/cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (c *Cluster) Master(msg *ClusterReq, rejected *bool) error {
307307
}
308308

309309
log.Info("cluster.Master", "new connection request"+string(msg.Conn.ConnID))
310-
conn = Globals.Service.newRpcConn(node, msg.Conn.Proto, msg.Conn.ConnID, msg.Conn.ClientID)
310+
conn = Globals.Service.newRpcConn(node, msg.Conn.ConnID, msg.Conn.ClientID)
311311
go conn.rpcWriteLoop()
312312
}
313313
// Update session params which may have changed since the last call.
@@ -374,7 +374,7 @@ func (c *Cluster) isRemoteContract(contract string) bool {
374374
}
375375

376376
// Forward client message to the Master (cluster node which owns the topic)
377-
func (c *Cluster) routeToContract(msg lp.Packet, topic *security.Topic, msgType uint8, m *message.Message, conn *_Conn) error {
377+
func (c *Cluster) routeToContract(msg lp.LineProtocol, topic *security.Topic, msgType uint8, m *message.Message, conn *_Conn) error {
378378
// Find the cluster node which owns the topic, then forward to it.
379379
n := c.nodeForContract(string(conn.clientid.Contract()))
380380
if n == nil {

server/internal/conn.go

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/unit-io/unitdb/server/internal/message"
3030
"github.com/unit-io/unitdb/server/internal/message/security"
3131
lp "github.com/unit-io/unitdb/server/internal/net"
32-
"github.com/unit-io/unitdb/server/internal/net/grpc"
32+
"github.com/unit-io/unitdb/server/internal/net/pubsub"
3333
"github.com/unit-io/unitdb/server/internal/pkg/log"
3434
"github.com/unit-io/unitdb/server/internal/pkg/uid"
3535
"github.com/unit-io/unitdb/server/internal/store"
@@ -44,8 +44,8 @@ type _Conn struct {
4444
adp lp.ProtoAdapter
4545
socket net.Conn
4646
// send chan []byte
47-
send chan lp.Packet
48-
recv chan lp.Packet
47+
send chan lp.LineProtocol
48+
recv chan lp.LineProtocol
4949
pub chan *lp.Publish
5050
stop chan interface{}
5151
insecure bool // The insecure flag provided by client will not perform key validation and permissions check on the topic.
@@ -65,21 +65,13 @@ type _Conn struct {
6565
closeC chan struct{}
6666
}
6767

68-
func (s *_Service) newConn(t net.Conn, proto lp.Proto) *_Conn {
69-
var adp lp.ProtoAdapter
70-
switch proto {
71-
case lp.GRPC:
72-
adp = &grpc.LineProto{}
73-
case lp.GRPC_WEB:
74-
adp = &grpc.LineProto{}
75-
}
76-
68+
func (s *_Service) newConn(t net.Conn) *_Conn {
7769
c := &_Conn{
78-
adp: adp,
70+
adp: &pubsub.Packet{},
7971
socket: t,
8072
MessageIds: message.NewMessageIds(),
81-
send: make(chan lp.Packet, 1), // buffered
82-
recv: make(chan lp.Packet),
73+
send: make(chan lp.LineProtocol, 1), // buffered
74+
recv: make(chan lp.LineProtocol),
8375
pub: make(chan *lp.Publish),
8476
stop: make(chan interface{}, 1), // Buffered by 1 just to make it non-blocking
8577
connid: uid.NewLID(),
@@ -97,21 +89,14 @@ func (s *_Service) newConn(t net.Conn, proto lp.Proto) *_Conn {
9789
}
9890

9991
// newRpcConn a new connection in cluster
100-
func (s *_Service) newRpcConn(conn interface{}, proto lp.Proto, connid uid.LID, clientid uid.ID) *_Conn {
101-
var adp lp.ProtoAdapter
102-
switch proto {
103-
case lp.GRPC:
104-
adp = &grpc.LineProto{}
105-
case lp.GRPC_WEB:
106-
adp = &grpc.LineProto{}
107-
}
92+
func (s *_Service) newRpcConn(conn interface{}, connid uid.LID, clientid uid.ID) *_Conn {
10893
c := &_Conn{
109-
adp: adp,
94+
adp: &pubsub.Packet{},
11095
connid: connid,
11196
clientid: clientid,
11297
MessageIds: message.NewMessageIds(),
113-
send: make(chan lp.Packet, 1), // buffered
114-
recv: make(chan lp.Packet),
98+
send: make(chan lp.LineProtocol, 1), // buffered
99+
recv: make(chan lp.LineProtocol),
115100
pub: make(chan *lp.Publish),
116101
stop: make(chan interface{}, 1), // Buffered by 1 just to make it non-blocking
117102
service: s,
@@ -311,7 +296,7 @@ func (c *_Conn) outboundID(mid message.MID) (id uint16) {
311296
return uint16(uint32(c.connid) - (uint32(mid)))
312297
}
313298

314-
func (c *_Conn) storeInbound(m lp.Packet) {
299+
func (c *_Conn) storeInbound(m lp.LineProtocol) {
315300
if c.clientid != nil {
316301
blockId := uint64(c.clientid.Contract())
317302
k := uint64(c.inboundID(m.Info().MessageID))<<32 + blockId
@@ -320,7 +305,7 @@ func (c *_Conn) storeInbound(m lp.Packet) {
320305
}
321306
}
322307

323-
func (c *_Conn) storeOutbound(m lp.Packet) {
308+
func (c *_Conn) storeOutbound(m lp.LineProtocol) {
324309
if c.clientid != nil {
325310
blockId := uint64(c.clientid.Contract())
326311
k := uint64(c.inboundID(m.Info().MessageID))<<32 + blockId

server/internal/hdl_conn.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/unit-io/unitdb/server/internal/message"
2727
"github.com/unit-io/unitdb/server/internal/message/security"
2828
lp "github.com/unit-io/unitdb/server/internal/net"
29+
"github.com/unit-io/unitdb/server/internal/net/pubsub"
2930
"github.com/unit-io/unitdb/server/internal/pkg/crypto"
3031
"github.com/unit-io/unitdb/server/internal/pkg/log"
3132
"github.com/unit-io/unitdb/server/internal/pkg/stats"
@@ -66,7 +67,7 @@ func (c *_Conn) readLoop() error {
6667
}
6768

6869
// handle handles inbound packets.
69-
func (c *_Conn) handle(pkt lp.Packet) error {
70+
func (c *_Conn) handle(pkt lp.LineProtocol) error {
7071
start := time.Now()
7172
var status int = 200
7273
defer func() {
@@ -83,9 +84,16 @@ func (c *_Conn) handle(pkt lp.Packet) error {
8384
var returnCode uint8
8485
packet := *pkt.(*lp.Connect)
8586

87+
// switch the proto adapter based on protoname in the connect packet.
88+
switch packet.ProtoName {
89+
case "TELEMETRY":
90+
c.adp = &pubsub.Packet{}
91+
case "INGESTION":
92+
case "QUERY":
93+
}
8694
c.insecure = packet.InsecureFlag
8795
c.username = string(packet.Username)
88-
clientid, err := c.onConnect(packet.ClientID)
96+
clientid, err := c.onConnect([]byte(packet.ClientID))
8997
if err != nil {
9098
status = err.Status
9199
returnCode = 0x05 // Unauthorized

server/internal/net/hdl_grpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (s *GrpcServer) Stream(stream pbx.Unitdb_StreamServer) error {
7272
conn := StreamConn(stream)
7373
defer conn.Close()
7474

75-
go s.Handler(conn, GRPC)
75+
go s.Handler(conn)
7676
<-stream.Context().Done()
7777
return nil
7878
}

server/internal/net/hdl_grpc_web.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (s *HttpServer) HandleFunc(w http.ResponseWriter, r *http.Request) {
9393
return nil
9494
})
9595

96-
go s.Handler(newConn(ws), GRPC_WEB)
96+
go s.Handler(newConn(ws))
9797
}
9898

9999
func (s *HttpServer) Serve(list net.Listener) error {

server/internal/net/hdl_tcp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,6 @@ func (s *TcpServer) Serve(list net.Listener) error {
6767
}
6868

6969
tempDelay = 0
70-
go s.Handler(conn, GRPC)
70+
go s.Handler(conn)
7171
}
7272
}

0 commit comments

Comments
 (0)