Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,9 @@ func (p *PubSub) notifySubs(msg *Message) {
topic := msg.GetTopic()
subs := p.mySubs[topic]
for f := range subs {
if f.skipSelf && msg.ReceivedFrom == f.selfPid {
continue
}
select {
case f.ch <- msg:
default:
Expand Down Expand Up @@ -1693,6 +1696,17 @@ func WithBufferSize(size int) SubOpt {
}
}

// WithSelfNotification is a Subscribe option to control whether messages published
// by the local node are delivered to this subscription. By default, the subscription
// receives all messages, including those published by the local host.
// Set to false to filter out messages that were published by the local node.
func WithSelfNotification(enabled bool) SubOpt {
return func(sub *Subscription) error {
sub.skipSelf = !enabled
return nil
}
}

type topicReq struct {
resp chan []string
}
Expand Down
6 changes: 6 additions & 0 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package pubsub
import (
"context"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
)

// Subscription handles the details of a particular Topic subscription.
Expand All @@ -14,6 +16,10 @@ type Subscription struct {
ctx context.Context
err error
once sync.Once

// selfPid is the peer ID of the local host, used for filtering.
selfPid peer.ID
skipSelf bool
}

// Topic returns the topic string associated with the Subscription
Expand Down
5 changes: 3 additions & 2 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
}

sub := &Subscription{
topic: t.topic,
ctx: t.p.ctx,
topic: t.topic,
ctx: t.p.ctx,
selfPid: t.p.host.ID(),
}

for _, opt := range opts {
Expand Down
89 changes: 89 additions & 0 deletions topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,3 +1081,92 @@ func TestWithLocalPublication(t *testing.T) {
t.Fatal("wrong message")
}
}

func TestWithSelfNotification(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

const numHosts = 2
topicID := "self-notif-test"
hosts := getDefaultHosts(t, numHosts)

psubs := getPubsubs(ctx, hosts)
connectAll(t, hosts)

topic0, err := psubs[0].Join(topicID)
if err != nil {
t.Fatal(err)
}
topic1, err := psubs[1].Join(topicID)
if err != nil {
t.Fatal(err)
}

// Subscribe on host 0 with self-notification disabled.
subNoSelf, err := topic0.Subscribe(WithSelfNotification(false))
if err != nil {
t.Fatal(err)
}

// Subscribe on host 0 with default behavior (self-notification enabled).
subWithSelf, err := topic0.Subscribe()
if err != nil {
t.Fatal(err)
}

// Subscribe on host 1 to receive messages from host 0.
subRemote, err := topic1.Subscribe()
if err != nil {
t.Fatal(err)
}

// Wait for mesh to form.
time.Sleep(time.Second)

msg := []byte("hello from host 0")
if err := topic0.Publish(ctx, msg); err != nil {
t.Fatal(err)
}

// The default subscription on host 0 should receive the message.
m, err := subWithSelf.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(m.Data, msg) {
t.Fatal("expected to receive own message on default subscription")
}

// The remote subscription on host 1 should receive the message.
m, err = subRemote.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(m.Data, msg) {
t.Fatal("expected to receive message on remote subscription")
}

// The subscription with self-notification disabled should NOT receive the message.
// Use a short timeout to verify no message arrives.
noMsgCtx, noMsgCancel := context.WithTimeout(ctx, time.Second)
defer noMsgCancel()
_, err = subNoSelf.Next(noMsgCtx)
if err == nil {
t.Fatal("expected no message on self-notification-disabled subscription, but got one")
}

// Publish from host 1 and verify that host 0's filtered subscription
// still receives messages from other peers.
remoteMsg := []byte("hello from host 1")
if err := topic1.Publish(ctx, remoteMsg); err != nil {
t.Fatal(err)
}

m, err = subNoSelf.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(m.Data, remoteMsg) {
t.Fatal("expected to receive remote message on filtered subscription")
}
}