Description
issues:
- (fixed) service interface
old service interface:
// old service/NodeAccountService api
Login(obdClient *ObdNode, msgData string) (retData interface{}, err error)
Logout(obdClient *ObdNode) (err error)
UserLogin(obdClient *ObdNode, msgData string) (retData interface{}, err error)
UserLogout(obdClient *ObdNode, msgData string) (err error)
UpdateUserInfo(obdP2pNodeId, obdClientId, userId string) (retData interface{}, err error)
UpdateUsers(obdClient *ObdNode, msgData string) (err error)
GetNodeInfoByP2pAddress(context *gin.Context)
GetUserState(context *gin.Context)
GetUserP2pNodeId(context *gin.Context)
GetAllUsers(context *gin.Context)
GetAllObdNodes(context *gin.Context)
// service/channel api:
func (manager *channelManager) updateChannelInfo(obdP2pNodeId string, msgData string) (err error)
func (manager *channelManager) GetChannelState(context *gin.Context)
func (manager *channelManager) GetChannels(context *gin.Context)
// service/htlc api:
func (manager *htlcManager) getPath(msgData string) (path interface{}, err error)
func (manager *htlcManager) updateHtlcInfo( msgData string) (err error)
func (manager *htlcManager) GetHtlcCurrState(context *gin.Context)
The old interfaces have no documentation, their input/output parameters are weakly typed, and they mix gin handlers with the data-API service.
The new service definition uses protobuf. Details are in tracker/tkrpc/info-tracker.proto
New interfaces:
service InfoTracker {
    rpc HeartBeat (stream UpdateNodeInfoReq) returns (EmptyRes);
    // maps to the old functions Logout and Login
    rpc UpdateNodeInfo (UpdateNodeInfoReq) returns (EmptyRes);
    // maps to the old functions UserLogout and UserLogin
    rpc UpdateUserInfo (UpdateUserInfoReq) returns (EmptyRes);
    // maps to the old function UpdateUsers
    rpc UpdateUserInfos (UpdateUserInfosReq) returns (EmptyRes);
    // maps to the old functions GetUserState and GetUserP2pNodeId
    // Request: SetUserInfoReq{user_id, node_id}
    // The old GetUserP2pNodeId used the request SetUserInfoReq{user_id}, which may not work when a user is logged in on multiple nodes
    rpc GetUserInfo (UpdateUserInfoReq) returns (UserInfo);
    // maps to the old function GetAllUsers
    rpc GetUserInfos (ListReq) returns (UserInfosRes);
    // maps to the old function GetAllObdNodes
    rpc GetNodes (ListReq) returns (NodeInfosRes);
    // maps to the old ChannelService.updateChannelInfo
    rpc UpdateChannelInfo (ChannelInfo) returns (ChannelInfo);
    rpc UpdateChannelInfos (UpdateChannelInfosReq) returns (EmptyRes);
    // maps to the old ChannelService.GetChannelState
    rpc GetChannelInfo (SimpleFilter) returns (ChannelInfo);
    rpc GetChannels (ListReq) returns (NodeInfosRes);
    // maps to the old function updateHtlcInfo
    rpc UpdateHtlcInfo (HtlcInfo) returns (HtlcInfo);
    // maps to the old function getPath
    rpc HtlcGetPath (HtlcGetPathReq) returns (HtlcGetPathRes);
    // maps to the old function GetHtlcCurrState
    rpc GetHtlcInfo (GetHtlcInfoReq) returns (HtlcInfo);
}
// obd-user-info
message UserInfo {
    int32 id = 1;
    // user_peer_id
    string user_id = 2;
    // node_peer_id
    string node_id = 3;
    // 1 online, 2 offline
    int32 is_online = 4;
    string access_ip = 5;
    // unix timestamp in seconds; google.protobuf.Timestamp would be better, but the sql database does not support protobuf.Timestamp.
    int64 updated_at = 6;
    int64 created_at = 7;
}
...
...
Data structures are now strongly typed. Some comments were added or updated in the proto definition file to serve as documentation; they are carried over into the proto-generated Go files and will also be integrated automatically into the swagger documentation and into the obd js-sdk that invokes the rpc service.
-
(fixed) One user connects to multiple obd nodes (trackers?), which call getUserP2pNodeId on remote trackers. This does not work. Fixed: an obd now connects to only one tracker when syncing or querying data.
-
(fixed) The tracker-side updateUsers does not set userinfo.ObdP2pNodeId. userInfo should sync this field.
-
(fixed) omnibolt-report-final.pdf notes that OBD uses unmaintained dependencies (file-rotatelogs, asdine/storm, etc.). Fixed: storm has been removed; the tracker now uses a gorm database model.
-
(fixed) The field name UserInfo.ObdNodeId in tracker/dao/pojo.go is hard to understand.
-
(fixed) func (manager *htlcManager) getPath(obdClient *ObdNode, msgData string) (path interface{}, err error) at tracker/service/htlc_service.go line 40.
The returned parameter path may be a string or a map. To consume the returned value, the caller has to use reflect to detect its type at lightclient/connect_tracker.go line 107, which is a bad design.
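As a rough sketch of the alternative, the result could be a typed struct so that the caller needs neither reflection nor type assertions. `pathResult` and its `Data` method are hypothetical names; the fields correspond to the map keys (senderPeerId, h, path, amount) read in connect_tracker.go, and `strconv.FormatFloat` with 8 decimals is only assumed to approximate `tool.FloatToString(amount, 8)`.

```go
package main

import (
	"fmt"
	"strconv"
)

// pathResult is a hypothetical typed replacement for the map returned by getPath.
// Its fields correspond to the map keys read in connect_tracker.go.
type pathResult struct {
	SenderPeerId string
	H            string
	Path         string // comma-separated channel IDs; empty when no route was found
	Amount       float64
}

// Data builds the "h_path_amount" payload that the caller currently assembles
// from the untyped map. Formatting the amount with 8 decimals is an assumption
// about what tool.FloatToString(amount, 8) produces.
func (r pathResult) Data() string {
	return r.H + "_" + r.Path + "_" + strconv.FormatFloat(r.Amount, 'f', 8, 64)
}

func main() {
	r := pathResult{SenderPeerId: "peer-a", H: "h1", Path: "c1,c2", Amount: 0.5}
	fmt.Println(r.SenderPeerId, r.Data())
}
```

With a typed result, failures travel through the err return value alone, so the string-or-map ambiguity disappears.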
obd/lightclient/connect_tracker.go
Lines 98 to 114 in 5829315
replyMessage := bean.ReplyMessage{}
err = json.Unmarshal(message, &replyMessage)
if err == nil {
	switch replyMessage.Type {
	case enum.MsgType_Tracker_GetHtlcPath_351:
		v := reflect.ValueOf(replyMessage.Result)
		requestMessage := bean.RequestMessage{}
		requestMessage.Type = replyMessage.Type
		requestMessage.Data = ""
		if v.Kind() == reflect.Map {
			dataMap := replyMessage.Result.(map[string]interface{})
			requestMessage.RecipientUserPeerId = dataMap["senderPeerId"].(string)
			requestMessage.Data = dataMap["h"].(string) + "_" + dataMap["path"].(string) + "_" + tool.FloatToString(dataMap["amount"].(float64), 8)
			//requestMessage.Data = dataMap["h"].(string) + "_" + dataMap["path"].(string)
		}
		htlcTrackerDealModule(requestMessage)
	case enum.MsgType_Tracker_Connect_301:
-
(fixed) userState manager, userOnlineOfOtherObdMap
old code
obd/tracker/service/p2p_service.go
Lines 213 to 245 in 5829315
func handleUserStateStream(stream network.Stream) {
	rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
	str, err := rw.ReadString('~')
	if err != nil {
		return
	}
	if str == "" {
		return
	}
	if str != "" {
		str = strings.TrimSuffix(str, "~")
		log.Println("handleUserStateStream", str)
		params := strings.Split(str, "_")
		if len(params) > 1 {
			if _, ok := userOnlineOfOtherObdMap[params[0]]; ok == true {
				if _, ok = userOnlineOfOtherObdMap[params[0]][params[1]]; ok == true {
					delete(userOnlineOfOtherObdMap[params[0]], params[1])
				} else {
					if userOnlineOfOtherObdMap[params[0]] == nil {
						userOnlineOfOtherObdMap[params[0]] = make(map[string]string)
					}
					userOnlineOfOtherObdMap[params[0]][params[1]] = params[2]
				}
			} else {
				if userOnlineOfOtherObdMap[params[0]] == nil {
					userOnlineOfOtherObdMap[params[0]] = make(map[string]string)
				}
				userOnlineOfOtherObdMap[params[0]][params[1]] = params[2]
			}
		}
	}
	_ = stream.Close()
}
It is hard to understand the meaning of the variable params at line 225 from the context; you have to find the message source to learn exactly what it holds, which makes the source code hard to maintain. Concurrent map access is a problem too. This code has now been discarded.
The new version of the code:
func (s *ImpInfoServer) UpdateUserInfo(ctx context.Context, req *tkrpc.UpdateUserInfoReq) (*tkrpc.EmptyRes, error) {
	uinfo := new(tkrpc.UserInfo)
	// In gorm, Assign only takes effect when it is chained before FirstOrCreate.
	err := Orm.Where(tkrpc.UserInfo{UserId: req.UserId, NodeId: req.NodeId}).
		Assign(tkrpc.UserInfo{IsOnline: req.IsOnline}).
		FirstOrCreate(uinfo).Error
	return &tkrpc.EmptyRes{}, err
}
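The handler above is in effect an upsert keyed by (UserId, NodeId). The same idea can be sketched without a database to show how a typed composite key and a mutex replace the nested string map and its unsynchronized access. This is an illustration only, not the tracker's implementation (which uses gorm); `userKey`, `userStore`, `Upsert`, and `Get` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// userKey identifies one login: the same user may be online on several nodes.
type userKey struct {
	UserID string
	NodeID string
}

// userStore is a hypothetical in-memory stand-in for the user info table.
type userStore struct {
	mu    sync.Mutex
	state map[userKey]int32 // 1 online, 2 offline
}

func newUserStore() *userStore {
	return &userStore{state: make(map[userKey]int32)}
}

// Upsert creates the record if it is missing and otherwise overwrites its state.
func (s *userStore) Upsert(userID, nodeID string, isOnline int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state[userKey{UserID: userID, NodeID: nodeID}] = isOnline
}

// Get reports the stored state and whether a record exists for the key.
func (s *userStore) Get(userID, nodeID string) (int32, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.state[userKey{UserID: userID, NodeID: nodeID}]
	return v, ok
}

func main() {
	store := newUserStore()
	var wg sync.WaitGroup
	// Concurrent writers are safe because every access takes the mutex.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			store.Upsert("user-1", "node-1", 1)
		}()
	}
	wg.Wait()
	store.Upsert("user-1", "node-1", 2)
	fmt.Println(store.Get("user-1", "node-1"))
}
```

Named key fields also remove the need to guess what `params[0]`, `params[1]`, and `params[2]` stand for.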
-
(fixed) Two duplicated functions in tracker/service/p2p_service.go: sendChannelUnlockInfoToObd and sendChannelLockInfoToObd
obd/tracker/service/p2p_service.go
Lines 266 to 300 in 5829315
func sendChannelLockInfoToObd(channelId, userId, obdP2pNodeId string) bool {
	findID, err := peer.Decode(obdP2pNodeId)
	if err == nil {
		findPeer, err := kademliaDHT.FindPeer(ctx, findID)
		if err == nil {
			stream, err := hostNode.NewStream(ctx, findPeer.ID, bean.ProtocolIdForLockChannel)
			if err == nil {
				rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
				request := bean.TrackerLockChannelRequest{UserId: userId, ChannelId: channelId}
				marshal, _ := json.Marshal(request)
				_, _ = rw.WriteString(string(marshal) + "~")
				err = rw.Flush()
				if err == nil {
					str, err := rw.ReadString('~')
					if err != nil {
						return false
					}
					if str == "" {
						return false
					}
					if str != "" {
						str = strings.TrimSuffix(str, "~")
						log.Println("OnSendChannelLockInfoToObd", str)
						if str == "1" {
							return true
						}
					}
				}
				_ = stream.Close()
			}
		}
	}
	return false
}
obd/tracker/service/p2p_service.go
Lines 349 to 385 in 5829315
func sendChannelUnlockInfoToObd(channelId, userId, obdP2pNodeId string) bool {
	if len(obdP2pNodeId) == 0 {
		return false
	}
	findID, err := peer.Decode(obdP2pNodeId)
	if err == nil {
		findPeer, err := kademliaDHT.FindPeer(ctx, findID)
		if err == nil {
			stream, err := hostNode.NewStream(ctx, findPeer.ID, bean.ProtocolIdForUnlockChannel)
			if err == nil {
				rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
				request := bean.TrackerLockChannelRequest{UserId: userId, ChannelId: channelId}
				marshal, _ := json.Marshal(request)
				_, _ = rw.WriteString(string(marshal) + "~")
				err = rw.Flush()
				if err == nil {
					str, err := rw.ReadString('~')
					if err != nil {
						return false
					}
					if str == "" {
						return false
					}
					if str != "" {
						str = strings.TrimSuffix(str, "~")
						if str == "1" {
							return true
						}
					}
				}
				_ = stream.Close()
			}
		}
	}
	return false
}
-
(to do) channelInfo is updated on both sides, the tracker and the obd server. When an obd starts, its channelInfo data overwrites the data on the tracker, so the tracker's updates are lost. The sync mechanism between obd and tracker causes data errors.
-
(to do) channelInfo's fields on the tracker differ from those on the obd.
When an obd submits channelInfo to a tracker, the following code currently converts it:
// infoRequest is the data submitted to the tracker
infoRequest.PeerIdA = channelInfo.PeerIdA
infoRequest.PeerIdB = channelInfo.PeerIdB
infoRequest.IsAlice = false
if commitmentTx.Id > 0 {
	if commitmentTx.Owner == channelInfo.PeerIdA {
		infoRequest.IsAlice = true
		infoRequest.AmountA = commitmentTx.AmountToRSMC
		infoRequest.AmountB = commitmentTx.AmountToCounterparty
	} else {
		infoRequest.AmountB = commitmentTx.AmountToRSMC
		infoRequest.AmountA = commitmentTx.AmountToCounterparty
	}
}
We should change channelInfo's fields to match the tracker's. That will also make it easier to update the sync mode between the obd and the tracker.
channelInfo.IsAlice should be a function, since it is a computed field. For example:
func (c *channelInfo) IsAlice(currentUserPeerID string) bool {
	return c.PeerIdA == currentUserPeerID
}
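Building on the IsAlice suggestion, the amount mapping from the conversion code could be derived in the same way instead of being stored. The following is a minimal sketch with trimmed, hypothetical stand-ins for the obd types; `Amounts` is an assumed helper name, and returning zero amounts when no commitment transaction exists is an assumption that matches leaving the fields unset.

```go
package main

import "fmt"

// commitmentTx and channelInfo are trimmed, hypothetical stand-ins for the obd types.
type commitmentTx struct {
	Id                   int
	Owner                string
	AmountToRSMC         float64
	AmountToCounterparty float64
}

type channelInfo struct {
	PeerIdA string
	PeerIdB string
}

// IsAlice reports whether the given peer is side A of the channel.
func (c *channelInfo) IsAlice(peerID string) bool {
	return c.PeerIdA == peerID
}

// Amounts returns the balances of side A and side B as seen from the latest
// commitment transaction. The owner's balance is AmountToRSMC and the other
// side's is AmountToCounterparty, as in the conversion code above.
func (c *channelInfo) Amounts(tx commitmentTx) (amountA, amountB float64) {
	if tx.Id <= 0 {
		return 0, 0 // no commitment transaction yet
	}
	if c.IsAlice(tx.Owner) {
		return tx.AmountToRSMC, tx.AmountToCounterparty
	}
	return tx.AmountToCounterparty, tx.AmountToRSMC
}

func main() {
	ch := &channelInfo{PeerIdA: "alice", PeerIdB: "bob"}
	a, b := ch.Amounts(commitmentTx{Id: 1, Owner: "bob", AmountToRSMC: 3, AmountToCounterparty: 7})
	fmt.Println(ch.IsAlice("alice"), a, b)
}
```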
-
(to do) func (manager *htlcManager) getPath(msgData string) (path interface{}, err error) should be documented: the main flow and how to use the returned value.
obd/tracker/service/htlc_service.go
Lines 40 to 83 in 5829315
func (manager *htlcManager) getPath(obdClient *ObdNode, msgData string) (path interface{}, err error) {
	manager.mu.Lock()
	defer manager.mu.Unlock()
	log.Println("getPath", msgData)
	if tool.CheckIsString(&msgData) == false {
		return "", errors.New("wrong inputData")
	}
	pathRequest := &bean.HtlcPathRequest{}
	err = json.Unmarshal([]byte(msgData), pathRequest)
	if err != nil {
		return "", err
	}
	if tool.CheckIsString(&pathRequest.RealPayerPeerId) == false {
		return "", errors.New("wrong realPayerPeerId")
	}
	if tool.CheckIsString(&pathRequest.PayeePeerId) == false {
		return "", errors.New("wrong SendeePeerId")
	}
	if pathRequest.Amount < tool.GetOmniDustBtc() {
		return "", errors.New("wrong amount")
	}
	manager.createChannelNetwork(pathRequest.RealPayerPeerId, pathRequest.PayeePeerId, pathRequest.PropertyId, pathRequest.Amount, nil, true)
	resultIndex := manager.getPathIndex()
	retNode := make(map[string]interface{})
	retNode["senderPeerId"] = pathRequest.RealPayerPeerId
	retNode["h"] = pathRequest.H
	retNode["amount"] = pathRequest.Amount
	retNode["path"] = ""
	if resultIndex != -1 {
		splitArr := strings.Split(manager.openList[resultIndex].ChannelIds, ",")
		path := ""
		for i := len(splitArr) - 1; i > -1; i-- {
			path += splitArr[i] + ","
		}
		path = strings.TrimSuffix(path, ",")
		retNode["path"] = path
	}
	log.Println("return path info", retNode)
	return retNode, nil
}
-
(fixed) obd/service/user.go line 60 calls noticeTrackerUserLogin(), which in turn calls sendMsgToTracker().
Lines 20 to 76 in 5829315
func (service *UserManager) UserLogin(user *bean.User) error {
	if user == nil {
		return errors.New(enum.Tips_user_nilUser)
	}
	if user.IsAdmin {
		if tool.CheckIsString(&user.Mnemonic) == false || bip39.IsMnemonicValid(user.Mnemonic) == false {
			return errors.New(enum.Tips_common_wrong + "mnemonic")
		}
		changeExtKey, err := HDWalletService.CreateChangeExtKey(user.Mnemonic)
		if err != nil {
			return err
		}
		user.PeerId = tool.GetUserPeerId(user.Mnemonic)
		user.ChangeExtKey = changeExtKey
	}
	userDB, err := dao.DBService.GetUserDB(user.PeerId)
	if err != nil {
		return err
	}
	var node dao.User
	err = userDB.Select(q.Eq("PeerId", user.PeerId)).First(&node)
	if node.Id == 0 {
		node = dao.User{}
		node.PeerId = user.PeerId
		node.P2PLocalPeerId = user.P2PLocalPeerId
		node.P2PLocalAddress = user.P2PLocalAddress
		node.CurrState = bean.UserState_OnLine
		node.CreateAt = time.Now()
		node.LatestLoginTime = node.CreateAt
		node.CurrAddrIndex = 0
		err = userDB.Save(&node)
	} else {
		node.P2PLocalPeerId = user.P2PLocalPeerId
		node.P2PLocalAddress = user.P2PLocalAddress
		node.CurrState = bean.UserState_OnLine
		node.LatestLoginTime = time.Now()
		err = userDB.Update(&node)
	}
	noticeTrackerUserLogin(node)
	if err != nil {
		return err
	}
	loginLog := &dao.UserLoginLog{}
	loginLog.PeerId = user.PeerId
	loginLog.LoginAt = time.Now()
	_ = userDB.Save(loginLog)
	user.State = node.CurrState
	user.CurrAddrIndex = node.CurrAddrIndex
	user.Db = userDB
	return nil
}
- issue 1: if an error occurs, we MUST NOT invoke it (line 60). If noticeTrackerUserLogin only recorded an audit log, we could ignore the error and invoke it anyway.
- issue 2: obd/service is just a database/data-operation module and MUST NOT call sendMsgToTracker to send async messages. The obd/lightclient module is the async message center: 95% of async messages are now received and sent there, and all client websocket connections and p2p connections are initialized there. PayerRequestFindPath in obd/service/htlc_tx_forward.go, which invokes "sendMsgToTracker(enum.MsgType_Tracker_GetHtlcPath_351, pathRequest)", has the same problem.
- I have now fixed both issues and disabled obd/service.sendMsgToTracker. The messages below are now submitted over the grpc connection, so there is no need to sync userInfo/channelInfo/htlcInfo to the tracker with complex async-message goroutines. The old message list:
- MsgType_Tracker_UpdateChannelInfo_350
- MsgType_Tracker_UserLogin_304
- MsgType_Tracker_UserLogout_305
- MsgType_Tracker_UpdateHtlcTxState_352
- (to do) Maybe we should design the obd-tracker info sync mode and its security first. A new mechanism might make the work above unnecessary.