41
41
)
42
42
43
43
const (
44
- autoRefreshInterval = 1 * time .Hour
45
- seedCount = 30
46
- seedMaxAge = 5 * 24 * time .Hour
44
+ autoRefreshInterval = 1 * time .Hour
45
+ bucketRefreshInterval = 1 * time .Minute
46
+ seedCount = 30
47
+ seedMaxAge = 5 * 24 * time .Hour
47
48
)
48
49
49
50
const testTopic = "foo"
@@ -62,8 +63,9 @@ func debugLog(s string) {
62
63
// BootNodes are the enode URLs of the P2P bootstrap nodes for the experimental RLPx v5 "Topic Discovery" network
63
64
// warning: local bootnodes for testing!!!
64
65
var BootNodes = []* Node {
65
- //MustParseNode("enode://6f974ede10d07334e7e651c1501cb540d087dd3a6dea81432620895c913f281790b49459d72cb8011bfbbfbd24fad956356189c31b7181a96cd44ccfb68bfc71@127.0.0.1:30301"),
66
66
MustParseNode ("enode://0cc5f5ffb5d9098c8b8c62325f3797f56509bff942704687b6530992ac706e2cb946b90a34f1f19548cd3c7baccbcaea354531e5983c7d1bc0dee16ce4b6440b@40.118.3.223:30305" ),
67
+ MustParseNode ("enode://1c7a64d76c0334b0418c004af2f67c50e36a3be60b5e4790bdac0439d21603469a85fad36f2473c9a80eb043ae60936df905fa28f1ff614c3e5dc34f15dcd2dc@40.118.3.223:30308" ),
68
+ MustParseNode ("enode://85c85d7143ae8bb96924f2b54f1b3e70d8c4d367af305325d30a61385a432f247d2c75c45c6b4a60335060d072d7f5b35dd1d4c45f76941f62a4f83b6e75daaf@40.118.3.223:30309" ),
67
69
}
68
70
69
71
// Network manages the table and all protocol interaction.
@@ -82,7 +84,6 @@ type Network struct {
82
84
tableOpResp chan struct {}
83
85
topicRegisterReq chan topicRegisterReq
84
86
topicSearchReq chan topicSearchReq
85
- bucketFillChn chan chan struct {}
86
87
87
88
// State of the main loop.
88
89
tab * Table
@@ -169,7 +170,6 @@ func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, d
169
170
queryReq : make (chan * findnodeQuery ),
170
171
topicRegisterReq : make (chan topicRegisterReq ),
171
172
topicSearchReq : make (chan topicSearchReq ),
172
- bucketFillChn : make (chan chan struct {}, 1 ),
173
173
nodes : make (map [NodeID ]* Node ),
174
174
}
175
175
go net .loop ()
@@ -353,8 +353,9 @@ func (net *Network) reqTableOp(f func()) (called bool) {
353
353
354
354
func (net * Network ) loop () {
355
355
var (
356
- refreshTimer = time .NewTicker (autoRefreshInterval )
357
- refreshDone chan struct {} // closed when the 'refresh' lookup has ended
356
+ refreshTimer = time .NewTicker (autoRefreshInterval )
357
+ bucketRefreshTimer = time .NewTimer (bucketRefreshInterval )
358
+ refreshDone chan struct {} // closed when the 'refresh' lookup has ended
358
359
)
359
360
360
361
// Tracking the next ticket to register.
@@ -389,6 +390,7 @@ func (net *Network) loop() {
389
390
topicRegisterLookupDone chan []* Node
390
391
topicRegisterLookupTick = time .NewTimer (0 )
391
392
topicSearchLookupTarget lookupInfo
393
+ searchReqWhenRefreshDone []topicSearchReq
392
394
)
393
395
topicSearchLookupDone := make (chan []* Node , 1 )
394
396
<- topicRegisterLookupTick .C
@@ -406,6 +408,7 @@ loop:
406
408
407
409
// Ingress packet handling.
408
410
case pkt := <- net .read :
411
+ //fmt.Println("read", pkt.ev)
409
412
debugLog ("<-net.read" )
410
413
n := net .internNode (& pkt )
411
414
prestate := n .state
@@ -503,14 +506,18 @@ loop:
503
506
net .conn .sendTopicRegister (nextTicket .t .node , nextTicket .t .topics , nextTicket .idx , nextTicket .t .pong )
504
507
505
508
case req := <- net .topicSearchReq :
506
- debugLog ("<-net.topicSearchReq" )
507
- if req .found == nil {
508
- net .ticketStore .removeSearchTopic (req .topic )
509
- continue
510
- }
511
- net .ticketStore .addSearchTopic (req .topic , req .found )
512
- if (topicSearchLookupTarget .target == common.Hash {}) {
513
- topicSearchLookupDone <- nil
509
+ if refreshDone == nil {
510
+ debugLog ("<-net.topicSearchReq" )
511
+ if req .found == nil {
512
+ net .ticketStore .removeSearchTopic (req .topic )
513
+ continue
514
+ }
515
+ net .ticketStore .addSearchTopic (req .topic , req .found )
516
+ if (topicSearchLookupTarget .target == common.Hash {}) {
517
+ topicSearchLookupDone <- nil
518
+ }
519
+ } else {
520
+ searchReqWhenRefreshDone = append (searchReqWhenRefreshDone , req )
514
521
}
515
522
516
523
case nodes := <- topicSearchLookupDone :
@@ -519,7 +526,14 @@ loop:
519
526
net .ping (n , n .addr ())
520
527
return n .pingEcho
521
528
}, func (n * Node , topic Topic ) []byte {
522
- return net .conn .send (n , topicQueryPacket , topicQuery {Topic : topic }) // TODO: set expiration
529
+ if n .state == known {
530
+ return net .conn .send (n , topicQueryPacket , topicQuery {Topic : topic }) // TODO: set expiration
531
+ } else {
532
+ if n .state == unknown {
533
+ net .ping (n , n .addr ())
534
+ }
535
+ return nil
536
+ }
523
537
})
524
538
topicSearchLookupTarget = net .ticketStore .nextSearchLookup ()
525
539
target := topicSearchLookupTarget .target
@@ -564,9 +578,12 @@ loop:
564
578
refreshDone = make (chan struct {})
565
579
net .refresh (refreshDone )
566
580
}
567
- case doneChn := <- net .bucketFillChn :
568
- debugLog ("bucketFill" )
569
- net .bucketFill (doneChn )
581
+ case <- bucketRefreshTimer .C :
582
+ target := net .tab .chooseBucketRefreshTarget ()
583
+ go func () {
584
+ net .lookup (target , false )
585
+ bucketRefreshTimer .Reset (bucketRefreshInterval )
586
+ }()
570
587
case newNursery := <- net .refreshReq :
571
588
debugLog ("<-net.refreshReq" )
572
589
if newNursery != nil {
@@ -580,6 +597,13 @@ loop:
580
597
case <- refreshDone :
581
598
debugLog ("<-net.refreshDone" )
582
599
refreshDone = nil
600
+ list := searchReqWhenRefreshDone
601
+ searchReqWhenRefreshDone = nil
602
+ go func () {
603
+ for _ , req := range list {
604
+ net .topicSearchReq <- req
605
+ }
606
+ }()
583
607
}
584
608
}
585
609
debugLog ("loop stopped" )
@@ -643,28 +667,13 @@ func (net *Network) refresh(done chan<- struct{}) {
643
667
}()
644
668
}
645
669
646
- func (net * Network ) bucketFill (done chan <- struct {}) {
647
- target := net .tab .chooseBucketFillTarget ()
648
- go func () {
649
- net .lookup (target , false )
650
- close (done )
651
- }()
652
- }
653
-
654
- func (net * Network ) BucketFill () {
655
- done := make (chan struct {})
656
- select {
657
- case net .bucketFillChn <- done :
658
- <- done
659
- case <- net .closed :
660
- close (done )
661
- }
662
- }
663
-
664
670
// Node Interning.
665
671
666
672
func (net * Network ) internNode (pkt * ingressPacket ) * Node {
667
673
if n := net .nodes [pkt .remoteID ]; n != nil {
674
+ n .IP = pkt .remoteAddr .IP
675
+ n .UDP = uint16 (pkt .remoteAddr .Port )
676
+ n .TCP = uint16 (pkt .remoteAddr .Port )
668
677
return n
669
678
}
670
679
n := NewNode (pkt .remoteID , pkt .remoteAddr .IP , uint16 (pkt .remoteAddr .Port ), uint16 (pkt .remoteAddr .Port ))
@@ -967,8 +976,10 @@ func init() {
967
976
968
977
// handle processes packets sent by n and events related to n.
969
978
func (net * Network ) handle (n * Node , ev nodeEvent , pkt * ingressPacket ) error {
979
+ //fmt.Println("handle", n.addr().String(), n.state, ev)
970
980
if pkt != nil {
971
981
if err := net .checkPacket (n , ev , pkt ); err != nil {
982
+ //fmt.Println("check err:", err)
972
983
return err
973
984
}
974
985
// Start the background expiration goroutine after the first
@@ -985,6 +996,7 @@ func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
985
996
}
986
997
next , err := n .state .handle (net , n , ev , pkt )
987
998
net .transition (n , next )
999
+ //fmt.Println("new state:", n.state)
988
1000
return err
989
1001
}
990
1002
@@ -1040,6 +1052,11 @@ func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) {
1040
1052
}
1041
1053
1042
1054
func (net * Network ) ping (n * Node , addr * net.UDPAddr ) {
1055
+ //fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex())
1056
+ if n .pingEcho != nil || n .ID == net .tab .self .ID {
1057
+ //fmt.Println(" not sent")
1058
+ return
1059
+ }
1043
1060
debugLog (fmt .Sprintf ("ping(node = %x)" , n .ID [:8 ]))
1044
1061
n .pingTopics = net .ticketStore .regTopicSet ()
1045
1062
n .pingEcho = net .conn .sendPing (n , addr , n .pingTopics )
0 commit comments