Skip to content

Commit f89b537

Browse files
committed
rename
1 parent 4a37eb4 commit f89b537

File tree

8 files changed

+68
-65
lines changed

8 files changed

+68
-65
lines changed

api/getconnectioninfo.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import (
77
"context"
88

99
"github.com/NpoolPlatform/message/npool/sphinxproxy"
10-
"github.com/NpoolPlatform/sphinx-proxy/pkg/connection"
10+
"github.com/NpoolPlatform/sphinx-proxy/pkg/deserver"
1111
)
1212

13-
func (s *Server) GetConnectionInfos(ctx context.Context, in *sphinxproxy.GetConnectionInfosRequest) (*sphinxproxy.GetConnectionInfosResponse, error) {
14-
infos := connection.GetConnectionMGR().GetConnectionInfos()
15-
return &sphinxproxy.GetConnectionInfosResponse{
13+
func (s *Server) GetClientInfos(ctx context.Context, in *sphinxproxy.GetClientInfosRequest) (*sphinxproxy.GetClientInfosResponse, error) {
14+
infos := deserver.GetDEServerMGR().GetClientInfos()
15+
return &sphinxproxy.GetClientInfosResponse{
1616
Infos: infos,
1717
Total: uint32(len(infos)),
1818
}, nil

api/stream/connection.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,19 @@ package stream
33
import (
44
"github.com/NpoolPlatform/go-service-framework/pkg/logger"
55
"github.com/NpoolPlatform/message/npool/sphinxproxy"
6-
"github.com/NpoolPlatform/sphinx-proxy/pkg/connection"
6+
"github.com/NpoolPlatform/sphinx-proxy/pkg/deserver"
77
)
88

9-
func (s *Server) ProxyConnnection(stream sphinxproxy.SphinxProxyStream_ProxyConnnectionServer) error {
10-
conn, err := connection.RegisterConnection(stream)
9+
func (s *Server) DEStream(stream sphinxproxy.SphinxProxyStream_DEStreamServer) error {
10+
conn, err := deserver.RegisterDEServer(stream)
1111
if err != nil {
1212
logger.Sugar().Error(err)
1313
return err
1414
}
1515
defer conn.Close()
1616

17-
mgr := connection.GetConnectionMGR()
18-
mgr.AddConnection(conn)
17+
mgr := deserver.GetDEServerMGR()
18+
mgr.AddDEServer(conn)
1919

2020
conn.OnRecv()
2121
return nil

cmd/sphinx-proxy/run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"github.com/NpoolPlatform/go-service-framework/pkg/logger"
99
"github.com/NpoolPlatform/sphinx-proxy/api"
1010
"github.com/NpoolPlatform/sphinx-proxy/api/stream"
11-
"github.com/NpoolPlatform/sphinx-proxy/pkg/connection"
1211
"github.com/NpoolPlatform/sphinx-proxy/pkg/db"
12+
"github.com/NpoolPlatform/sphinx-proxy/pkg/deserver"
1313
"github.com/NpoolPlatform/sphinx-proxy/pkg/migrator"
1414

1515
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@@ -45,7 +45,7 @@ func run(ctx context.Context) error {
4545

4646
func shutdown(ctx context.Context) {
4747
<-ctx.Done()
48-
connection.GetConnectionMGR().CloseAll()
48+
deserver.GetDEServerMGR().CloseAll()
4949
logger.Sugar().Warnw(
5050
"Watch",
5151
"State", "Done",
Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package connection
1+
package deserver
22

33
import (
44
"context"
@@ -11,17 +11,17 @@ import (
1111
"github.com/NpoolPlatform/message/npool/sphinxproxy"
1212
)
1313

14-
type Connection struct {
15-
sphinxproxy.SphinxProxyStream_ProxyConnnectionServer
16-
*sphinxproxy.ConnectionInfo
14+
type DEServer struct {
15+
sphinxproxy.SphinxProxyStream_DEStreamServer
16+
*sphinxproxy.ClientInfo
1717
ctx context.Context
1818
cancel context.CancelFunc
19-
onCloseFuncs []func(conn *Connection)
19+
onCloseFuncs []func(conn *DEServer)
2020
recvHandlers []func(data *sphinxproxy.DataElement)
2121
closeOnce sync.Once
2222
}
2323

24-
func RegisterConnection(stream sphinxproxy.SphinxProxyStream_ProxyConnnectionServer) (*Connection, error) {
24+
func RegisterDEServer(stream sphinxproxy.SphinxProxyStream_DEStreamServer) (*DEServer, error) {
2525
select {
2626
case <-time.NewTicker(time.Second * 3).C:
2727
return nil, wlog.Errorf("timeout for register connection")
@@ -34,7 +34,7 @@ func RegisterConnection(stream sphinxproxy.SphinxProxyStream_ProxyConnnectionSer
3434
statusCode := sphinxproxy.StatusCode_StatusCodeSuccess
3535
statusMsg := ""
3636

37-
connInfo := &sphinxproxy.ConnectionInfo{}
37+
connInfo := &sphinxproxy.ClientInfo{}
3838
err = json.Unmarshal(data.Payload, connInfo)
3939
if err != nil {
4040
statusCode = sphinxproxy.StatusCode_StatusCodeFailed
@@ -61,38 +61,38 @@ func RegisterConnection(stream sphinxproxy.SphinxProxyStream_ProxyConnnectionSer
6161
}
6262

6363
ctx, cancel := context.WithCancel(stream.Context())
64-
return &Connection{
65-
SphinxProxyStream_ProxyConnnectionServer: stream,
66-
ConnectionInfo: connInfo,
67-
ctx: ctx,
68-
cancel: cancel,
64+
return &DEServer{
65+
SphinxProxyStream_DEStreamServer: stream,
66+
ClientInfo: connInfo,
67+
ctx: ctx,
68+
cancel: cancel,
6969
}, nil
7070
}
7171
}
7272

73-
func (conn *Connection) WatchClose(onClose func(conn *Connection)) {
73+
func (conn *DEServer) WatchClose(onClose func(conn *DEServer)) {
7474
conn.onCloseFuncs = append(conn.onCloseFuncs, onClose)
7575
}
7676

77-
func (conn *Connection) WatchRecv(onRecv func(data *sphinxproxy.DataElement)) {
77+
func (conn *DEServer) WatchRecv(onRecv func(data *sphinxproxy.DataElement)) {
7878
conn.recvHandlers = append(conn.recvHandlers, onRecv)
7979
}
8080

81-
func (conn *Connection) Close() {
81+
func (conn *DEServer) Close() {
8282
conn.closeOnce.Do(func() {
8383
for _, onClose := range conn.onCloseFuncs {
8484
onClose(conn)
8585
}
8686
conn.cancel()
8787
logger.Sugar().Warnf(
8888
"connection is closed, siderType: %v, ID: %v, Position: %v",
89-
conn.SiderType,
89+
conn.ClientType,
9090
conn.Position,
9191
conn.Position)
9292
})
9393
}
9494

95-
func (conn *Connection) OnRecv() {
95+
func (conn *DEServer) OnRecv() {
9696
go func() {
9797
defer conn.Close()
9898
for {
Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package connection
1+
package deserver
22

33
import (
44
"fmt"
@@ -10,43 +10,43 @@ import (
1010
"github.com/google/uuid"
1111
)
1212

13-
type ConnectionMGR struct {
14-
siderInfos map[string]*sphinxproxy.SiderInfo
15-
connInfos map[string]map[sphinxproxy.SiderType][]*Connection
13+
type DEServerMGR struct {
14+
coinInfos map[string]*sphinxproxy.CoinInfo
15+
connInfos map[string]map[sphinxproxy.ClientType][]*DEServer
1616
recvChannel sync.Map
17-
connections []*Connection
17+
connections []*DEServer
1818
}
1919

20-
var cmgr *ConnectionMGR
20+
var cmgr *DEServerMGR
2121

22-
func GetConnectionMGR() *ConnectionMGR {
22+
func GetDEServerMGR() *DEServerMGR {
2323
if cmgr == nil {
24-
cmgr = &ConnectionMGR{
25-
siderInfos: make(map[string]*sphinxproxy.SiderInfo),
26-
connInfos: make(map[string]map[sphinxproxy.SiderType][]*Connection),
24+
cmgr = &DEServerMGR{
25+
coinInfos: make(map[string]*sphinxproxy.CoinInfo),
26+
connInfos: make(map[string]map[sphinxproxy.ClientType][]*DEServer),
2727
recvChannel: sync.Map{},
2828
}
2929
}
3030
return cmgr
3131
}
3232

33-
func (mgr *ConnectionMGR) AddConnection(conn *Connection) {
33+
func (mgr *DEServerMGR) AddDEServer(conn *DEServer) {
3434
for _, info := range conn.Infos {
35-
mgr.siderInfos[info.Name] = info
35+
mgr.coinInfos[info.Name] = info
3636
if _, ok := mgr.connInfos[info.Name]; !ok {
37-
mgr.connInfos[info.Name] = make(map[sphinxproxy.SiderType][]*Connection)
37+
mgr.connInfos[info.Name] = make(map[sphinxproxy.ClientType][]*DEServer)
3838
}
39-
mgr.connInfos[info.Name][conn.SiderType] = append(mgr.connInfos[info.Name][conn.SiderType], conn)
39+
mgr.connInfos[info.Name][conn.ClientType] = append(mgr.connInfos[info.Name][conn.ClientType], conn)
4040
}
4141
mgr.connections = append(mgr.connections, conn)
4242
conn.WatchRecv(mgr.DealDataElement)
4343
conn.WatchClose(mgr.deleteConnection)
4444
}
4545

46-
func (mgr *ConnectionMGR) GetConnectionInfos() []*sphinxproxy.ConnectionInfo {
47-
ret := []*sphinxproxy.ConnectionInfo{}
46+
func (mgr *DEServerMGR) GetClientInfos() []*sphinxproxy.ClientInfo {
47+
ret := []*sphinxproxy.ClientInfo{}
4848
for _, info := range mgr.connections {
49-
ret = append(ret, info.ConnectionInfo)
49+
ret = append(ret, info.ClientInfo)
5050
}
5151
return ret
5252
}
@@ -58,13 +58,13 @@ type MsgInfo struct {
5858
}
5959

6060
// delete conn from connectionMGR
61-
func (mgr *ConnectionMGR) deleteConnection(conn *Connection) {
61+
func (mgr *DEServerMGR) deleteConnection(conn *DEServer) {
6262
for _, info := range conn.Infos {
6363
if _, ok := mgr.connInfos[info.Name]; !ok {
6464
continue
6565
}
6666

67-
conns, ok := mgr.connInfos[info.Name][conn.SiderType]
67+
conns, ok := mgr.connInfos[info.Name][conn.ClientType]
6868
if !ok || len(conns) == 0 {
6969
continue
7070
}
@@ -85,18 +85,19 @@ func (mgr *ConnectionMGR) deleteConnection(conn *Connection) {
8585
}
8686
}
8787

88-
func (mgr *ConnectionMGR) CloseAll() {
88+
func (mgr *DEServerMGR) CloseAll() {
8989
for _, conn := range mgr.connections {
9090
conn.Close()
9191
}
9292
}
9393

9494
// if recvChannel is not nil, recv response will send to it
9595
// default value of statusCode is success
96-
func (mgr *ConnectionMGR) SendMsg(
96+
func (mgr *DEServerMGR) SendMsg(
9797
name string,
98-
siderType sphinxproxy.SiderType,
98+
clientType sphinxproxy.ClientType,
9999
msgType sphinxproxy.MsgType,
100+
msgID *string,
100101
connID *string,
101102
msg *MsgInfo,
102103
recvChannel *chan MsgInfo,
@@ -105,11 +106,11 @@ func (mgr *ConnectionMGR) SendMsg(
105106
return fmt.Errorf("cannot find any sider,for %v", name)
106107
}
107108

108-
conns, ok := mgr.connInfos[name][siderType]
109+
conns, ok := mgr.connInfos[name][clientType]
109110
if !ok || len(conns) == 0 {
110-
return fmt.Errorf("cannot find any sider,for %v-%v", name, siderType)
111+
return fmt.Errorf("cannot find any sider,for %v-%v", name, clientType)
111112
}
112-
var conn *Connection
113+
var conn *DEServer
113114
if connID == nil {
114115
conn = conns[time.Now().Second()%len(conns)]
115116
} else {
@@ -120,22 +121,23 @@ func (mgr *ConnectionMGR) SendMsg(
120121
}
121122
}
122123
if conn == nil {
123-
return fmt.Errorf("cannot find any sider,for %v-%v-%v", name, siderType, connID)
124+
return fmt.Errorf("cannot find any sider,for %v-%v-%v", name, clientType, connID)
124125
}
125126
}
126127

127-
return mgr.sendMsg(msgType, nil, msg, conn, recvChannel)
128+
return mgr.sendMsg(msgType, msgID, msg, conn, recvChannel)
128129
}
129130

130131
// if recvChannel is not nil, recv response will send to it
131132
// default value of statusCode is success
132-
func (mgr *ConnectionMGR) SendMsgWithConnID(
133+
func (mgr *DEServerMGR) SendMsgWithConnID(
133134
msgType sphinxproxy.MsgType,
134135
connID string,
136+
msgID *string,
135137
msg *MsgInfo,
136138
recvChannel *chan MsgInfo,
137139
) error {
138-
var conn *Connection
140+
var conn *DEServer
139141
for _, _conn := range mgr.connections {
140142
if _conn.ID == connID {
141143
conn = _conn
@@ -146,16 +148,16 @@ func (mgr *ConnectionMGR) SendMsgWithConnID(
146148
return fmt.Errorf("cannot find any sider,for %v", connID)
147149
}
148150

149-
return mgr.sendMsg(msgType, nil, msg, conn, recvChannel)
151+
return mgr.sendMsg(msgType, msgID, msg, conn, recvChannel)
150152
}
151153

152154
// if recvChannel is not nil, recv response will send to it
153155
// default value of statusCode is success
154-
func (mgr *ConnectionMGR) sendMsg(
156+
func (mgr *DEServerMGR) sendMsg(
155157
msgType sphinxproxy.MsgType,
156158
msgID *string,
157159
msg *MsgInfo,
158-
conn *Connection,
160+
conn *DEServer,
159161
recvChannel *chan MsgInfo,
160162
) error {
161163
if conn == nil {
@@ -188,7 +190,7 @@ func (mgr *ConnectionMGR) sendMsg(
188190
})
189191
}
190192

191-
func (mgr *ConnectionMGR) DealDataElement(data *sphinxproxy.DataElement) {
193+
func (mgr *DEServerMGR) DealDataElement(data *sphinxproxy.DataElement) {
192194
if ch, ok := mgr.recvChannel.LoadAndDelete(data.MsgID); ok {
193195
select {
194196
case <-time.NewTimer(time.Second).C:
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package connection
1+
package deserver
22

33
import (
44
"context"
@@ -66,9 +66,10 @@ func (mgr *DEHandlerMGR) RegisterDEHandler(
6666
statusMsg = err.Error()
6767
}
6868

69-
return GetConnectionMGR().SendMsgWithConnID(
69+
return GetDEServerMGR().SendMsgWithConnID(
7070
data.MsgType,
7171
data.ConnectID,
72+
&data.MsgID,
7273
&MsgInfo{
7374
Payload: outPayload,
7475
StatusCode: statusCode,
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package connection
1+
package deserver
22

33
import (
44
"context"
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package connection
1+
package deserver
22

33
import (
44
"context"

0 commit comments

Comments
 (0)