9
9
"math/rand"
10
10
"net"
11
11
"sync"
12
+ "sync/atomic"
12
13
"time"
14
+ "unsafe"
13
15
)
14
16
15
17
// An ICMPRequest represents an ICMPRequest issued by ping or trace for listener
@@ -34,16 +36,20 @@ func (r *ICMPRequest) SetTimeout(duration time.Duration) {
34
36
r .Deadline = r .IssueTime .Add (duration )
35
37
}
36
38
37
- func (r ICMPRequest ) Passed (time time.Time ) bool {
39
+ func (r * ICMPRequest ) Passed (time time.Time ) bool {
38
40
return r .Deadline .Before (time )
39
41
}
40
42
41
- func (r ICMPRequest ) Deliver (response Response ) bool {
43
+ func (r * ICMPRequest ) Deliver (response Response ) bool {
44
+ if r .delivery == nil {
45
+ return false
46
+ }
47
+
42
48
if response == nil {
43
49
r .delivery <- & Result {
44
50
Code : 256 ,
45
51
}
46
- close ( r .delivery )
52
+ r .delivery = nil
47
53
return true
48
54
}
49
55
ID , TargetIP := response .GetIdentifier ()
@@ -60,7 +66,7 @@ func (r ICMPRequest) Deliver(response Response) bool {
60
66
Code : Code ,
61
67
}
62
68
}
63
- close ( r .delivery )
69
+ r .delivery = nil
64
70
return true
65
71
}
66
72
return false
@@ -82,11 +88,11 @@ type ICMPResponse struct {
82
88
Code int
83
89
}
84
90
85
- func (I ICMPResponse ) GetIdentifier () (int , net.IP ) {
91
+ func (I * ICMPResponse ) GetIdentifier () (int , net.IP ) {
86
92
return I .ID , I .TargetIP
87
93
}
88
94
89
- func (I ICMPResponse ) GetInformation () (net.IP , time.Time , int ) {
95
+ func (I * ICMPResponse ) GetInformation () (net.IP , time.Time , int ) {
90
96
return I .AddrIP , I .Received , I .Code
91
97
}
92
98
@@ -118,12 +124,10 @@ type ICMPManager struct {
118
124
// which send other Protocol message(e.g. TCP, UDP) but expect ICMP reply
119
125
// messages.
120
126
extListener map [int ]chan * RawResponse
121
- // counter will fill the sequence field of the request (precisely 16bits)
127
+ // counter will fill the sequence field of the request (use low 16bits)
122
128
// to identify packet. it will be increased for each call and can hold at
123
129
// most 65536 concurrent pending requests.
124
- counter uint16
125
- // l is the mutex to make counter increment thread safe.
126
- l sync.Mutex
130
+ counter uint32
127
131
// context to send the manager stop message
128
132
ctx context.Context
129
133
// function to call to stop the manager
@@ -138,7 +142,7 @@ type ICMPManager struct {
138
142
var manager * ICMPManager
139
143
var once sync.Once
140
144
141
- // listen to ICMP socket to receive packet
145
+ // ICMPv4Receiver listen to ICMP socket to receive packet
142
146
func ICMPv4Receiver (conn * icmp.PacketConn , wait time.Duration , icmpResponse chan * ICMPResponse ,
143
147
rawResponse chan * RawResponse , ctx context.Context ) {
144
148
select {
@@ -151,7 +155,11 @@ func ICMPv4Receiver(conn *icmp.PacketConn, wait time.Duration, icmpResponse chan
151
155
// return
152
156
//}
153
157
// wait `wait` to receive some body
154
- if err := conn .SetDeadline (time .Now ().Add (wait )); err != nil {
158
+ var deadline time.Time
159
+ if wait != 0 {
160
+ deadline = time .Now ().Add (wait )
161
+ }
162
+ if err := conn .SetDeadline (deadline ); err != nil {
155
163
return
156
164
}
157
165
readBytes := make ([]byte , 1500 ) // max MTU
@@ -238,7 +246,7 @@ func ICMPv4Receiver(conn *icmp.PacketConn, wait time.Duration, icmpResponse chan
238
246
}
239
247
}
240
248
241
- // listen to ICMPv6 socket to receive packet
249
+ // ICMPv6Receiver listen to ICMPv6 socket to receive packet
242
250
func ICMPv6Receiver (conn * icmp.PacketConn , wait time.Duration , icmpResponse chan * ICMPResponse ,
243
251
rawResponse chan * RawResponse , ctx context.Context ) {
244
252
select {
@@ -251,7 +259,11 @@ func ICMPv6Receiver(conn *icmp.PacketConn, wait time.Duration, icmpResponse chan
251
259
// return
252
260
//}
253
261
// wait `wait` to receive some body
254
- if err := conn .SetDeadline (time .Now ().Add (wait )); err != nil {
262
+ var deadline time.Time
263
+ if wait != 0 {
264
+ deadline = time .Now ().Add (wait )
265
+ }
266
+ if err := conn .SetDeadline (deadline ); err != nil {
255
267
return
256
268
}
257
269
readBytes := make ([]byte , 1500 ) // max MTU
@@ -338,8 +350,9 @@ func ICMPv6Receiver(conn *icmp.PacketConn, wait time.Duration, icmpResponse chan
338
350
}
339
351
}
340
352
341
- // return ICMPManager to caller. As listening to ICMP will receive all ICMP
342
- // packet, there will be only one manager in the whole process.
353
+ // GetICMPManager return ICMPManager to caller.
354
+ // As listening to ICMP will receive all ICMP packet,
355
+ // there will be only one manager in the whole process.
343
356
func GetICMPManager () * ICMPManager {
344
357
once .Do (func () {
345
358
ctx , cancel := context .WithCancel (context .Background ())
@@ -363,15 +376,15 @@ func GetICMPManager() *ICMPManager {
363
376
panic (fmt .Sprintf ("Can't listen to ICMPv6: %s" , err ))
364
377
}
365
378
manager .pConn6 = conn6
366
- go ICMPv4Receiver (conn4 , 1000 * time . Millisecond , result4 , raw4 , ctx )
367
- go ICMPv6Receiver (conn6 , 1000 * time . Millisecond , result6 , raw6 , ctx )
379
+ go ICMPv4Receiver (conn4 , 0 , result4 , raw4 , ctx )
380
+ go ICMPv6Receiver (conn6 , 0 , result6 , raw6 , ctx )
368
381
go manager .icmpDispatcher (result4 , result6 )
369
382
go manager .rawDispatcher (raw4 , raw6 )
370
383
// warm-up
371
384
addr , _ := net .ResolveIPAddr ("" , "127.0.0.1" )
372
- manager .Issue (addr , 100 , time .Second )
385
+ <- manager .Issue (addr , 100 , time .Second )
373
386
addr , _ = net .ResolveIPAddr ("" , "::1" )
374
- manager .Issue (addr , 100 , time .Second )
387
+ <- manager .Issue (addr , 100 , time .Second )
375
388
})
376
389
return manager
377
390
}
@@ -389,10 +402,7 @@ func (mgr *ICMPManager) Issue(ip net.Addr, ttl int, timeout time.Duration) (deli
389
402
}
390
403
dest = ipAddr .IP .To16 ()
391
404
392
- mgr .l .Lock ()
393
- count := mgr .counter
394
- mgr .counter ++
395
- mgr .l .Unlock ()
405
+ count := (atomic .AddUint32 (& mgr .counter , 1 ) - 1 ) & 0xffff
396
406
397
407
id := rand .Intn (1 << 16 )
398
408
var msg []byte
@@ -500,6 +510,14 @@ func (mgr *ICMPManager) rawDispatcher(v4, v6 chan *RawResponse) {
500
510
}
501
511
}
502
512
513
+ func (mgr * ICMPManager ) Flush () {
514
+ queue := NewCMap (32 )
515
+ queue = (* ConMapRequest )(atomic .SwapPointer ((* unsafe .Pointer )((unsafe .Pointer )(& mgr .queue )), unsafe .Pointer (queue )))
516
+ for t := range mgr .queue .IterBuffered () {
517
+ t .Val .Deliver (nil )
518
+ }
519
+ }
520
+
503
521
func (mgr * ICMPManager ) Finish () {
504
522
mgr .cancel ()
505
523
}
0 commit comments