@@ -17,6 +17,7 @@ package pan
17
17
import (
18
18
"context"
19
19
"math/rand"
20
+ "sync"
20
21
"time"
21
22
)
22
23
@@ -25,9 +26,10 @@ type refreshee interface {
25
26
}
26
27
27
28
type refresher struct {
28
- subscribers map [IA ][]refreshee
29
- pool * pathPool
30
- newSubscription chan bool
29
+ subscribersMutex sync.Mutex
30
+ subscribers map [IA ][]refreshee
31
+ newSubscription chan bool
32
+ pool * pathPool
31
33
}
32
34
33
35
func makeRefresher (pool * pathPool ) refresher {
@@ -48,6 +50,8 @@ func (r *refresher) subscribe(ctx context.Context, dst IA, s refreshee) ([]*Path
48
50
if len (paths ) == 0 {
49
51
return nil , errNoPathTo (dst )
50
52
}
53
+ r .subscribersMutex .Lock ()
54
+ defer r .subscribersMutex .Unlock ()
51
55
subs , ok := r .subscribers [dst ]
52
56
r .subscribers [dst ] = append (subs , s )
53
57
if ! ok {
@@ -57,6 +61,9 @@ func (r *refresher) subscribe(ctx context.Context, dst IA, s refreshee) ([]*Path
57
61
}
58
62
59
63
func (r * refresher ) unsubscribe (ia IA , s refreshee ) {
64
+ r .subscribersMutex .Lock ()
65
+ defer r .subscribersMutex .Unlock ()
66
+
60
67
idx := - 1
61
68
subs := r .subscribers [ia ]
62
69
for i , v := range subs {
@@ -100,7 +107,14 @@ func (r *refresher) run() {
100
107
func (r * refresher ) refresh () {
101
108
now := time .Now ()
102
109
// when a refresh is triggered, we batch all
103
- for dstIA , subscribers := range r .subscribers {
110
+ r .subscribersMutex .Lock ()
111
+ refreshIAs := make ([]IA , 0 , len (r .subscribers ))
112
+ for dstIA := range r .subscribers {
113
+ refreshIAs = append (refreshIAs , dstIA )
114
+ }
115
+ r .subscribersMutex .Unlock ()
116
+
117
+ for _ , dstIA := range refreshIAs {
104
118
poolEntry , _ := r .pool .entry (dstIA )
105
119
if r .shouldRefresh (now , poolEntry .earliestExpiry , poolEntry .lastQuery ) {
106
120
paths , err := r .pool .queryPaths (context .Background (), dstIA )
@@ -112,9 +126,11 @@ func (r *refresher) refresh() {
112
126
// to sciond or something like that.
113
127
continue
114
128
}
115
- for _ , subscriber := range subscribers {
129
+ r .subscribersMutex .Lock ()
130
+ for _ , subscriber := range r .subscribers [dstIA ] {
116
131
subscriber .refresh (dstIA , paths )
117
132
}
133
+ r .subscribersMutex .Unlock ()
118
134
}
119
135
}
120
136
}
0 commit comments