Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit 62a6a88

Browse files
committed
use handler func in place of the networkService object
1 parent f24fa4b commit 62a6a88

File tree

2 files changed

+24
-12
lines changed

2 files changed

+24
-12
lines changed

core/inbound_message_scanner.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type inboundMessageScanner struct {
2222
datastore repo.Datastore
2323
service net.NetworkService
2424
getHandler func(t pb.Message_MessageType) func(peer.ID, *pb.Message, interface{}) (*pb.Message, error)
25+
extractID func([]byte) (*peer.ID, error)
2526
broadcast chan repo.Notifier
2627

2728
// Worker-handling dependencies
@@ -31,12 +32,27 @@ type inboundMessageScanner struct {
3132
stopWorker chan bool
3233
}
3334

35+
func peerIDExtractor(data []byte) (*peer.ID, error) {
36+
pubkey, err := libp2p.UnmarshalPublicKey(data)
37+
if err != nil {
38+
log.Errorf("err extracting peerID: %v", err.Error())
39+
return nil, err
40+
}
41+
i, err := peer.IDFromPublicKey(pubkey)
42+
if err != nil {
43+
log.Errorf("err extracting peerID: %v", err.Error())
44+
return nil, err
45+
}
46+
return &i, nil
47+
}
48+
3449
// StartInboundMsgScanner - start the notifier
3550
func (n *OpenBazaarNode) StartInboundMsgScanner() {
3651
n.InboundMsgScanner = &inboundMessageScanner{
3752
datastore: n.Datastore,
3853
service: n.Service,
3954
getHandler: n.Service.HandlerForMsgType,
55+
extractID: peerIDExtractor,
4056
broadcast: n.Broadcast,
4157
intervalDelay: n.scannerIntervalDelay(),
4258
logger: logging.MustGetLogger("inboundMessageScanner"),
@@ -81,19 +97,13 @@ func (scanner *inboundMessageScanner) PerformTask() {
8197
}
8298
for _, m := range msgs {
8399
if m.MsgErr == ErrInsufficientFunds.Error() {
84-
85100
// Get handler for this msg type
86101
handler := scanner.getHandler(pb.Message_MessageType(m.MessageType))
87102
if handler == nil {
88103
log.Errorf("err fetching handler for msg: %v", pb.Message_MessageType(m.MessageType))
89104
continue
90105
}
91-
pubkey, err := libp2p.UnmarshalPublicKey(m.PeerPubkey)
92-
if err != nil {
93-
log.Errorf("Error processing message %s. Type %s: %s", m, m.MessageType, err.Error())
94-
continue
95-
}
96-
i, err := peer.IDFromPublicKey(pubkey)
106+
i, err := scanner.extractID(m.PeerPubkey)
97107
if err != nil {
98108
log.Errorf("Error processing message %s. Type %s: %s", m, m.MessageType, err.Error())
99109
continue
@@ -107,9 +117,8 @@ func (scanner *inboundMessageScanner) PerformTask() {
107117
continue
108118
}
109119
}
110-
111120
// Dispatch handler
112-
_, err = handler(i, &msg.Msg, nil)
121+
_, err = handler(*i, &msg.Msg, nil)
113122
if err != nil {
114123
log.Debugf("%d handle message error from %s: %s", m.MessageType, m.PeerID, err)
115124
continue
@@ -119,8 +128,6 @@ func (scanner *inboundMessageScanner) PerformTask() {
119128
if err != nil {
120129
log.Errorf("err putting message : %v", err)
121130
}
122-
123131
}
124132
}
125-
126133
}

core/inbound_message_scanner_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,19 @@ func TestPerformTaskInboundMessageScanner(t *testing.T) {
9191
}
9292
}
9393

94+
extractor := func(data []byte) (*peer.ID, error) {
95+
return nil, nil
96+
}
97+
9498
datastore := db.NewSQLiteDatastore(database, new(sync.Mutex), wi.Bitcoin)
9599
worker := &inboundMessageScanner{
96100
datastore: datastore,
97101
logger: logging.MustGetLogger("testInboundMsgScanner"),
98102
getHandler: handler,
103+
extractID: extractor,
99104
}
100105

101-
//worker.PerformTask()
106+
worker.PerformTask()
102107
msgs, err := worker.datastore.Messages().GetAllErrored()
103108
if err != nil {
104109
t.Errorf("err fetching msgs : %v", err)

0 commit comments

Comments
 (0)