Skip to content

Commit 4308bc2

Browse files
committed
add connection and mgr
1 parent 0988d55 commit 4308bc2

File tree

5 files changed

+285
-31
lines changed

5 files changed

+285
-31
lines changed

api/connection.go

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,20 @@
11
package api
22

33
import (
4-
"fmt"
5-
"time"
6-
7-
"github.com/NpoolPlatform/go-service-framework/pkg/logger"
84
"github.com/NpoolPlatform/message/npool/sphinxproxy"
9-
"github.com/google/uuid"
5+
"github.com/NpoolPlatform/sphinx-proxy/pkg/connection"
106
)
117

128
func (s *Server) ProxyConnnection(stream sphinxproxy.SphinxProxy_ProxyConnnectionServer) error {
13-
fmt.Println("recv")
14-
go TestConnectionRecv(stream)
15-
TestConnectionSend(stream)
16-
return nil
17-
}
18-
19-
func TestConnectionRecv(stream sphinxproxy.SphinxProxy_ProxyConnnectionServer) {
20-
for i := 0; i < 100; i++ {
21-
data, err := stream.Recv()
22-
fmt.Println("recv:", data, err)
9+
conn, err := connection.RegisterConnection(stream)
10+
if err != nil {
11+
return err
2312
}
24-
}
13+
defer conn.Close()
2514

26-
func TestConnectionSend(stream sphinxproxy.SphinxProxy_ProxyConnnectionServer) {
27-
for i := 0; i < 100; i++ {
28-
time.Sleep(time.Second)
29-
err := stream.Send(&sphinxproxy.DataElement{
30-
ConnectID: uuid.NewString(),
31-
Payload: []byte(fmt.Sprintf("send it from proxy, %v", i)),
32-
})
33-
logger.Sugar().Error(err)
34-
}
15+
mgr := connection.GetConnectionMGR()
16+
mgr.AddConnection(conn)
17+
18+
conn.OnRecv()
19+
return nil
3520
}

cmd/sphinx-proxy/run.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/NpoolPlatform/go-service-framework/pkg/action"
88
"github.com/NpoolPlatform/go-service-framework/pkg/logger"
99
"github.com/NpoolPlatform/sphinx-proxy/api"
10+
"github.com/NpoolPlatform/sphinx-proxy/pkg/connection"
1011
"github.com/NpoolPlatform/sphinx-proxy/pkg/db"
1112
"github.com/NpoolPlatform/sphinx-proxy/pkg/migrator"
1213

@@ -43,7 +44,8 @@ func run(ctx context.Context) error {
4344

4445
func shutdown(ctx context.Context) {
4546
<-ctx.Done()
46-
logger.Sugar().Infow(
47+
connection.GetConnectionMGR().CloseAll()
48+
logger.Sugar().Warnw(
4749
"Watch",
4850
"State", "Done",
4951
"Error", ctx.Err(),

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ require (
77
github.com/NpoolPlatform/basal-middleware v0.0.0-20240731030616-5ed3dac01fec
88
github.com/NpoolPlatform/go-service-framework v0.0.0-20241209085313-8052627db885
99
github.com/NpoolPlatform/libent-cruder v0.0.0-20231008051607-f64457cf6ab6
10-
github.com/NpoolPlatform/message v0.0.0-20241204082339-3301c2f34176
10+
github.com/NpoolPlatform/message v0.0.0-20241212064321-176d5ab75352
1111
github.com/google/uuid v1.3.0
1212
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2
1313
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
1414
google.golang.org/grpc v1.55.0
1515
google.golang.org/protobuf v1.30.0
1616
)
1717

18+
replace github.com/NpoolPlatform/go-service-framework => ../go-service-framework
19+
1820
require (
1921
ariga.io/atlas v0.10.0 // indirect
2022
github.com/Shonminh/apollo-client v0.4.0 // indirect

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,10 @@ github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20O
5151
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
5252
github.com/NpoolPlatform/basal-middleware v0.0.0-20240731030616-5ed3dac01fec h1:h895TOi4aBRTdlCysVPSys3CdLtKMEDrDmfMgV6/bz0=
5353
github.com/NpoolPlatform/basal-middleware v0.0.0-20240731030616-5ed3dac01fec/go.mod h1:2vsOMsJ6aLHp6nKc/GBOv+sFydXNtqxk9y3HW7LdclI=
54-
github.com/NpoolPlatform/go-service-framework v0.0.0-20241209085313-8052627db885 h1:RpIbKx5zK/KCMq2tOOwl3BehafPTxeH64+EaZBFgUDU=
55-
github.com/NpoolPlatform/go-service-framework v0.0.0-20241209085313-8052627db885/go.mod h1:8ZqSAAL4eLX/09HLWwDzQY0fMORy8nqHj3kwiUHEyS4=
5654
github.com/NpoolPlatform/libent-cruder v0.0.0-20231008051607-f64457cf6ab6 h1:zbM3Zq0qObWEiw+wYJihNFOsBfXey8mBgAhT7HzGB8Y=
5755
github.com/NpoolPlatform/libent-cruder v0.0.0-20231008051607-f64457cf6ab6/go.mod h1:EXCihQpRNNyWaPu1IphiZFLTyyUK/CJMZdW4My7y48Q=
58-
github.com/NpoolPlatform/message v0.0.0-20241204082339-3301c2f34176 h1:zdrRpiN8mSAPLEg2xxA5KmN6/qVp4vVmOeL6KD2NpEo=
59-
github.com/NpoolPlatform/message v0.0.0-20241204082339-3301c2f34176/go.mod h1:PxJpF0vSSOke+NJskbRBRQ2fK2K3SRe8zaEja+GxSHk=
56+
github.com/NpoolPlatform/message v0.0.0-20241212064321-176d5ab75352 h1:MYGh3N/gnYZ4ynQW7e0OdU2N01jirI1SLr4ScwANM8g=
57+
github.com/NpoolPlatform/message v0.0.0-20241212064321-176d5ab75352/go.mod h1:PxJpF0vSSOke+NJskbRBRQ2fK2K3SRe8zaEja+GxSHk=
6058
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
6159
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
6260
github.com/Shonminh/apollo-client v0.4.0 h1:AXGp4wOahrEKjheMXehgsG9B8dEfLQHttRLHeEefvus=

pkg/connection/connection.go

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package connection
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"sync"
8+
"time"
9+
10+
"github.com/NpoolPlatform/go-service-framework/pkg/logger"
11+
"github.com/NpoolPlatform/go-service-framework/pkg/wlog"
12+
"github.com/NpoolPlatform/message/npool/sphinxproxy"
13+
"github.com/google/uuid"
14+
)
15+
16+
type Connection struct {
17+
sphinxproxy.SphinxProxy_ProxyConnnectionServer
18+
*sphinxproxy.ConnectionInfo
19+
ctx context.Context
20+
cancel context.CancelFunc
21+
onCloseFuncs []func(conn *Connection)
22+
recvHandlers []func(data *sphinxproxy.DataElement)
23+
closeOnce sync.Once
24+
}
25+
26+
func RegisterConnection(stream sphinxproxy.SphinxProxy_ProxyConnnectionServer) (*Connection, error) {
27+
select {
28+
case <-time.NewTicker(time.Second * 3).C:
29+
return nil, wlog.Errorf("timeout for register connection")
30+
default:
31+
data, err := stream.Recv()
32+
if err != nil {
33+
return nil, wlog.WrapError(err)
34+
}
35+
36+
statusCode := sphinxproxy.StatusCode_StatusCodeSuccess
37+
statusMsg := ""
38+
39+
connInfo := &sphinxproxy.ConnectionInfo{}
40+
err = json.Unmarshal(data.Payload, connInfo)
41+
if err != nil {
42+
statusCode = sphinxproxy.StatusCode_StatusCodeFailed
43+
statusMsg = err.Error()
44+
}
45+
46+
if statusCode == sphinxproxy.StatusCode_StatusCodeSuccess && data.ConnectID != connInfo.ID {
47+
statusCode = sphinxproxy.StatusCode_StatusCodeFailed
48+
statusMsg = err.Error()
49+
}
50+
51+
err = stream.Send(&sphinxproxy.DataElement{
52+
ConnectID: data.ConnectID,
53+
MsgID: data.MsgID,
54+
StatusCode: statusCode,
55+
StatusMsg: &statusMsg,
56+
})
57+
if err != nil {
58+
return nil, wlog.WrapError(err)
59+
}
60+
61+
if statusCode != sphinxproxy.StatusCode_StatusCodeSuccess {
62+
return nil, wlog.Errorf(statusMsg)
63+
}
64+
65+
ctx, cancel := context.WithCancel(stream.Context())
66+
return &Connection{
67+
SphinxProxy_ProxyConnnectionServer: stream,
68+
ConnectionInfo: connInfo,
69+
ctx: ctx,
70+
cancel: cancel,
71+
}, nil
72+
}
73+
}
74+
75+
func (conn *Connection) WatchClose(onClose func(conn *Connection)) {
76+
conn.onCloseFuncs = append(conn.onCloseFuncs, onClose)
77+
}
78+
79+
func (conn *Connection) WatchRecv(onRecv func(data *sphinxproxy.DataElement)) {
80+
conn.recvHandlers = append(conn.recvHandlers, onRecv)
81+
}
82+
83+
func (conn *Connection) Close() {
84+
conn.closeOnce.Do(func() {
85+
for _, onClose := range conn.onCloseFuncs {
86+
onClose(conn)
87+
}
88+
conn.cancel()
89+
logger.Sugar().Warnf(
90+
"connection is closed, siderType: %v, ID: %v, Position: %v",
91+
conn.SiderType,
92+
conn.Position,
93+
conn.Position)
94+
})
95+
}
96+
97+
func (conn *Connection) OnRecv() {
98+
defer conn.Close()
99+
for {
100+
select {
101+
case <-conn.ctx.Done():
102+
return
103+
default:
104+
data, err := conn.Recv()
105+
if err != nil {
106+
logger.Sugar().Error(err)
107+
return
108+
}
109+
for _, recvHandler := range conn.recvHandlers {
110+
recvHandler(data)
111+
}
112+
}
113+
}
114+
}
115+
116+
type ConnectionMGR struct {
117+
siderInfos map[string]*sphinxproxy.SiderInfo
118+
connInfos map[string]map[sphinxproxy.SiderType][]*Connection
119+
recvChannel sync.Map
120+
connections []*Connection
121+
}
122+
123+
var mgr *ConnectionMGR
124+
125+
func GetConnectionMGR() *ConnectionMGR {
126+
if mgr == nil {
127+
mgr = &ConnectionMGR{
128+
siderInfos: make(map[string]*sphinxproxy.SiderInfo),
129+
connInfos: make(map[string]map[sphinxproxy.SiderType][]*Connection),
130+
recvChannel: sync.Map{},
131+
}
132+
}
133+
return mgr
134+
}
135+
136+
func (mgr *ConnectionMGR) AddConnection(conn *Connection) {
137+
for _, info := range conn.Infos {
138+
mgr.siderInfos[info.Name] = info
139+
if _, ok := mgr.connInfos[info.Name]; !ok {
140+
mgr.connInfos[info.Name] = make(map[sphinxproxy.SiderType][]*Connection)
141+
}
142+
mgr.connInfos[info.Name][conn.SiderType] = append(mgr.connInfos[info.Name][conn.SiderType], conn)
143+
}
144+
mgr.connections = append(mgr.connections, conn)
145+
conn.WatchRecv(mgr.DealDataElement)
146+
conn.WatchClose(mgr.deleteConnection)
147+
}
148+
149+
func (mgr *ConnectionMGR) GetConnectionInfos() []*sphinxproxy.ConnectionInfo {
150+
ret := []*sphinxproxy.ConnectionInfo{}
151+
for _, info := range mgr.connections {
152+
ret = append(ret, info.ConnectionInfo)
153+
}
154+
return ret
155+
}
156+
157+
type MsgInfo struct {
158+
Payload []byte
159+
StatusCode *sphinxproxy.StatusCode
160+
StatusMsg *string
161+
}
162+
163+
// delete conn from connectionMGR
164+
func (mgr *ConnectionMGR) deleteConnection(conn *Connection) {
165+
for _, info := range conn.Infos {
166+
if _, ok := mgr.connInfos[info.Name]; !ok {
167+
continue
168+
}
169+
170+
conns, ok := mgr.connInfos[info.Name][conn.SiderType]
171+
if !ok || len(conns) == 0 {
172+
continue
173+
}
174+
175+
for i := 0; i < len(conns); i++ {
176+
idx := len(conns) - 1 - i
177+
if conns[idx].ID == conn.ID {
178+
conns = append(conns[:idx], conns[idx+1:]...)
179+
}
180+
}
181+
}
182+
183+
for i := 0; i < len(mgr.connections); i++ {
184+
idx := len(mgr.connections) - 1 - i
185+
if mgr.connections[idx].ID == conn.ID {
186+
mgr.connections = append(mgr.connections[:idx], mgr.connections[idx+1:]...)
187+
}
188+
}
189+
}
190+
191+
func (mgr *ConnectionMGR) CloseAll() {
192+
for _, conn := range mgr.connections {
193+
conn.Close()
194+
}
195+
}
196+
197+
// if recvChannel is not nil, recv response will send to it
198+
// default value of statusCode is success
199+
func (mgr *ConnectionMGR) SendMsg(
200+
name string,
201+
siderType sphinxproxy.SiderType,
202+
msgType sphinxproxy.MsgType,
203+
connID *string,
204+
msg *MsgInfo,
205+
recvChannel *chan MsgInfo,
206+
) error {
207+
if _, ok := mgr.connInfos[name]; !ok {
208+
return fmt.Errorf("cannot find any sider,for %v", name)
209+
}
210+
211+
conns, ok := mgr.connInfos[name][siderType]
212+
if !ok || len(conns) == 0 {
213+
return fmt.Errorf("cannot find any sider,for %v-%v", name, siderType)
214+
}
215+
var conn *Connection
216+
if connID == nil {
217+
conn = conns[time.Now().Second()%len(conns)]
218+
} else {
219+
for _, _conn := range conns {
220+
if _conn.ID == *connID {
221+
conn = _conn
222+
break
223+
}
224+
}
225+
if conn == nil {
226+
return fmt.Errorf("cannot find any sider,for %v-%v-%v", name, siderType, connID)
227+
}
228+
}
229+
230+
if msg == nil {
231+
msg = &MsgInfo{}
232+
}
233+
234+
msgID := uuid.NewString()
235+
if recvChannel != nil {
236+
mgr.recvChannel.Store(msgID, *recvChannel)
237+
}
238+
239+
if msg.StatusCode == nil {
240+
msg.StatusCode = sphinxproxy.StatusCode_StatusCodeSuccess.Enum()
241+
}
242+
243+
conn.Send(&sphinxproxy.DataElement{
244+
ConnectID: conn.ID,
245+
MsgID: msgID,
246+
MsgType: msgType,
247+
Payload: msg.Payload,
248+
StatusCode: *msg.StatusCode,
249+
StatusMsg: msg.StatusMsg,
250+
})
251+
252+
return nil
253+
}
254+
255+
func (mgr *ConnectionMGR) DealDataElement(data *sphinxproxy.DataElement) {
256+
if ch, ok := mgr.recvChannel.LoadAndDelete(data.MsgID); ok {
257+
select {
258+
case <-time.NewTimer(time.Second).C:
259+
case ch.(chan MsgInfo) <- MsgInfo{
260+
Payload: data.Payload,
261+
StatusCode: &data.StatusCode,
262+
StatusMsg: data.StatusMsg,
263+
}:
264+
}
265+
}
266+
logger.Sugar().Warn(data)
267+
}

0 commit comments

Comments
 (0)