Skip to content

Commit a71c03a

Browse files
committed
addjust code
1 parent 93570d9 commit a71c03a

File tree

2 files changed

+130
-0
lines changed

2 files changed

+130
-0
lines changed

api/getconnectioninfo.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
//go:build !codeanalysis
2+
// +build !codeanalysis
3+
4+
package api
5+
6+
import (
7+
"context"
8+
9+
"github.com/NpoolPlatform/message/npool/sphinxproxy"
10+
"github.com/NpoolPlatform/sphinx-proxy/pkg/connection"
11+
)
12+
13+
func (s *Server) GetConnectionInfos(ctx context.Context, in *sphinxproxy.GetConnectionInfosRequest) (*sphinxproxy.GetConnectionInfosResponse, error) {
14+
infos := connection.GetConnectionMGR().GetConnectionInfos()
15+
return &sphinxproxy.GetConnectionInfosResponse{
16+
Infos: infos,
17+
Total: uint32(len(infos)),
18+
}, nil
19+
}

pkg/connection/connection.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package connection
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"sync"
7+
"time"
8+
9+
"github.com/NpoolPlatform/go-service-framework/pkg/logger"
10+
"github.com/NpoolPlatform/go-service-framework/pkg/wlog"
11+
"github.com/NpoolPlatform/message/npool/sphinxproxy"
12+
)
13+
14+
type Connection struct {
15+
sphinxproxy.SphinxProxyStream_ProxyConnnectionServer
16+
*sphinxproxy.ConnectionInfo
17+
ctx context.Context
18+
cancel context.CancelFunc
19+
onCloseFuncs []func(conn *Connection)
20+
recvHandlers []func(data *sphinxproxy.DataElement)
21+
closeOnce sync.Once
22+
}
23+
24+
func RegisterConnection(stream sphinxproxy.SphinxProxyStream_ProxyConnnectionServer) (*Connection, error) {
25+
select {
26+
case <-time.NewTicker(time.Second * 3).C:
27+
return nil, wlog.Errorf("timeout for register connection")
28+
default:
29+
data, err := stream.Recv()
30+
if err != nil {
31+
return nil, wlog.WrapError(err)
32+
}
33+
34+
statusCode := sphinxproxy.StatusCode_StatusCodeSuccess
35+
statusMsg := ""
36+
37+
connInfo := &sphinxproxy.ConnectionInfo{}
38+
err = json.Unmarshal(data.Payload, connInfo)
39+
if err != nil {
40+
statusCode = sphinxproxy.StatusCode_StatusCodeFailed
41+
statusMsg = err.Error()
42+
}
43+
44+
if statusCode == sphinxproxy.StatusCode_StatusCodeSuccess && data.ConnectID != connInfo.ID {
45+
statusCode = sphinxproxy.StatusCode_StatusCodeFailed
46+
statusMsg = err.Error()
47+
}
48+
49+
err = stream.Send(&sphinxproxy.DataElement{
50+
ConnectID: data.ConnectID,
51+
MsgID: data.MsgID,
52+
StatusCode: statusCode,
53+
StatusMsg: &statusMsg,
54+
})
55+
if err != nil {
56+
return nil, wlog.WrapError(err)
57+
}
58+
59+
if statusCode != sphinxproxy.StatusCode_StatusCodeSuccess {
60+
return nil, wlog.Errorf(statusMsg)
61+
}
62+
63+
ctx, cancel := context.WithCancel(stream.Context())
64+
return &Connection{
65+
SphinxProxyStream_ProxyConnnectionServer: stream,
66+
ConnectionInfo: connInfo,
67+
ctx: ctx,
68+
cancel: cancel,
69+
}, nil
70+
}
71+
}
72+
73+
func (conn *Connection) WatchClose(onClose func(conn *Connection)) {
74+
conn.onCloseFuncs = append(conn.onCloseFuncs, onClose)
75+
}
76+
77+
func (conn *Connection) WatchRecv(onRecv func(data *sphinxproxy.DataElement)) {
78+
conn.recvHandlers = append(conn.recvHandlers, onRecv)
79+
}
80+
81+
func (conn *Connection) Close() {
82+
conn.closeOnce.Do(func() {
83+
for _, onClose := range conn.onCloseFuncs {
84+
onClose(conn)
85+
}
86+
conn.cancel()
87+
logger.Sugar().Warnf(
88+
"connection is closed, siderType: %v, ID: %v, Position: %v",
89+
conn.SiderType,
90+
conn.Position,
91+
conn.Position)
92+
})
93+
}
94+
95+
func (conn *Connection) OnRecv() {
96+
go func() {
97+
defer conn.Close()
98+
for {
99+
data, err := conn.Recv()
100+
if err != nil {
101+
logger.Sugar().Error(err)
102+
return
103+
}
104+
for _, recvHandler := range conn.recvHandlers {
105+
go recvHandler(data)
106+
}
107+
}
108+
}()
109+
110+
<-conn.ctx.Done()
111+
}

0 commit comments

Comments
 (0)