Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit 2e50b24

Browse files
committed
[#1800] Internalize key prefix handling to ipfs package for PubSub
The consumers of the Publisher/Subscriber should not need to worry about the key prefix of the topics they are interested in. Made an internal concern.
1 parent ced7194 commit 2e50b24

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

core/net.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
122122
go func() {
123123
ctx, cancel := context.WithCancel(context.Background())
124124
defer cancel()
125-
err := n.Pubsub.Publisher.Publish(ctx, ipfs.MessageTopicPrefix+pointer.Cid.String(), ciphertext)
125+
err := n.Pubsub.Publisher.Publish(ctx, pointer.Cid.String(), ciphertext)
126126
if err != nil {
127127
log.Error(err)
128128
}

ipfs/pubsub.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ import (
1717
)
1818

1919
const (
20-
MessageTopicPrefix = "/offlinemessage/"
21-
GlobalIPNSTopic = "IPNS"
22-
GlobalBlockTopic = "BLOCK"
23-
GlobalCIDTopic = "CID"
20+
globalIPNSTopic = "IPNS"
21+
globalBlockTopic = "BLOCK"
22+
globalCIDTopic = "CID"
23+
messageTopicPrefix = "/offlinemessage/"
2424
)
2525

2626
type Pubsub struct {
@@ -79,40 +79,42 @@ func NewPubsubSubscriber(ctx context.Context, host p2phost.Host, cr routing.Cont
7979

8080
func (p *PubsubPublisher) Publish(ctx context.Context, topic string, data []byte) error {
8181
p.mx.Lock()
82-
_, ok := p.subs[topic]
82+
id := messageTopicPrefix + topic
83+
_, ok := p.subs[id]
8384

8485
if !ok {
85-
p.subs[topic] = struct{}{}
86+
p.subs[id] = struct{}{}
8687
p.mx.Unlock()
8788

88-
bootstrapPubsub(p.ctx, p.cr, p.host, topic)
89+
bootstrapPubsub(p.ctx, p.cr, p.host, id)
8990
} else {
9091
p.mx.Unlock()
9192
}
9293

93-
log.Debugf("PubsubPublish: publish data for %s", topic)
94-
return p.ps.Publish(topic, data)
94+
log.Debugf("PubsubPublish: publish data for %s", id)
95+
return p.ps.Publish(id, data)
9596
}
9697

9798
func (r *PubsubSubscriber) Subscribe(ctx context.Context, topic string) (chan []byte, error) {
9899
r.mx.Lock()
99100
// see if we already have a pubsub subscription; if not, subscribe
100-
_, ok := r.subs[topic]
101+
id := messageTopicPrefix + topic
102+
_, ok := r.subs[id]
101103
resp := make(chan []byte)
102104
if !ok {
103-
sub, err := r.ps.Subscribe(topic)
105+
sub, err := r.ps.Subscribe(id)
104106
if err != nil {
105107
r.mx.Unlock()
106108
return nil, err
107109
}
108110

109-
log.Debugf("PubsubSubscribe: subscribed to %s", topic)
111+
log.Debugf("PubsubSubscribe: subscribed to %s", id)
110112

111-
r.subs[topic] = sub
113+
r.subs[id] = sub
112114

113115
ctx, cancel := context.WithCancel(r.ctx)
114-
go r.handleSubscription(sub, topic, resp, cancel)
115-
go bootstrapPubsub(ctx, r.cr, r.host, topic)
116+
go r.handleSubscription(sub, id, resp, cancel)
117+
go bootstrapPubsub(ctx, r.cr, r.host, id)
116118
}
117119
r.mx.Unlock()
118120
return resp, nil

0 commit comments

Comments
 (0)