Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 4cadfed

Browse files
Merge pull request #41 from keks/feat/pubsub
Proper Pubsub support
2 parents 0ee8672 + 50bf087 commit 4cadfed

File tree

4 files changed

+156
-1
lines changed

4 files changed

+156
-1
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ services:
1616
before_install:
1717
- docker pull ipfs/go-ipfs:master
1818
- mkdir /tmp/ipfs && chmod 0777 /tmp/ipfs
19-
- docker run -d -v /tmp/ipfs:/data/ipfs -p 8080:8080 -p 4001:4001 -p 5001:5001 ipfs/go-ipfs:master
19+
- docker run -d -v /tmp/ipfs:/data/ipfs -p 8080:8080 -p 4001:4001 -p 5001:5001 ipfs/go-ipfs:master --enable-pubsub-experiment
2020

2121
install:
2222
- go get -t -v ./...

pubsub.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package shell
2+
3+
import (
4+
"encoding/binary"
5+
"encoding/json"
6+
7+
"github.com/libp2p/go-floodsub"
8+
"github.com/libp2p/go-libp2p-peer"
9+
)
10+
11+
// PubSubRecord is a record received via PubSub.
12+
type PubSubRecord interface {
13+
// From returns the peer ID of the node that published this record
14+
From() peer.ID
15+
16+
// Data returns the data field
17+
Data() []byte
18+
19+
// SeqNo is the sequence number of this record
20+
SeqNo() int64
21+
22+
//TopicIDs is the list of topics this record belongs to
23+
TopicIDs() []string
24+
}
25+
26+
type floodsubRecord struct {
27+
msg *floodsub.Message
28+
}
29+
30+
func (r floodsubRecord) From() peer.ID {
31+
return r.msg.GetFrom()
32+
}
33+
34+
func (r floodsubRecord) Data() []byte {
35+
return r.msg.GetData()
36+
}
37+
38+
func (r floodsubRecord) SeqNo() int64 {
39+
return int64(binary.BigEndian.Uint64(r.msg.GetSeqno()))
40+
}
41+
42+
func (r floodsubRecord) TopicIDs() []string {
43+
return r.msg.GetTopicIDs()
44+
}
45+
46+
///
47+
48+
// PubSubSubscription allow you to receive pubsub records that where published on the network.
49+
type PubSubSubscription struct {
50+
resp *Response
51+
}
52+
53+
func newPubSubSubscription(resp *Response) *PubSubSubscription {
54+
sub := &PubSubSubscription{
55+
resp: resp,
56+
}
57+
58+
sub.Next() // skip empty element used for flushing
59+
return sub
60+
}
61+
62+
// Next waits for the next record and returns that.
63+
func (s *PubSubSubscription) Next() (PubSubRecord, error) {
64+
if s.resp.Error != nil {
65+
return nil, s.resp.Error
66+
}
67+
68+
d := json.NewDecoder(s.resp.Output)
69+
70+
r := &floodsub.Message{}
71+
err := d.Decode(r)
72+
73+
return floodsubRecord{msg: r}, err
74+
}
75+
76+
// Cancel cancels the given subscription.
77+
func (s *PubSubSubscription) Cancel() error {
78+
if s.resp.Output == nil {
79+
return nil
80+
}
81+
82+
return s.resp.Output.Close()
83+
}

shell.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,27 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) {
689689
return out.Hash, nil
690690
}
691691

692+
func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) {
693+
// connect
694+
req := s.newRequest("pubsub/sub", topic)
695+
696+
resp, err := req.Send(s.httpcli)
697+
if err != nil {
698+
return nil, err
699+
}
700+
701+
return newPubSubSubscription(resp), nil
702+
}
703+
704+
func (s *Shell) PubSubPublish(topic, data string) error {
705+
_, err := s.newRequest("pubsub/pub", topic, data).Send(s.httpcli)
706+
if err != nil {
707+
return err
708+
}
709+
710+
return nil
711+
}
712+
692713
func (s *Shell) DiagNet(format string) ([]byte, error) {
693714
var result = new(bytes.Buffer)
694715

shell_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"testing"
9+
"time"
910

1011
"github.com/cheekybits/is"
1112
)
@@ -116,3 +117,53 @@ func TestResolvePath(t *testing.T) {
116117
is.Nil(err)
117118
is.Equal(childHash, "QmZTR5bcpQD7cFgTorqxZDYaew1Wqgfbd2ud9QqGPAkK2V")
118119
}
120+
121+
func TestPubSub(t *testing.T) {
122+
is := is.New(t)
123+
s := NewShell(shellUrl)
124+
125+
var (
126+
topic = "test"
127+
128+
sub *PubSubSubscription
129+
err error
130+
)
131+
132+
t.Log("subscribing...")
133+
sub, err = s.PubSubSubscribe(topic)
134+
is.Nil(err)
135+
is.NotNil(sub)
136+
t.Log("sub: done")
137+
138+
time.Sleep(10 * time.Millisecond)
139+
140+
t.Log("publishing...")
141+
is.Nil(s.PubSubPublish(topic, "Hello World!"))
142+
t.Log("pub: done")
143+
144+
t.Log("next()...")
145+
r, err := sub.Next()
146+
t.Log("next: done. ")
147+
148+
is.Nil(err)
149+
is.NotNil(r)
150+
is.Equal(r.Data(), "Hello World!")
151+
152+
sub2, err := s.PubSubSubscribe(topic)
153+
is.Nil(err)
154+
is.NotNil(sub2)
155+
156+
is.Nil(s.PubSubPublish(topic, "Hallo Welt!"))
157+
158+
r, err = sub2.Next()
159+
is.Nil(err)
160+
is.NotNil(r)
161+
is.Equal(r.Data(), "Hallo Welt!")
162+
163+
r, err = sub.Next()
164+
is.NotNil(r)
165+
is.Nil(err)
166+
is.Equal(r.Data(), "Hallo Welt!")
167+
168+
is.Nil(sub.Cancel())
169+
}

0 commit comments

Comments
 (0)