Skip to content

Commit 5757a0e

Browse files
author
Bas van Kervel
committed
added IPC timeout support
1 parent 04910c9 commit 5757a0e

File tree

1 file changed

+34
-26
lines changed

1 file changed

+34
-26
lines changed

rpc/codec/json.go

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,61 +10,69 @@ import (
1010
)
1111

1212
const (
13+
READ_TIMEOUT = 15 // read timeout in seconds
1314
MAX_REQUEST_SIZE = 1024 * 1024
1415
MAX_RESPONSE_SIZE = 1024 * 1024
1516
)
1617

1718
// Json serialization support
1819
type JsonCodec struct {
19-
c net.Conn
20-
buffer []byte
21-
bytesInBuffer int
20+
c net.Conn
2221
}
2322

2423
// Create new JSON coder instance
2524
func NewJsonCoder(conn net.Conn) ApiCoder {
2625
return &JsonCodec{
27-
c: conn,
28-
buffer: make([]byte, MAX_REQUEST_SIZE),
29-
bytesInBuffer: 0,
26+
c: conn,
3027
}
3128
}
3229

3330
// Serialize obj to JSON and write it to conn
3431
func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) {
35-
n, err := self.c.Read(self.buffer[self.bytesInBuffer:])
36-
if err != nil {
37-
self.bytesInBuffer = 0
32+
bytesInBuffer := 0
33+
buf := make([]byte, MAX_REQUEST_SIZE)
34+
35+
deadline := time.Now().Add(READ_TIMEOUT * time.Second)
36+
if err := self.c.SetDeadline(deadline); err != nil {
3837
return nil, false, err
3938
}
4039

41-
self.bytesInBuffer += n
40+
for {
41+
n, err := self.c.Read(buf[bytesInBuffer:])
42+
if err != nil {
43+
self.c.Close()
44+
return nil, false, err
45+
}
46+
47+
bytesInBuffer += n
4248

43-
singleRequest := shared.Request{}
44-
err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest)
45-
if err == nil {
46-
self.bytesInBuffer = 0
47-
requests := make([]*shared.Request, 1)
48-
requests[0] = &singleRequest
49-
return requests, false, nil
50-
}
49+
singleRequest := shared.Request{}
50+
err = json.Unmarshal(buf[:bytesInBuffer], &singleRequest)
51+
if err == nil {
52+
requests := make([]*shared.Request, 1)
53+
requests[0] = &singleRequest
54+
return requests, false, nil
55+
}
5156

52-
requests = make([]*shared.Request, 0)
53-
err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests)
54-
if err == nil {
55-
self.bytesInBuffer = 0
56-
return requests, true, nil
57+
requests = make([]*shared.Request, 0)
58+
err = json.Unmarshal(buf[:bytesInBuffer], &requests)
59+
if err == nil {
60+
return requests, true, nil
61+
}
5762
}
5863

59-
return nil, false, err
64+
self.c.Close() // timeout
65+
return nil, false, fmt.Errorf("Unable to read response")
6066
}
6167

6268
func (self *JsonCodec) ReadResponse() (interface{}, error) {
6369
bytesInBuffer := 0
6470
buf := make([]byte, MAX_RESPONSE_SIZE)
6571

66-
deadline := time.Now().Add(15 * time.Second)
67-
self.c.SetDeadline(deadline)
72+
deadline := time.Now().Add(READ_TIMEOUT * time.Second)
73+
if err := self.c.SetDeadline(deadline); err != nil {
74+
return nil, err
75+
}
6876

6977
for {
7078
n, err := self.c.Read(buf[bytesInBuffer:])

0 commit comments

Comments
 (0)