Skip to content

Commit 46d69e9

Browse files
committed
ipv6 p2p
1 parent 178fba9 commit 46d69e9

File tree

6 files changed

+131
-6
lines changed

6 files changed

+131
-6
lines changed

chans/chans.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package chans
2+
3+
import "github.com/OpenIoTHub/gateway-go/v2/models"
4+
5+
// ClientTaskChan 传入访问者的ipv6监听ip+端口,任务从本chan接受配置创建客户端
6+
var ClientTaskChan = make(chan models.Ipv6ClientHandleTask)

client/lib.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/OpenIoTHub/gateway-go/v2/config"
77
"github.com/OpenIoTHub/gateway-go/v2/info"
88
"github.com/OpenIoTHub/gateway-go/v2/services"
9+
"github.com/OpenIoTHub/gateway-go/v2/tasks"
910
"github.com/OpenIoTHub/openiothub_grpc_api/pb-go/proto/gateway"
1011
"github.com/OpenIoTHub/utils/v2/models"
1112
"github.com/gin-gonic/gin"
@@ -34,6 +35,7 @@ func Run() {
3435
}
3536

3637
func start() {
38+
tasks.RunTasks()
3739
//启动http服务
3840
go func() {
3941
if HttpPort == 0 {

config/global.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package config
2+
3+
// Ipv6ListenTcpHandlePort ipv6监听访问端直接的连接,
4+
// 处理测试链接,测试链接显示可以连通后面有请求直接使用,当连接不通则使用其他连接
5+
// 这个端口在启动时开启,开启后将实际端口号存到这里,当接到访问者请求之后将该端口和ipv6地址发送到访问者
6+
var Ipv6ListenTcpHandlePort int = 0

models/chansData.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package models
2+
3+
type Ipv6ClientHandleTask struct {
4+
RunId string `json:"RunId"`
5+
Ipv6AddrIp string `json:"Ipv6AddrIp"`
6+
Ipv6AddrPort int `json:"Ipv6AddrPort"`
7+
}

netservice/services/connect/service/getIPv6Addr.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,38 @@ package service
22

33
import (
44
"encoding/json"
5+
"github.com/OpenIoTHub/gateway-go/v2/chans"
6+
"github.com/OpenIoTHub/gateway-go/v2/config"
7+
models2 "github.com/OpenIoTHub/gateway-go/v2/models"
58
"github.com/OpenIoTHub/getip/v2/iputils"
69
"github.com/OpenIoTHub/utils/v2/models"
710
"github.com/OpenIoTHub/utils/v2/msg"
811
"log"
912
"net"
1013
)
1114

12-
// GetIPv6Addr 传过来一个ipv6Addr+port返回一个ipv6Addr+port
15+
// GetIPv6Addr 传过来一个ipv6Addr+port返回一个ipv6Addr+port,传过来的ipv6Addr+port用于这边通过channel通知连接,
16+
// 访问端获取的ipv6Addr+port用于先校验连通性再有连接需求的时候使用连接
1317
func GetIPv6Addr(stream net.Conn, service *models.NewService) error {
14-
ipv6Map := make(map[string]interface{})
15-
16-
// 获取磁盘信息
18+
//从service读取访问者监听ipv6端口号,这里将ipv6地址+port通过chan发送到任务创建连接并处理请求
19+
var remoteIpv6ServerConfig models2.Ipv6ClientHandleTask
20+
err := json.Unmarshal([]byte(service.Config), &remoteIpv6ServerConfig)
21+
if err != nil {
22+
log.Println("json.Unmarshal([]byte(service.Config), &config):" + err.Error())
23+
return err
24+
}
25+
chans.ClientTaskChan <- remoteIpv6ServerConfig
26+
// 获取ipv6公网地址,可能为空字符串代表没有或者没获取到
1727
var ipv6Addr = iputils.GetMyPublicIpv6()
18-
ipv6Map["Ipv6Addr"] = ipv6Addr
19-
rstByte, err := json.Marshal(ipv6Map)
28+
ipv6Info := models2.Ipv6ClientHandleTask{}
29+
//访问者只要保存提供服务的ipv6地址+端口,有连接请求时创建连接
30+
ipv6Info.Ipv6AddrIp = ipv6Addr
31+
ipv6Info.Ipv6AddrPort = config.Ipv6ListenTcpHandlePort
32+
rstByte, err := json.Marshal(ipv6Info)
2033
if err != nil {
2134
log.Println("json.Marshal(ipv6Map):")
2235
log.Println(err.Error())
36+
return err
2337
}
2438
//log.Println(string(rstByte))
2539
err = msg.WriteMsg(stream, &models.JsonResponse{Code: 0, Msg: "Success", Result: string(rstByte)})

tasks/ipv6ClientServer.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package tasks
2+
3+
import (
4+
"github.com/OpenIoTHub/gateway-go/v2/chans"
5+
"github.com/OpenIoTHub/gateway-go/v2/config"
6+
"github.com/OpenIoTHub/gateway-go/v2/netservice/handle"
7+
"github.com/OpenIoTHub/utils/v2/models"
8+
"github.com/OpenIoTHub/utils/v2/msg"
9+
"github.com/libp2p/go-yamux"
10+
"log"
11+
"net"
12+
"time"
13+
)
14+
15+
func RunTasks() {
16+
go ipv6ServerTask()
17+
go ipv6ClientTask()
18+
}
19+
20+
// Ipv6ClientTask 接收配置创建新的Client handle
21+
func ipv6ClientTask() {
22+
// 主动连接访问者的APP
23+
for remoteIpv6Server := range chans.ClientTaskChan {
24+
ip := remoteIpv6Server.Ipv6AddrIp
25+
port := remoteIpv6Server.Ipv6AddrPort
26+
runId := remoteIpv6Server.RunId
27+
// 使用配置创建连接,并且发送带RunId的凭证给访问者
28+
ipv6conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
29+
IP: net.ParseIP(ip),
30+
Port: port,
31+
})
32+
if err != nil {
33+
continue
34+
}
35+
//TODO 发送凭证
36+
runIdMsg := &models.Msg{
37+
MsgType: "RunId",
38+
MsgContent: runId,
39+
}
40+
err = msg.WriteMsg(ipv6conn, runIdMsg)
41+
if err != nil {
42+
ipv6conn.Close()
43+
return
44+
}
45+
//创建session,session handle
46+
yamuxConfig := yamux.DefaultConfig()
47+
//remoteIpv6Server.EnableKeepAlive = false
48+
session, err := yamux.Server(ipv6conn, yamuxConfig)
49+
if err != nil {
50+
ipv6conn.Close()
51+
return
52+
}
53+
log.Printf("ipv6 p2p client login OK!")
54+
go handle.HandleSession(session, "")
55+
}
56+
}
57+
58+
func ipv6ServerTask() {
59+
listener, err := net.ListenTCP("tcp6", &net.TCPAddr{})
60+
if err != nil {
61+
log.Println(err)
62+
return
63+
}
64+
listenerPort := listener.Addr().(*net.TCPAddr).Port
65+
log.Println("ipv6 server listening on", listenerPort)
66+
config.Ipv6ListenTcpHandlePort = listenerPort
67+
//接受验证连通性,接受连接和服务请求
68+
for {
69+
conn, err := listener.AcceptTCP()
70+
if err != nil {
71+
time.Sleep(100 * time.Millisecond)
72+
continue
73+
}
74+
// 验证token,回复
75+
go ipv6ClientHandle(conn)
76+
}
77+
}
78+
79+
func ipv6ClientHandle(conn net.Conn) {
80+
rawMsg, err := msg.ReadMsg(conn)
81+
if err != nil {
82+
log.Println(err.Error() + "从stream读取数据错误")
83+
conn.Close()
84+
return
85+
}
86+
// TODO 验证token,RunId
87+
_ = rawMsg
88+
// Token为空
89+
handle.HandleStream(conn, "")
90+
}

0 commit comments

Comments
 (0)