@@ -13,6 +13,7 @@ import (
13
13
"math/big"
14
14
"strings"
15
15
"sync"
16
+ "time"
16
17
17
18
"github.com/ethereum/go-ethereum/core"
18
19
"github.com/ethereum/go-ethereum/core/types"
@@ -31,13 +32,14 @@ const (
31
32
32
33
type EthereumApi struct {
33
34
xeth * xeth.XEth
35
+ quit chan struct {}
34
36
filterManager * filter.FilterManager
35
37
36
38
logMut sync.RWMutex
37
- logs map [int ]state. Logs
39
+ logs map [int ]* logFilter
38
40
39
41
messagesMut sync.RWMutex
40
- messages map [int ][]xeth. WhisperMessage
42
+ messages map [int ]* whisperFilter
41
43
// Register keeps a list of accounts and transaction data
42
44
regmut sync.Mutex
43
45
register map [string ][]* NewTxArgs
@@ -49,12 +51,14 @@ func NewEthereumApi(eth *xeth.XEth) *EthereumApi {
49
51
db , _ := ethdb .NewLDBDatabase ("dapps" )
50
52
api := & EthereumApi {
51
53
xeth : eth ,
54
+ quit : make (chan struct {}),
52
55
filterManager : filter .NewFilterManager (eth .Backend ().EventMux ()),
53
- logs : make (map [int ]state. Logs ),
54
- messages : make (map [int ][]xeth. WhisperMessage ),
56
+ logs : make (map [int ]* logFilter ),
57
+ messages : make (map [int ]* whisperFilter ),
55
58
db : db ,
56
59
}
57
60
go api .filterManager .Start ()
61
+ go api .start ()
58
62
59
63
return api
60
64
}
@@ -97,7 +101,11 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
97
101
self .logMut .Lock ()
98
102
defer self .logMut .Unlock ()
99
103
100
- self .logs [id ] = append (self .logs [id ], logs ... )
104
+ if self .logs [id ] == nil {
105
+ self .logs [id ] = & logFilter {timeout : time .Now ()}
106
+ }
107
+
108
+ self .logs [id ].add (logs ... )
101
109
}
102
110
id = self .filterManager .InstallFilter (filter )
103
111
* reply = id
@@ -113,7 +121,11 @@ func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error
113
121
self .logMut .Lock ()
114
122
defer self .logMut .Unlock ()
115
123
116
- self .logs [id ] = append (self .logs [id ], & state.StateLog {})
124
+ if self .logs [id ] == nil {
125
+ self .logs [id ] = & logFilter {timeout : time .Now ()}
126
+ }
127
+
128
+ self .logs [id ].add (& state.StateLog {})
117
129
}
118
130
if args == "pending" {
119
131
filter .PendingCallback = callback
@@ -131,9 +143,9 @@ func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
131
143
self .logMut .Lock ()
132
144
defer self .logMut .Unlock ()
133
145
134
- * reply = toLogs ( self .logs [id ])
135
-
136
- self . logs [ id ] = nil // empty the logs
146
+ if self .logs [id ] != nil {
147
+ * reply = toLogs ( self . logs [ id ]. get ())
148
+ }
137
149
138
150
return nil
139
151
}
@@ -331,7 +343,10 @@ func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) e
331
343
args .Fn = func (msg xeth.WhisperMessage ) {
332
344
p .messagesMut .Lock ()
333
345
defer p .messagesMut .Unlock ()
334
- p .messages [id ] = append (p .messages [id ], msg )
346
+ if p .messages [id ] == nil {
347
+ p .messages [id ] = & whisperFilter {timeout : time .Now ()}
348
+ }
349
+ p .messages [id ].add (msg ) // = append(p.messages[id], msg)
335
350
}
336
351
id = p .xeth .Whisper ().Watch (args )
337
352
* reply = id
@@ -342,9 +357,9 @@ func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error {
342
357
self .messagesMut .Lock ()
343
358
defer self .messagesMut .Unlock ()
344
359
345
- * reply = self .messages [id ]
346
-
347
- self . messages [ id ] = nil // empty the messages
360
+ if self .messages [id ] != nil {
361
+ * reply = self . messages [ id ]. get ()
362
+ }
348
363
349
364
return nil
350
365
}
@@ -535,3 +550,34 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
535
550
rpclogger .DebugDetailf ("Reply: %T %s" , reply , reply )
536
551
return nil
537
552
}
553
+
554
+ var filterTickerTime = 15 * time .Second
555
+
556
+ func (self * EthereumApi ) start () {
557
+ timer := time .NewTicker (filterTickerTime )
558
+ done:
559
+ for {
560
+ select {
561
+ case <- timer .C :
562
+ self .logMut .Lock ()
563
+ self .messagesMut .Lock ()
564
+ for id , filter := range self .logs {
565
+ if time .Since (filter .timeout ) > 20 * time .Second {
566
+ delete (self .logs , id )
567
+ }
568
+ }
569
+
570
+ for id , filter := range self .messages {
571
+ if time .Since (filter .timeout ) > 20 * time .Second {
572
+ delete (self .messages , id )
573
+ }
574
+ }
575
+ case <- self .quit :
576
+ break done
577
+ }
578
+ }
579
+ }
580
+
581
+ func (self * EthereumApi ) stop () {
582
+ close (self .quit )
583
+ }
0 commit comments