Skip to content

Commit 5e1fcd5

Browse files
authored
Release v1.2.4
2 parents 3731bc9 + 41968fd commit 5e1fcd5

File tree

3 files changed

+81
-68
lines changed

3 files changed

+81
-68
lines changed

cmd/main.go

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"encoding/json"
88
"flag"
99
"fmt"
10+
"log"
1011

1112
"io"
1213
"os"
@@ -27,26 +28,21 @@ import (
2728
"github.com/multiformats/go-multiaddr"
2829
)
2930

30-
/*
31+
// TODO: Update Readme & checkout and replace better comments
3132

32-
// TODO: Update Readme & checkout and replace better comments
33+
var (
34+
myself host.Host
35+
pubSub *pubsub.PubSub
3336

37+
globalCtx context.Context
38+
globalCtxCancel context.CancelFunc
3439

40+
pbMutex sync.Mutex
41+
networkTopics = mapset.NewSet()
42+
serviceTopic string
3543

36-
37-
*/
38-
39-
var myself host.Host
40-
var pubSub *pubsub.PubSub
41-
42-
var globalCtx context.Context
43-
var globalCtxCancel context.CancelFunc
44-
45-
var pbMutex sync.Mutex
46-
var networkTopics = mapset.NewSet()
47-
var serviceTopic string
48-
49-
var handler pkg.Handler
44+
handler pkg.Handler
45+
)
5046

5147
// Read messages from subscription (topic)
5248
// NOTE: in this function we are providing subscription object, which means we should subscribe somewhere else before invoke this function
@@ -61,8 +57,8 @@ func readSub(subscription *pubsub.Subscription, incomingMessagesChan chan pubsub
6157
}
6258
msg, err := subscription.Next(context.Background())
6359
if err != nil {
64-
fmt.Println("Error reading from buffer")
65-
panic(err)
60+
log.Println("Error reading from buffer", err)
61+
return
6662
}
6763

6864
if string(msg.Data) == "" {
@@ -71,8 +67,8 @@ func readSub(subscription *pubsub.Subscription, incomingMessagesChan chan pubsub
7167
if string(msg.Data) != "\n" {
7268
addr, err := peer.IDFromBytes(msg.From)
7369
if err != nil {
74-
fmt.Println("Error occurred when reading message From field...")
75-
panic(err)
70+
log.Println("Error occurred when reading message From field...", err)
71+
return
7672
}
7773

7874
// This checks if sender address of incoming message is ours. It is need because we get our messages when subscribed to the same topic.
@@ -90,8 +86,8 @@ func newTopic(topic string) {
9086
ctx := globalCtx
9187
subscription, err := pubSub.Subscribe(topic)
9288
if err != nil {
93-
fmt.Println("Error occurred when subscribing to topic")
94-
panic(err)
89+
log.Println("Error occurred when subscribing to topic", err)
90+
return
9591
}
9692
time.Sleep(3 * time.Second)
9793
incomingMessages := make(chan pubsub.Message)
@@ -104,7 +100,7 @@ func newTopic(topic string) {
104100
case msg := <-incomingMessages:
105101
{
106102
handler.HandleIncomingMessage(serviceTopic, msg, func(textMessage pkg.TextMessage) {
107-
fmt.Printf("%s \x1b[32m%s\x1b[0m> ", textMessage.From, textMessage.Body)
103+
log.Printf("%s \x1b[32m%s\x1b[0m> ", textMessage.From, textMessage.Body)
108104
})
109105
}
110106
}
@@ -122,16 +118,16 @@ func writeTopic(topic string) {
122118
return
123119
default:
124120
}
125-
fmt.Print("> ")
121+
log.Print("> ")
126122
text, err := stdReader.ReadString('\n')
127123
if err != nil {
128124

129125
if err == io.EOF {
130126
break
131127
}
132128

133-
fmt.Println("Error reading from stdin")
134-
panic(err)
129+
log.Println("Error reading from stdin", err)
130+
return
135131
}
136132
message := &api.BaseMessage{
137133
Body: text,
@@ -140,13 +136,13 @@ func writeTopic(topic string) {
140136

141137
sendData, err := json.Marshal(message)
142138
if err != nil {
143-
fmt.Println("Error occurred when marshalling message object")
139+
log.Println("Error occurred when marshalling message object")
144140
continue
145141
}
146142
err = pubSub.Publish(topic, sendData)
147143
if err != nil {
148-
fmt.Println("Error occurred when publishing")
149-
panic(err)
144+
log.Println("Error occurred when publishing", err)
145+
return
150146
}
151147
}
152148
}
@@ -156,13 +152,13 @@ func main() {
156152
cfg := parseFlags()
157153

158154
if *help {
159-
fmt.Printf("Simple example for peer discovery using mDNS. mDNS is great when you have multiple peers in local LAN.")
160-
fmt.Printf("Usage: \n Run './chat-with-mdns'\nor Run './chat-with-mdns -wrapped_host [wrapped_host] -port [port] -rendezvous [string] -pid [proto ID]'\n")
155+
log.Printf("Simple example for peer discovery using mDNS. mDNS is great when you have multiple peers in local LAN.")
156+
log.Printf("Usage: \n Run './chat-with-mdns'\nor Run './chat-with-mdns -wrapped_host [wrapped_host] -port [port] -rendezvous [string] -pid [proto ID]'\n")
161157

162158
os.Exit(0)
163159
}
164160

165-
fmt.Printf("[*] Listening on: %s with port: %d\n", cfg.listenHost, cfg.listenPort)
161+
log.Printf("[*] Listening on: %s with port: %d\n", cfg.listenHost, cfg.listenPort)
166162

167163
ctx, ctxCancel := context.WithCancel(context.Background())
168164
globalCtx = ctx
@@ -173,7 +169,7 @@ func main() {
173169
// Creates a new RSA key pair for this wrapped_host.
174170
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
175171
if err != nil {
176-
panic(err)
172+
log.Fatalln(err)
177173
}
178174

179175
// 0.0.0.0 will listen on any interface device.
@@ -188,37 +184,41 @@ func main() {
188184
)
189185

190186
if err != nil {
191-
panic(err)
187+
log.Fatalln(err)
192188
}
193189

194-
fmt.Printf("\n[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s\n", cfg.listenHost, cfg.listenPort, host.ID().Pretty())
190+
multiaddress := fmt.Sprintf("/ip4/%s/tcp/%v/p2p/%s", cfg.listenHost, cfg.listenPort, host.ID().Pretty())
191+
log.Printf("\n[*] Your Multiaddress Is: %s\n", multiaddress)
195192

196193
myself = host
197194

198195
pb, err := pubsub.NewFloodsubWithProtocols(context.Background(), host, []protocol.ID{protocol.ID(cfg.ProtocolID)}, pubsub.WithMessageSigning(true), pubsub.WithStrictSignatureVerification(true))
199196
if err != nil {
200-
fmt.Println("Error occurred when create PubSub")
201-
panic(err)
197+
log.Println("Error occurred when create PubSub")
198+
log.Fatalln(err)
202199
}
203200

204201
// Set global PubSub object
205202
pubSub = pb
206203

207-
handler = pkg.NewHandler(pb, serviceTopic, sourceMultiAddr.String(), &networkTopics)
204+
handler = pkg.NewHandler(pb, serviceTopic, multiaddress, &networkTopics)
208205

209206
// Randezvous string = service tag
210207
// Disvover all peers with our service (all ms devices)
211-
peerChan := pkg.InitMDNS(ctx, host, cfg.RendezvousString)
208+
peerChan, err := pkg.InitMDNS(ctx, host, cfg.RendezvousString)
209+
if err != nil {
210+
panic(err)
211+
}
212212

213213
// NOTE: here we use Randezvous string as 'topic' by default .. topic != service tag
214214
subscription, err := pb.Subscribe(cfg.RendezvousString)
215215
serviceTopic = cfg.RendezvousString
216216
if err != nil {
217-
fmt.Println("Error occurred when subscribing to topic")
218-
panic(err)
217+
log.Println("Error occurred when subscribing to topic", err)
218+
return
219219
}
220220

221-
fmt.Println("Waiting for correct set up of PubSub...")
221+
log.Println("Waiting for correct set up of PubSub...")
222222
time.Sleep(3 * time.Second)
223223

224224
incomingMessages := make(chan pubsub.Message)
@@ -240,30 +240,30 @@ MainLoop:
240240
handler.HandleIncomingMessage(serviceTopic, msg, func(textMessage pkg.TextMessage) {
241241
// Green console colour: \x1b[32m
242242
// Reset console colour: \x1b[0m
243-
fmt.Printf("%s > \x1b[32m%s\x1b[0m", textMessage.From, textMessage.Body)
244-
fmt.Print("> ")
243+
log.Printf("%s > \x1b[32m%s\x1b[0m", textMessage.From, textMessage.Body)
244+
log.Print("> ")
245245
})
246246
}
247247
case newPeer := <-peerChan:
248248
{
249-
fmt.Println("\nFound peer:", newPeer, ", add address to peerstore")
249+
log.Println("\nFound peer:", newPeer, ", add address to peerstore")
250250

251251
// Adding peer addresses to local peerstore
252252
host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL)
253253
// Connect to the peer
254254
if err := host.Connect(ctx, newPeer); err != nil {
255-
fmt.Println("Connection failed:", err)
255+
log.Println("Connection failed:", err)
256256
}
257-
fmt.Println("Connected to:", newPeer)
258-
fmt.Println("> ")
257+
log.Println("Connected to:", newPeer)
258+
log.Println("> ")
259259
}
260260
}
261261
}
262262

263263
if err := host.Close(); err != nil {
264-
fmt.Println("\nClosing host failed:", err)
264+
log.Println("\nClosing host failed:", err)
265265
}
266-
fmt.Println("\nBye")
266+
log.Println("\nBye")
267267
}
268268

269269
func getNetworkTopics() {

pkg/handler.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package pkg
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
6+
"log"
77
"sync"
88
"time"
99

@@ -20,6 +20,7 @@ type Handler struct {
2020
networkTopics mapset.Set
2121
identityMap map[string]string
2222
multiaddress string
23+
matrixID string
2324
PbMutex sync.Mutex
2425
}
2526

@@ -43,11 +44,12 @@ func NewHandler(pb *pubsub.PubSub, serviceTopic, multiaddress string, networkTop
4344
func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(TextMessage)) {
4445
addr, err := peer.IDFromBytes(msg.From)
4546
if err != nil {
46-
fmt.Println("Error occurred when reading message From field...")
47-
panic(err)
47+
log.Println("Error occurred when reading message from field...")
48+
return
4849
}
4950
message := &api.BaseMessage{}
5051
if err = json.Unmarshal(msg.Data, message); err != nil {
52+
log.Println("Error occurred during unmarshalling the base message data")
5153
return
5254
}
5355
switch message.Flag {
@@ -58,7 +60,6 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
5860
Body: message.Body,
5961
From: addr,
6062
}
61-
6263
handleTextMessage(textMessage)
6364
// Getting topic request, answer topic response
6465
case api.FlagTopicsRequest:
@@ -71,6 +72,7 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
7172
}
7273
sendData, err := json.Marshal(respond)
7374
if err != nil {
75+
log.Println("Error occurred during marshalling the respond from TopicsRequest")
7476
return
7577
}
7678
go func() {
@@ -83,7 +85,8 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
8385
case api.FlagTopicsResponse:
8486
respond := &api.GetTopicsRespondMessage{}
8587
if err = json.Unmarshal(msg.Data, respond); err != nil {
86-
panic(err)
88+
log.Println("Error occurred during unmarshalling the message data from TopicsResponse")
89+
return
8790
}
8891
for i := 0; i < len(respond.Topics); i++ {
8992
h.networkTopics.Add(respond.Topics[i])
@@ -96,10 +99,11 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
9699
Flag: api.FlagIdentityResponse,
97100
},
98101
Multiaddress: h.multiaddress,
99-
MatrixID: "",
102+
MatrixID: h.matrixID,
100103
}
101104
sendData, err := json.Marshal(respond)
102105
if err != nil {
106+
log.Println("Error occurred during marshalling the respond from IdentityRequest")
103107
return
104108
}
105109
go func() {
@@ -111,14 +115,20 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
111115
case api.FlagIdentityResponse:
112116
respond := &api.GetIdentityRespondMessage{}
113117
if err := json.Unmarshal(msg.Data, respond); err != nil {
114-
panic(err)
118+
log.Println("Error occurred during unmarshalling the message data from IdentityResponse")
119+
return
115120
}
116121
h.identityMap[respond.Multiaddress] = respond.MatrixID
117122
default:
118-
fmt.Printf("\nUnknown message type: %#x\n", message.Flag)
123+
log.Printf("\nUnknown message type: %#x\n", message.Flag)
119124
}
120125
}
121126

127+
// Set Matrix ID
128+
func (h *Handler) SetMatrixID(mxID string) {
129+
h.matrixID = mxID
130+
}
131+
122132
// Get list of topics **this** node is subscribed to
123133
func (h *Handler) GetTopics() []string {
124134
topics := h.pb.GetTopics()
@@ -143,7 +153,7 @@ func (h *Handler) RequestNetworkTopics(ctx context.Context) {
143153
Flag: api.FlagTopicsRequest,
144154
}
145155

146-
h.sendMessageToServiceTopic(requestTopicsMessage, ctx)
156+
h.sendMessageToServiceTopic(ctx, requestTopicsMessage)
147157
}
148158

149159
// Requests MatrixID from other peers
@@ -153,14 +163,15 @@ func (h *Handler) RequestPeersIdentity(ctx context.Context) {
153163
Flag: api.FlagIdentityRequest,
154164
}
155165

156-
h.sendMessageToServiceTopic(requestPeersIdentity, ctx)
166+
h.sendMessageToServiceTopic(ctx, requestPeersIdentity)
157167
}
158168

159169
// Sends marshaled message to the service topic
160-
func (h *Handler) sendMessageToServiceTopic(message *api.BaseMessage, ctx context.Context) {
170+
func (h *Handler) sendMessageToServiceTopic(ctx context.Context, message *api.BaseMessage) {
161171
sendData, err := json.Marshal(message)
162172
if err != nil {
163-
panic(err)
173+
log.Println(err.Error())
174+
return
164175
}
165176

166177
ticker := time.NewTicker(3 * time.Second)

0 commit comments

Comments
 (0)