Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit 604f33c

Browse files
committed
Refactor SendOfflineMessage to publish to push nodes first.
The code currently publishes to the DHT first and then to the pushnodes when it finishes. This creates an opportunity where the node may be killed while it's publishing to the DHT and before it has an opportunity to publish to the pushnodes. This commit changes it to publish to the pushnodes in parrallel to the DHT so as to avoid the potentional problem from above.
1 parent 73d558b commit 604f33c

File tree

1 file changed

+20
-11
lines changed

1 file changed

+20
-11
lines changed

core/net.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,23 +98,32 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
9898
}
9999
}
100100
log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String())
101-
OfflineMessageWaitGroup.Add(2)
102-
go func() {
103-
ctx, cancel := context.WithCancel(context.Background())
104-
defer cancel()
105-
err := ipfs.PublishPointer(n.DHT, ctx, pointer)
106-
if err != nil {
107-
log.Error(err)
108-
}
109101

110-
// Push provider to our push nodes for redundancy
111-
for _, p := range n.PushNodes {
102+
// We publish our pointers to three different locations:
103+
// 1. The pushnodes
104+
// 2. The DHT
105+
// 3. Pubsub
106+
// Each one is done in a separate goroutine so as to not block but we
107+
// do increment the OfflineMessageWaitGroup which is used to block
108+
// shutdown until all publishing is finished.
109+
OfflineMessageWaitGroup.Add(2 + len(n.PushNodes))
110+
for _, p := range n.PushNodes {
111+
go func(pid peer.ID) {
112112
ctx, cancel := context.WithCancel(context.Background())
113113
defer cancel()
114-
err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer)
114+
err := ipfs.PutPointerToPeer(n.DHT, ctx, pid, pointer)
115115
if err != nil {
116116
log.Error(err)
117117
}
118+
OfflineMessageWaitGroup.Done()
119+
}(p)
120+
}
121+
go func() {
122+
ctx, cancel := context.WithCancel(context.Background())
123+
defer cancel()
124+
err := ipfs.PublishPointer(n.DHT, ctx, pointer)
125+
if err != nil {
126+
log.Error(err)
118127
}
119128

120129
OfflineMessageWaitGroup.Done()

0 commit comments

Comments
 (0)