Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit 8d11169

Browse files
drwashohoffmabc
authored andcommitted
feat: Fetch offline messages from push nodes and DHT in parallel
Originally from Chris' PR 1688
1 parent 039e4e5 commit 8d11169

File tree

1 file changed

+32
-28
lines changed

1 file changed

+32
-28
lines changed

net/retriever/retriever.go

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func NewMessageRetriever(cfg MRConfig) *MessageRetriever {
9191
WaitGroup: new(sync.WaitGroup),
9292
}
9393

94-
mr.Add(1)
94+
mr.Add(2)
9595
return &mr
9696
}
9797

@@ -100,57 +100,61 @@ func (m *MessageRetriever) Run() {
100100
peers := time.NewTicker(time.Minute * 10)
101101
defer dht.Stop()
102102
defer peers.Stop()
103-
go m.fetchPointers(true)
103+
go m.fetchPointersFromDHT()
104+
go m.fetchPointersFromPushNodes()
104105
for {
105106
select {
106107
case <-dht.C:
107108
m.Add(1)
108-
go m.fetchPointers(true)
109+
go m.fetchPointersFromDHT()
109110
case <-peers.C:
110111
m.Add(1)
111-
go m.fetchPointers(false)
112+
go m.fetchPointersFromPushNodes()
112113
}
113114
}
114115
}
115116

116117
// RunOnce - used to fetch messages only once
117118
func (m *MessageRetriever) RunOnce() {
118119
m.Add(1)
119-
go m.fetchPointers(true)
120+
go m.fetchPointersFromDHT()
120121
m.Add(1)
121-
go m.fetchPointers(false)
122+
go m.fetchPointersFromPushNodes()
122123
}
123124

124-
func (m *MessageRetriever) fetchPointers(useDHT bool) {
125+
func (m *MessageRetriever) fetchPointersFromDHT() {
125126
ctx, cancel := context.WithCancel(context.Background())
126127
defer cancel()
127-
wg := new(sync.WaitGroup)
128-
downloaded := 0
129128
mh, _ := multihash.FromB58String(m.node.Identity.Pretty())
130129
peerOut := make(chan ps.PeerInfo)
131130
go func(c chan ps.PeerInfo) {
132-
pwg := new(sync.WaitGroup)
133-
pwg.Add(1)
134-
go func(c chan ps.PeerInfo) {
135-
out := m.getPointersDataPeers()
136-
for p := range out {
137-
c <- p
138-
}
139-
pwg.Done()
140-
}(c)
141-
if useDHT {
142-
pwg.Add(1)
143-
go func(c chan ps.PeerInfo) {
144-
iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
145-
for p := range iout {
146-
c <- p
147-
}
148-
pwg.Done()
149-
}(c)
131+
iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
132+
for p := range iout {
133+
c <- p
134+
}
135+
close(c)
136+
137+
}(peerOut)
138+
139+
m.downloadMessages(peerOut)
140+
}
141+
142+
func (m *MessageRetriever) fetchPointersFromPushNodes() {
143+
peerOut := make(chan ps.PeerInfo)
144+
go func(c chan ps.PeerInfo) {
145+
out := m.getPointersDataPeers()
146+
for p := range out {
147+
c <- p
150148
}
151-
pwg.Wait()
152149
close(c)
150+
153151
}(peerOut)
152+
m.downloadMessages(peerOut)
153+
}
154+
155+
func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
156+
wg := new(sync.WaitGroup)
157+
downloaded := 0
154158

155159
inFlight := make(map[string]bool)
156160
// Iterate over the pointers, adding 1 to the waitgroup for each pointer found

0 commit comments

Comments
 (0)