|
1 | 1 | package connection |
2 | 2 |
|
3 | 3 | import ( |
4 | | - "context" |
5 | | - "encoding/json" |
6 | 4 | "fmt" |
7 | 5 | "sync" |
8 | 6 | "time" |
9 | 7 |
|
10 | 8 | "github.com/NpoolPlatform/go-service-framework/pkg/logger" |
11 | | - "github.com/NpoolPlatform/go-service-framework/pkg/wlog" |
12 | 9 | "github.com/NpoolPlatform/message/npool/sphinxproxy" |
13 | 10 | "github.com/google/uuid" |
14 | 11 | ) |
15 | 12 |
|
16 | | -type Connection struct { |
17 | | - sphinxproxy.SphinxProxyStream_ProxyConnnectionServer |
18 | | - *sphinxproxy.ConnectionInfo |
19 | | - ctx context.Context |
20 | | - cancel context.CancelFunc |
21 | | - onCloseFuncs []func(conn *Connection) |
22 | | - recvHandlers []func(data *sphinxproxy.DataElement) |
23 | | - closeOnce sync.Once |
24 | | -} |
25 | | - |
26 | | -func RegisterConnection(stream sphinxproxy.SphinxProxyStream_ProxyConnnectionServer) (*Connection, error) { |
27 | | - select { |
28 | | - case <-time.NewTicker(time.Second * 3).C: |
29 | | - return nil, wlog.Errorf("timeout for register connection") |
30 | | - default: |
31 | | - data, err := stream.Recv() |
32 | | - if err != nil { |
33 | | - return nil, wlog.WrapError(err) |
34 | | - } |
35 | | - |
36 | | - statusCode := sphinxproxy.StatusCode_StatusCodeSuccess |
37 | | - statusMsg := "" |
38 | | - |
39 | | - connInfo := &sphinxproxy.ConnectionInfo{} |
40 | | - err = json.Unmarshal(data.Payload, connInfo) |
41 | | - if err != nil { |
42 | | - statusCode = sphinxproxy.StatusCode_StatusCodeFailed |
43 | | - statusMsg = err.Error() |
44 | | - } |
45 | | - |
46 | | - if statusCode == sphinxproxy.StatusCode_StatusCodeSuccess && data.ConnectID != connInfo.ID { |
47 | | - statusCode = sphinxproxy.StatusCode_StatusCodeFailed |
48 | | - statusMsg = err.Error() |
49 | | - } |
50 | | - |
51 | | - err = stream.Send(&sphinxproxy.DataElement{ |
52 | | - ConnectID: data.ConnectID, |
53 | | - MsgID: data.MsgID, |
54 | | - StatusCode: statusCode, |
55 | | - StatusMsg: &statusMsg, |
56 | | - }) |
57 | | - if err != nil { |
58 | | - return nil, wlog.WrapError(err) |
59 | | - } |
60 | | - |
61 | | - if statusCode != sphinxproxy.StatusCode_StatusCodeSuccess { |
62 | | - return nil, wlog.Errorf(statusMsg) |
63 | | - } |
64 | | - |
65 | | - ctx, cancel := context.WithCancel(stream.Context()) |
66 | | - return &Connection{ |
67 | | - SphinxProxyStream_ProxyConnnectionServer: stream, |
68 | | - ConnectionInfo: connInfo, |
69 | | - ctx: ctx, |
70 | | - cancel: cancel, |
71 | | - }, nil |
72 | | - } |
73 | | -} |
74 | | - |
75 | | -func (conn *Connection) WatchClose(onClose func(conn *Connection)) { |
76 | | - conn.onCloseFuncs = append(conn.onCloseFuncs, onClose) |
77 | | -} |
78 | | - |
79 | | -func (conn *Connection) WatchRecv(onRecv func(data *sphinxproxy.DataElement)) { |
80 | | - conn.recvHandlers = append(conn.recvHandlers, onRecv) |
81 | | -} |
82 | | - |
83 | | -func (conn *Connection) Close() { |
84 | | - conn.closeOnce.Do(func() { |
85 | | - for _, onClose := range conn.onCloseFuncs { |
86 | | - onClose(conn) |
87 | | - } |
88 | | - conn.cancel() |
89 | | - logger.Sugar().Warnf( |
90 | | - "connection is closed, siderType: %v, ID: %v, Position: %v", |
91 | | - conn.SiderType, |
92 | | - conn.Position, |
93 | | - conn.Position) |
94 | | - }) |
95 | | -} |
96 | | - |
97 | | -func (conn *Connection) OnRecv() { |
98 | | - go func() { |
99 | | - defer conn.Close() |
100 | | - for { |
101 | | - data, err := conn.Recv() |
102 | | - if err != nil { |
103 | | - logger.Sugar().Error(err) |
104 | | - return |
105 | | - } |
106 | | - for _, recvHandler := range conn.recvHandlers { |
107 | | - go recvHandler(data) |
108 | | - } |
109 | | - } |
110 | | - }() |
111 | | - |
112 | | - <-conn.ctx.Done() |
113 | | -} |
114 | | - |
115 | 13 | type ConnectionMGR struct { |
116 | 14 | siderInfos map[string]*sphinxproxy.SiderInfo |
117 | 15 | connInfos map[string]map[sphinxproxy.SiderType][]*Connection |
|
0 commit comments