Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit c2c89b3

Browse files
authored
Merge pull request #1817 from OpenBazaar/inbound_msg_persistence
(#1797) Inbound msg persistence
2 parents 3adcf87 + 16952f3 commit c2c89b3

File tree

19 files changed

+793
-18
lines changed

19 files changed

+793
-18
lines changed

cmd/start.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,7 @@ func (x *Start) Execute(args []string) error {
697697
core.Node.StartMessageRetriever()
698698
core.Node.StartPointerRepublisher()
699699
core.Node.StartRecordAgingNotifier()
700+
core.Node.StartInboundMsgScanner()
700701

701702
core.Node.PublishLock.Unlock()
702703
err = core.Node.UpdateFollow()

core/core.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ type OpenBazaarNode struct {
121121
seedLock sync.Mutex
122122

123123
InitalPublishComplete bool
124+
125+
// InboundMsgScanner is a worker that scans the messages
126+
// table and tries to retry a failed order message
127+
InboundMsgScanner *inboundMessageScanner
124128
}
125129

126130
// TestNetworkEnabled indicates whether the node is operating with test parameters

core/inbound_message_scanner.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package core
2+
3+
import (
4+
"time"
5+
6+
"github.com/op/go-logging"
7+
libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto"
8+
"gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
9+
10+
"github.com/OpenBazaar/openbazaar-go/net"
11+
"github.com/OpenBazaar/openbazaar-go/pb"
12+
"github.com/OpenBazaar/openbazaar-go/repo"
13+
)
14+
15+
const (
16+
scannerTestingInterval = time.Duration(1) * time.Minute
17+
scannerRegularInterval = time.Duration(10) * time.Minute
18+
)
19+
20+
type inboundMessageScanner struct {
21+
// PerformTask dependencies
22+
datastore repo.Datastore
23+
service net.NetworkService
24+
getHandler func(t pb.Message_MessageType) func(peer.ID, *pb.Message, interface{}) (*pb.Message, error)
25+
extractID func([]byte) (*peer.ID, error)
26+
broadcast chan repo.Notifier
27+
28+
// Worker-handling dependencies
29+
intervalDelay time.Duration
30+
logger *logging.Logger
31+
watchdogTimer *time.Ticker
32+
stopWorker chan bool
33+
}
34+
35+
func peerIDExtractor(data []byte) (*peer.ID, error) {
36+
pubkey, err := libp2p.UnmarshalPublicKey(data)
37+
if err != nil {
38+
log.Errorf("err extracting peerID: %v", err.Error())
39+
return nil, err
40+
}
41+
i, err := peer.IDFromPublicKey(pubkey)
42+
if err != nil {
43+
log.Errorf("err extracting peerID: %v", err.Error())
44+
return nil, err
45+
}
46+
return &i, nil
47+
}
48+
49+
// StartInboundMsgScanner - start the notifier
50+
func (n *OpenBazaarNode) StartInboundMsgScanner() {
51+
n.InboundMsgScanner = &inboundMessageScanner{
52+
datastore: n.Datastore,
53+
service: n.Service,
54+
getHandler: n.Service.HandlerForMsgType,
55+
extractID: peerIDExtractor,
56+
broadcast: n.Broadcast,
57+
intervalDelay: n.scannerIntervalDelay(),
58+
logger: logging.MustGetLogger("inboundMessageScanner"),
59+
}
60+
go n.InboundMsgScanner.Run()
61+
}
62+
63+
func (n *OpenBazaarNode) scannerIntervalDelay() time.Duration {
64+
if n.TestnetEnable {
65+
return scannerTestingInterval
66+
}
67+
return scannerRegularInterval
68+
}
69+
70+
func (scanner *inboundMessageScanner) Run() {
71+
scanner.watchdogTimer = time.NewTicker(scanner.intervalDelay)
72+
scanner.stopWorker = make(chan bool)
73+
74+
// Run once on start, then wait for watchdog
75+
scanner.PerformTask()
76+
for {
77+
select {
78+
case <-scanner.watchdogTimer.C:
79+
scanner.PerformTask()
80+
case <-scanner.stopWorker:
81+
scanner.watchdogTimer.Stop()
82+
return
83+
}
84+
}
85+
}
86+
87+
func (scanner *inboundMessageScanner) Stop() {
88+
scanner.stopWorker <- true
89+
close(scanner.stopWorker)
90+
}
91+
92+
func (scanner *inboundMessageScanner) PerformTask() {
93+
msgs, err := scanner.datastore.Messages().GetAllErrored()
94+
if err != nil {
95+
scanner.logger.Error(err)
96+
return
97+
}
98+
for _, m := range msgs {
99+
if m.MsgErr == ErrInsufficientFunds.Error() {
100+
// Get handler for this msg type
101+
handler := scanner.getHandler(pb.Message_MessageType(m.MessageType))
102+
if handler == nil {
103+
log.Errorf("err fetching handler for msg: %v", pb.Message_MessageType(m.MessageType))
104+
continue
105+
}
106+
i, err := scanner.extractID(m.PeerPubkey)
107+
if err != nil {
108+
log.Errorf("Error processing message %s. Type %s: %s", m, m.MessageType, err.Error())
109+
continue
110+
}
111+
msg := new(repo.Message)
112+
113+
if len(m.Message) > 0 {
114+
err = msg.UnmarshalJSON(m.Message)
115+
if err != nil {
116+
log.Errorf("Error processing message %s. Type %s: %s", m, m.MessageType, err.Error())
117+
continue
118+
}
119+
}
120+
// Dispatch handler
121+
_, err = handler(*i, &msg.Msg, nil)
122+
if err != nil {
123+
log.Errorf("%d handle message error from %s: %s", m.MessageType, m.PeerID, err)
124+
continue
125+
}
126+
err = scanner.datastore.Messages().MarkAsResolved(m)
127+
if err != nil {
128+
log.Errorf("marking message resolved: %s", err)
129+
}
130+
}
131+
}
132+
}

core/inbound_message_scanner_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package core
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"testing"
7+
8+
"github.com/OpenBazaar/openbazaar-go/pb"
9+
"github.com/OpenBazaar/openbazaar-go/repo"
10+
"github.com/OpenBazaar/openbazaar-go/repo/db"
11+
"github.com/OpenBazaar/openbazaar-go/schema"
12+
wi "github.com/OpenBazaar/wallet-interface"
13+
"github.com/op/go-logging"
14+
"gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
15+
)
16+
17+
func TestPerformTaskInboundMessageScanner(t *testing.T) {
18+
var (
19+
orderMsgWithNoErr = repo.OrderMessage{
20+
MessageID: "1",
21+
OrderID: "1",
22+
MessageType: int32(pb.Message_ORDER),
23+
Message: []byte("sample message"),
24+
MsgErr: "",
25+
PeerID: "sample",
26+
PeerPubkey: []byte("sample"),
27+
}
28+
29+
orderMsgWithErr = repo.OrderMessage{
30+
MessageID: "2",
31+
OrderID: "2",
32+
MessageType: int32(pb.Message_ORDER),
33+
Message: []byte("sample message"),
34+
MsgErr: ErrInsufficientFunds.Error(),
35+
PeerID: "sample",
36+
PeerPubkey: []byte("sample"),
37+
}
38+
39+
orderMsgWithOtherErr = repo.OrderMessage{
40+
MessageID: "3",
41+
OrderID: "3",
42+
MessageType: int32(pb.Message_ORDER),
43+
Message: []byte("sample message"),
44+
MsgErr: "not a retryable err",
45+
PeerID: "sample",
46+
PeerPubkey: []byte("sample"),
47+
}
48+
49+
existingRecords = []repo.OrderMessage{
50+
orderMsgWithNoErr,
51+
orderMsgWithErr,
52+
orderMsgWithOtherErr,
53+
}
54+
55+
appSchema = schema.MustNewCustomSchemaManager(schema.SchemaContext{
56+
DataPath: schema.GenerateTempPath(),
57+
TestModeEnabled: true,
58+
})
59+
)
60+
61+
if err := appSchema.BuildSchemaDirectories(); err != nil {
62+
t.Fatal(err)
63+
}
64+
defer appSchema.DestroySchemaDirectories()
65+
if err := appSchema.InitializeDatabase(); err != nil {
66+
t.Fatal(err)
67+
}
68+
69+
database, err := appSchema.OpenDatabase()
70+
if err != nil {
71+
t.Fatal(err)
72+
}
73+
s, err := database.Prepare("insert into messages (messageID, orderID, message_type, message, peerID, err, pubkey) values (?, ?, ?, ?, ?, ?, ?)")
74+
if err != nil {
75+
t.Fatal(err)
76+
}
77+
78+
for _, r := range existingRecords {
79+
_, err = s.Exec(r.MessageID, r.OrderID, r.MessageType, r.Message, r.PeerID, r.MsgErr, r.PeerPubkey)
80+
if err != nil {
81+
t.Fatal(err)
82+
}
83+
}
84+
85+
handler := func(t pb.Message_MessageType) func(peer.ID, *pb.Message, interface{}) (*pb.Message, error) {
86+
return func(peer.ID, *pb.Message, interface{}) (*pb.Message, error) {
87+
if t == pb.Message_ORDER {
88+
return nil, nil
89+
}
90+
return nil, errors.New("unknown message type")
91+
}
92+
}
93+
94+
extractor := func(data []byte) (*peer.ID, error) {
95+
return nil, nil
96+
}
97+
98+
datastore := db.NewSQLiteDatastore(database, new(sync.Mutex), wi.Bitcoin)
99+
worker := &inboundMessageScanner{
100+
datastore: datastore,
101+
logger: logging.MustGetLogger("testInboundMsgScanner"),
102+
getHandler: handler,
103+
extractID: extractor,
104+
}
105+
106+
worker.PerformTask()
107+
msgs, err := worker.datastore.Messages().GetAllErrored()
108+
if err != nil {
109+
t.Errorf("err fetching msgs : %v", err)
110+
}
111+
112+
if len(msgs) != 2 {
113+
t.Errorf("did not fetch the correct no of err records")
114+
}
115+
116+
count := 0
117+
for _, msg := range msgs {
118+
if msg.MsgErr == ErrInsufficientFunds.Error() {
119+
count++
120+
}
121+
}
122+
123+
if count != 1 {
124+
t.Errorf("did not pick the correct no of records to process")
125+
}
126+
127+
}

core/net.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,8 @@ func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract
318318
} else {
319319
err = n.Datastore.Messages().Put(
320320
fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER)),
321-
orderID0, pb.Message_ORDER, peerID, repo.Message{Msg: m})
321+
orderID0, pb.Message_ORDER, peerID, repo.Message{Msg: m},
322+
"", 0, contract.VendorListings[0].VendorID.Pubkeys.Identity)
322323
if err != nil {
323324
log.Errorf("failed putting message (%s-%d): %v", orderID0, int(pb.Message_ORDER), err)
324325
}
@@ -358,7 +359,8 @@ func (n *OpenBazaarNode) SendOrderConfirmation(peerID string, contract *pb.Ricar
358359
} else {
359360
err = n.Datastore.Messages().Put(
360361
fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_CONFIRMATION)),
361-
orderID0, pb.Message_ORDER_CONFIRMATION, peerID, repo.Message{Msg: m})
362+
orderID0, pb.Message_ORDER_CONFIRMATION, peerID, repo.Message{Msg: m},
363+
"", 0, contract.BuyerOrder.BuyerID.Pubkeys.Identity)
362364
if err != nil {
363365
log.Errorf("failed putting message (%s-%d): %v", orderID0, int(pb.Message_ORDER_CONFIRMATION), err)
364366
}
@@ -376,18 +378,22 @@ func (n *OpenBazaarNode) SendCancel(peerID, orderID string) error {
376378
//try to get public key from order
377379
order, _, _, _, _, _, err := n.Datastore.Purchases().GetByOrderId(orderID)
378380
var kp *libp2p.PubKey
381+
var pub []byte
379382
if err != nil { //probably implies we can't find the order in the Datastore
380383
kp = nil //instead SendOfflineMessage can try to get the key from the peerId
384+
pub = order.BuyerOrder.BuyerID.Pubkeys.Identity
381385
} else {
382386
k, err := libp2p.UnmarshalPublicKey(order.GetVendorListings()[0].GetVendorID().GetPubkeys().Identity)
383387
if err != nil {
384388
return err
385389
}
386390
kp = &k
391+
pub = order.VendorListings[0].VendorID.Pubkeys.Identity
387392
}
388393
err = n.Datastore.Messages().Put(
389394
fmt.Sprintf("%s-%d", orderID, int(pb.Message_ORDER_CANCEL)),
390-
orderID, pb.Message_ORDER_CANCEL, peerID, repo.Message{Msg: m})
395+
orderID, pb.Message_ORDER_CANCEL, peerID, repo.Message{Msg: m},
396+
"", 0, pub)
391397
if err != nil {
392398
log.Errorf("failed putting message (%s-%d): %v", orderID, int(pb.Message_ORDER_CANCEL), err)
393399
}
@@ -406,21 +412,24 @@ func (n *OpenBazaarNode) SendReject(peerID string, rejectMessage *pb.OrderReject
406412
Payload: a,
407413
}
408414
var kp *libp2p.PubKey
415+
var pub []byte
409416
//try to get public key from order
410417
order, _, _, _, _, _, err := n.Datastore.Sales().GetByOrderId(rejectMessage.OrderID)
411418
if err != nil { //probably implies we can't find the order in the Datastore
412419
kp = nil //instead SendOfflineMessage can try to get the key from the peerId
420+
pub = order.BuyerOrder.BuyerID.Pubkeys.Identity
413421
} else {
414422
k, err := libp2p.UnmarshalPublicKey(order.GetBuyerOrder().GetBuyerID().GetPubkeys().Identity)
415423
if err != nil {
416424
log.Errorf("failed to unmarshal publicKey: %v", err)
417425
return err
418426
}
419427
kp = &k
428+
pub = order.VendorListings[0].VendorID.Pubkeys.Identity
420429
}
421430
err = n.Datastore.Messages().Put(
422431
fmt.Sprintf("%s-%d", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT)),
423-
rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, repo.Message{Msg: m})
432+
rejectMessage.OrderID, pb.Message_ORDER_REJECT, peerID, repo.Message{Msg: m}, "", 0, pub)
424433
if err != nil {
425434
log.Errorf("failed putting message (%s-%d): %v", rejectMessage.OrderID, int(pb.Message_ORDER_REJECT), err)
426435
}
@@ -463,7 +472,8 @@ func (n *OpenBazaarNode) SendOrderFulfillment(peerID string, k *libp2p.PubKey, f
463472
} else {
464473
err = n.Datastore.Messages().Put(
465474
fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_FULFILLMENT)),
466-
orderID0, pb.Message_ORDER_FULFILLMENT, peerID, repo.Message{Msg: m})
475+
orderID0, pb.Message_ORDER_FULFILLMENT, peerID, repo.Message{Msg: m},
476+
"", 0, fulfillmentMessage.VendorListings[0].VendorID.Pubkeys.Identity)
467477
if err != nil {
468478
log.Errorf("failed putting message (%s-%d): %v", orderID0, int(pb.Message_ORDER_FULFILLMENT), err)
469479
}
@@ -488,7 +498,8 @@ func (n *OpenBazaarNode) SendOrderCompletion(peerID string, k *libp2p.PubKey, co
488498
} else {
489499
err = n.Datastore.Messages().Put(
490500
fmt.Sprintf("%s-%d", orderID0, int(pb.Message_ORDER_COMPLETION)),
491-
orderID0, pb.Message_ORDER_COMPLETION, peerID, repo.Message{Msg: m})
501+
orderID0, pb.Message_ORDER_COMPLETION, peerID, repo.Message{Msg: m},
502+
"", 0, completionMessage.BuyerOrder.BuyerID.Pubkeys.Identity)
492503
if err != nil {
493504
log.Errorf("failed putting message (%s-%d): %v", orderID0, int(pb.Message_ORDER_COMPLETION), err)
494505
}

0 commit comments

Comments
 (0)