Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit 8687fad

Browse files
committed
Handle incoming socket messages
1 parent 71f49bf commit 8687fad

File tree

3 files changed

+50
-10
lines changed

3 files changed

+50
-10
lines changed

cmd/start.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,7 @@ func (x *Start) Execute(args []string) error {
684684
}()
685685
}
686686
}
687+
687688
core.Node.Service = service.New(core.Node, sqliteDB)
688689
core.Node.Service.WaitForReady()
689690
log.Info("OpenBazaar Service Ready")
@@ -692,6 +693,8 @@ func (x *Start) Execute(args []string) error {
692693
core.Node.StartPointerRepublisher()
693694
core.Node.StartRecordAgingNotifier()
694695

696+
core.Node.WebRelayManager.ConnectToRelays(core.Node.Service)
697+
695698
core.Node.PublishLock.Unlock()
696699
err = core.Node.UpdateFollow()
697700
if err != nil {

mobile/node.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,8 @@ func (n *Node) start() error {
480480
n.OpenBazaarNode.PointerRepublisher = PR
481481
MR.Wait()
482482

483+
n.OpenBazaarNode.WebRelayManager.ConnectToRelays(n.OpenBazaarNode.Service)
484+
483485
n.OpenBazaarNode.PublishLock.Unlock()
484486
publishUnlocked = true
485487
n.OpenBazaarNode.UpdateFollow()

net/web_relay_manager.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,23 @@ import (
55
"encoding/binary"
66
"encoding/json"
77
"fmt"
8+
peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
89

9-
"github.com/btcsuite/btcutil/base58"
10+
"github.com/OpenBazaar/openbazaar-go/pb"
11+
"github.com/golang/protobuf/ptypes/any"
1012

1113
"gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash"
1214

15+
"github.com/btcsuite/btcutil/base58"
16+
1317
"github.com/gorilla/websocket"
1418
)
1519

1620
type WebRelayManager struct {
1721
webrelays []string
1822
peerID string
1923
connections []*websocket.Conn
24+
obService NetworkService
2025
}
2126

2227
type EncryptedMessage struct {
@@ -34,24 +39,34 @@ type SubscribeMessage struct {
3439
SubscriptionKey string `json:"subscriptionKey"`
3540
}
3641

42+
type SubscribeResponse struct {
43+
Subscribe string `json:"subscribe"`
44+
}
45+
3746
func NewWebRelayManager(webrelays []string, sender string) *WebRelayManager {
47+
return &WebRelayManager{webrelays, sender, nil, nil}
48+
}
49+
50+
func (wrm *WebRelayManager) ConnectToRelays(service NetworkService) {
51+
52+
// Set WRM service
53+
wrm.obService = service
3854

3955
// Establish connections
4056
var conns []*websocket.Conn
41-
for _, relay := range webrelays {
57+
for _, relay := range wrm.webrelays {
4258

4359
// Connect and subscribe to websocket server
44-
conn, err := connectToServer(relay, sender)
60+
conn, err := wrm.connectToServer(relay, wrm.peerID)
4561
if err != nil {
4662
log.Error("Could not connect to: %s", relay)
4763
}
4864

4965
conns = append(conns, conn)
5066
}
51-
return &WebRelayManager{webrelays, sender, conns}
5267
}
5368

54-
func connectToServer(relay string, sender string) (*websocket.Conn, error) {
69+
func (wrm *WebRelayManager) connectToServer(relay string, sender string) (*websocket.Conn, error) {
5570
// Generate subscription key for web relay
5671
peerIDMultihash, _ := multihash.FromB58String(sender)
5772
decoded, _ := multihash.Decode(peerIDMultihash)
@@ -102,7 +117,7 @@ func connectToServer(relay string, sender string) (*websocket.Conn, error) {
102117
return nil, err
103118
}
104119

105-
fmt.Printf("Successfully connected and subscribed to: %s\n", relay)
120+
fmt.Printf("Successfully connected to %s and subscribed to: %s\n", relay, base58.Encode(subscriptionKey))
106121

107122
go func() {
108123
for {
@@ -115,10 +130,30 @@ func connectToServer(relay string, sender string) (*websocket.Conn, error) {
115130
// print out that message for clarity
116131
fmt.Printf("Received incoming message from relay: %s\n", string(p))
117132

118-
//if err := c.WriteMessage(messageType, p); err != nil {
119-
// fmt.Println(err)
120-
// //return nil, err
121-
//}
133+
if string(p) == "{\"subscribe\": true}" {
134+
log.Debugf("Received subscribe success message")
135+
} else {
136+
// turn encrypted message into OFFLINE_RELAY and process normally
137+
m := new(pb.Message)
138+
m.MessageType = pb.Message_OFFLINE_RELAY
139+
m.Payload = &any.Any{Value: p}
140+
141+
handler := wrm.obService.HandlerForMsgType(m.MessageType)
142+
143+
peerID, _ := peer.IDB58Decode(sender)
144+
145+
if peerID != "" {
146+
m, err = handler(peerID, m, nil)
147+
if err != nil {
148+
if m != nil {
149+
log.Debugf("%s handle message error: %s", m.MessageType.String(), err.Error())
150+
} else {
151+
log.Errorf("Error: %s", err.Error())
152+
}
153+
}
154+
log.Debugf("Received OFFLINE_RELAY2 message from %s", peerID.Pretty())
155+
}
156+
}
122157

123158
}
124159
}()

0 commit comments

Comments
 (0)