Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 8d0e84f

Browse files
committed
make PubSubRecord an interface
1 parent c47982f commit 8d0e84f

File tree

2 files changed

+32
-36
lines changed

2 files changed

+32
-36
lines changed

pubsub.go

Lines changed: 29 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,46 @@
11
package shell
22

33
import (
4-
"encoding/base64"
4+
"encoding/binary"
55
"encoding/json"
6-
)
76

8-
// floodsub uses base64 encoding for just about everything.
9-
// To Decode the base64 while also decoding the JSON the type b64String was added.
10-
type b64String string
7+
"github.com/libp2p/go-floodsub"
8+
"github.com/libp2p/go-libp2p-peer"
9+
)
1110

12-
// UnmarshalJSON implements the json.Unmarshaler interface.
13-
func (bs *b64String) UnmarshalJSON(in []byte) error {
14-
var b64 string
11+
// PubSubRecord is a record received via PubSub.
12+
type PubSubRecord interface {
13+
// From returns the peer ID of the node that published this record
14+
From() peer.ID
1515

16-
err := json.Unmarshal(in, &b64)
17-
if err != nil {
18-
return err
19-
}
16+
// Data returns the data field
17+
Data() []byte
2018

21-
bsStr, err := base64.StdEncoding.DecodeString(b64)
19+
// SeqNo is the sequence number of this record
20+
SeqNo() int64
2221

23-
*bs = b64String(bsStr)
24-
return err
22+
//TopicIDs is the list of topics this record belongs to
23+
TopicIDs() []string
2524
}
2625

27-
func (bs *b64String) Marshal() (string, error) {
28-
jsonBytes, err := json.Marshal(
29-
base64.StdEncoding.EncodeToString(
30-
[]byte(*bs)))
26+
type floodsubRecord struct {
27+
msg *floodsub.Message
28+
}
3129

32-
return string(jsonBytes), err
30+
func (r floodsubRecord) From() peer.ID {
31+
return r.msg.GetFrom()
3332
}
3433

35-
///
34+
func (r floodsubRecord) Data() []byte {
35+
return r.msg.GetData()
36+
}
3637

37-
// PubSubRecord is a record received via PubSub.
38-
type PubSubRecord struct {
39-
From string `json:"from"`
40-
Data b64String `json:"data"`
41-
SeqNo b64String `json:"seqno"`
42-
TopicIDs []string `json:"topicIDs"`
38+
func (r floodsubRecord) SeqNo() int64 {
39+
return int64(binary.BigEndian.Uint64(r.msg.GetSeqno()))
4340
}
4441

45-
// DataString returns the string representation of the data field.
46-
func (r PubSubRecord) DataString() string {
47-
return string(r.Data)
42+
func (r floodsubRecord) TopicIDs() []string {
43+
return r.msg.GetTopicIDs()
4844
}
4945

5046
///
@@ -64,17 +60,17 @@ func newPubSubSubscription(resp *Response) *PubSubSubscription {
6460
}
6561

6662
// Next waits for the next record and returns that.
67-
func (s *PubSubSubscription) Next() (*PubSubRecord, error) {
63+
func (s *PubSubSubscription) Next() (PubSubRecord, error) {
6864
if s.resp.Error != nil {
6965
return nil, s.resp.Error
7066
}
7167

7268
d := json.NewDecoder(s.resp.Output)
7369

74-
r := &PubSubRecord{}
70+
r := &floodsub.Message{}
7571
err := d.Decode(r)
7672

77-
return r, err
73+
return floodsubRecord{msg: r}, err
7874
}
7975

8076
// Cancel cancels the given subscription.

shell_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func TestPubSub(t *testing.T) {
147147

148148
is.Nil(err)
149149
is.NotNil(r)
150-
is.Equal(r.DataString(), "Hello World!")
150+
is.Equal(r.Data(), "Hello World!")
151151

152152
sub2, err := s.PubSubSubscribe(topic)
153153
is.Nil(err)
@@ -158,12 +158,12 @@ func TestPubSub(t *testing.T) {
158158
r, err = sub2.Next()
159159
is.Nil(err)
160160
is.NotNil(r)
161-
is.Equal(r.Data, "Hallo Welt!")
161+
is.Equal(r.Data(), "Hallo Welt!")
162162

163163
r, err = sub.Next()
164164
is.NotNil(r)
165165
is.Nil(err)
166-
is.Equal(r.Data, "Hallo Welt!")
166+
is.Equal(r.Data(), "Hallo Welt!")
167167

168168
is.Nil(sub.Cancel())
169169
}

0 commit comments

Comments
 (0)