Skip to content

Commit db4c41d

Browse files
authored
fix decoding json streams (ipfs#180)
The decoder buffers internally.
1 parent b959c9b commit db4c41d

File tree

2 files changed

+13
-18
lines changed

2 files changed

+13
-18
lines changed

pubsub.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package shell
22

33
import (
44
"encoding/json"
5+
"io"
56

67
"github.com/libp2p/go-libp2p-peer"
78
)
@@ -16,33 +17,27 @@ type Message struct {
1617

1718
// PubSubSubscription allow you to receive pubsub records that where published on the network.
1819
type PubSubSubscription struct {
19-
resp *Response
20+
resp io.Closer
21+
dec *json.Decoder
2022
}
2123

22-
func newPubSubSubscription(resp *Response) *PubSubSubscription {
23-
sub := &PubSubSubscription{
24+
func newPubSubSubscription(resp io.ReadCloser) *PubSubSubscription {
25+
return &PubSubSubscription{
2426
resp: resp,
27+
dec: json.NewDecoder(resp),
2528
}
26-
27-
return sub
2829
}
2930

3031
// Next waits for the next record and returns that.
3132
func (s *PubSubSubscription) Next() (*Message, error) {
32-
if s.resp.Error != nil {
33-
return nil, s.resp.Error
34-
}
35-
36-
d := json.NewDecoder(s.resp.Output)
37-
3833
var r struct {
3934
From []byte `json:"from,omitempty"`
4035
Data []byte `json:"data,omitempty"`
4136
Seqno []byte `json:"seqno,omitempty"`
4237
TopicIDs []string `json:"topicIDs,omitempty"`
4338
}
4439

45-
err := d.Decode(&r)
40+
err := s.dec.Decode(&r)
4641
if err != nil {
4742
return nil, err
4843
}
@@ -61,9 +56,5 @@ func (s *PubSubSubscription) Next() (*Message, error) {
6156

6257
// Cancel cancels the given subscription.
6358
func (s *PubSubSubscription) Cancel() error {
64-
if s.resp.Output == nil {
65-
return nil
66-
}
67-
68-
return s.resp.Output.Close()
59+
return s.resp.Close()
6960
}

shell.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,11 @@ func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) {
437437
if err != nil {
438438
return nil, err
439439
}
440-
return newPubSubSubscription(resp), nil
440+
if resp.Error != nil {
441+
resp.Close()
442+
return nil, resp.Error
443+
}
444+
return newPubSubSubscription(resp.Output), nil
441445
}
442446

443447
func (s *Shell) PubSubPublish(topic, data string) (err error) {

0 commit comments

Comments
 (0)