@@ -2,71 +2,130 @@ package codec
2
2
3
3
import (
4
4
"encoding/json"
5
+ "fmt"
5
6
"net"
7
+ "time"
6
8
7
9
"github.com/ethereum/go-ethereum/rpc/shared"
8
10
)
9
11
10
12
const (
11
- MAX_RESPONSE_SIZE = 64 * 1024
13
+ READ_TIMEOUT = 15 // read timeout in seconds
14
+ MAX_REQUEST_SIZE = 1024 * 1024
15
+ MAX_RESPONSE_SIZE = 1024 * 1024
12
16
)
13
17
14
18
// Json serialization support
15
19
type JsonCodec struct {
16
20
c net.Conn
17
- d * json.Decoder
18
- e * json.Encoder
19
21
}
20
22
21
23
// Create new JSON coder instance
22
24
func NewJsonCoder (conn net.Conn ) ApiCoder {
23
25
return & JsonCodec {
24
26
c : conn ,
25
- d : json .NewDecoder (conn ),
26
- e : json .NewEncoder (conn ),
27
27
}
28
28
}
29
29
30
30
// Serialize obj to JSON and write it to conn
31
- func (self * JsonCodec ) ReadRequest () (* shared.Request , error ) {
32
- req := shared.Request {}
33
- err := self .d .Decode (& req )
34
- if err == nil {
35
- return & req , nil
31
+ func (self * JsonCodec ) ReadRequest () (requests []* shared.Request , isBatch bool , err error ) {
32
+ bytesInBuffer := 0
33
+ buf := make ([]byte , MAX_REQUEST_SIZE )
34
+
35
+ deadline := time .Now ().Add (READ_TIMEOUT * time .Second )
36
+ if err := self .c .SetDeadline (deadline ); err != nil {
37
+ return nil , false , err
38
+ }
39
+
40
+ for {
41
+ n , err := self .c .Read (buf [bytesInBuffer :])
42
+ if err != nil {
43
+ self .c .Close ()
44
+ return nil , false , err
45
+ }
46
+
47
+ bytesInBuffer += n
48
+
49
+ singleRequest := shared.Request {}
50
+ err = json .Unmarshal (buf [:bytesInBuffer ], & singleRequest )
51
+ if err == nil {
52
+ requests := make ([]* shared.Request , 1 )
53
+ requests [0 ] = & singleRequest
54
+ return requests , false , nil
55
+ }
56
+
57
+ requests = make ([]* shared.Request , 0 )
58
+ err = json .Unmarshal (buf [:bytesInBuffer ], & requests )
59
+ if err == nil {
60
+ return requests , true , nil
61
+ }
36
62
}
37
- return nil , err
63
+
64
+ self .c .Close () // timeout
65
+ return nil , false , fmt .Errorf ("Unable to read response" )
38
66
}
39
67
40
68
func (self * JsonCodec ) ReadResponse () (interface {}, error ) {
41
- var err error
69
+ bytesInBuffer := 0
42
70
buf := make ([]byte , MAX_RESPONSE_SIZE )
43
- n , _ := self .c .Read (buf )
44
71
45
- var failure shared. ErrorResponse
46
- if err = json . Unmarshal ( buf [: n ], & failure ); err == nil && failure . Error != nil {
47
- return failure , nil
72
+ deadline := time . Now (). Add ( READ_TIMEOUT * time . Second )
73
+ if err := self . c . SetDeadline ( deadline ); err != nil {
74
+ return nil , err
48
75
}
49
76
50
- var success shared.SuccessResponse
51
- if err = json .Unmarshal (buf [:n ], & success ); err == nil {
52
- return success , nil
77
+ for {
78
+ n , err := self .c .Read (buf [bytesInBuffer :])
79
+ if err != nil {
80
+ return nil , err
81
+ }
82
+ bytesInBuffer += n
83
+
84
+ var success shared.SuccessResponse
85
+ if err = json .Unmarshal (buf [:bytesInBuffer ], & success ); err == nil {
86
+ return success , nil
87
+ }
88
+
89
+ var failure shared.ErrorResponse
90
+ if err = json .Unmarshal (buf [:bytesInBuffer ], & failure ); err == nil && failure .Error != nil {
91
+ return failure , nil
92
+ }
53
93
}
54
94
55
- return nil , err
95
+ self .c .Close ()
96
+ return nil , fmt .Errorf ("Unable to read response" )
56
97
}
57
98
58
- // Encode response to encoded form in underlying stream
99
+ // Decode data
59
100
func (self * JsonCodec ) Decode (data []byte , msg interface {}) error {
60
101
return json .Unmarshal (data , msg )
61
102
}
62
103
104
+ // Encode message
63
105
func (self * JsonCodec ) Encode (msg interface {}) ([]byte , error ) {
64
106
return json .Marshal (msg )
65
107
}
66
108
67
109
// Parse JSON data from conn to obj
68
110
func (self * JsonCodec ) WriteResponse (res interface {}) error {
69
- return self .e .Encode (& res )
111
+ data , err := json .Marshal (res )
112
+ if err != nil {
113
+ self .c .Close ()
114
+ return err
115
+ }
116
+
117
+ bytesWritten := 0
118
+
119
+ for bytesWritten < len (data ) {
120
+ n , err := self .c .Write (data [bytesWritten :])
121
+ if err != nil {
122
+ self .c .Close ()
123
+ return err
124
+ }
125
+ bytesWritten += n
126
+ }
127
+
128
+ return nil
70
129
}
71
130
72
131
// Close decoder and encoder
0 commit comments