Skip to content

Commit d92c56d

Browse files
committed
Support multiple handlers for single topic
1 parent 2d19961 commit d92c56d

File tree

3 files changed

+35
-15
lines changed

3 files changed

+35
-15
lines changed

rolling-shutter/p2p/messaging.go

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,34 @@ type (
2525
HandlerFunc func(context.Context, p2pmsg.Message) ([]p2pmsg.Message, error)
2626
HandlerRegistry map[protoreflect.FullName][]HandlerFunc
2727
ValidatorFunc func(context.Context, p2pmsg.Message) (pubsub.ValidationResult, error)
28-
ValidatorRegistry map[string]pubsub.ValidatorEx
28+
ValidatorRegistry map[string][]pubsub.ValidatorEx
2929
)
3030

31+
func (r *ValidatorRegistry) GetCombinedValidator(topic string) pubsub.ValidatorEx {
32+
validate := func(ctx context.Context, sender peer.ID, message *pubsub.Message) pubsub.ValidationResult {
33+
ignored := false
34+
for _, valFunc := range (*r)[topic] {
35+
res := valFunc(ctx, sender, message)
36+
switch res {
37+
case pubsub.ValidationAccept:
38+
continue
39+
case pubsub.ValidationReject:
40+
return pubsub.ValidationReject
41+
case pubsub.ValidationIgnore:
42+
ignored = true
43+
default:
44+
log.Warn().Str("topic", topic).Msg("unknown validation result %d, treating as reject")
45+
return pubsub.ValidationReject
46+
}
47+
}
48+
if ignored {
49+
return pubsub.ValidationIgnore
50+
}
51+
return pubsub.ValidationAccept
52+
}
53+
return validate
54+
}
55+
3156
const (
3257
allowTraceContext = true // whether we allow the trace field to be set in the message envelope
3358
invalidResultType = pubsub.ValidationReject
@@ -134,16 +159,6 @@ func (m *P2PMessaging) AddHandlerFunc(handlerFunc HandlerFunc, protos ...p2pmsg.
134159

135160
func (m *P2PMessaging) addValidatorImpl(valFunc ValidatorFunc, messProto p2pmsg.Message) {
136161
topic := messProto.Topic()
137-
_, exists := m.validatorRegistry[topic]
138-
if exists {
139-
// This is likely not intended and happens when different messages return the same P2PMessage.Topic().
140-
// Currently a topic is mapped 1 to 1 to a message type (instead of using an envelope for unmarshalling)
141-
// (If feature needed, allow for chaining of successively registered validator functions per topic)
142-
panic(errors.Errorf(
143-
"can't register more than one validator per topic (topic: '%s', message-type: '%s')",
144-
topic,
145-
reflect.TypeOf(messProto)))
146-
}
147162
handleError := func(err error) {
148163
log.Info().Str("topic", topic).Err(err).Msg("received invalid message)")
149164
}
@@ -176,8 +191,12 @@ func (m *P2PMessaging) addValidatorImpl(valFunc ValidatorFunc, messProto p2pmsg.
176191
}
177192
return valid
178193
}
179-
m.validatorRegistry[topic] = validate
180-
m.AddGossipTopic(topic)
194+
195+
_, exists := m.validatorRegistry[topic]
196+
if !exists {
197+
m.AddGossipTopic(topic)
198+
}
199+
m.validatorRegistry[topic] = append(m.validatorRegistry[topic], validate)
181200
}
182201

183202
// AddValidator will add a validator-function to a P2PHandler instance:

rolling-shutter/p2p/p2p.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ func (p *P2PNode) Run(
9696
return err
9797
}
9898

99-
for topicName, validator := range topicValidators {
99+
for topicName := range topicValidators {
100+
validator := topicValidators.GetCombinedValidator(topicName)
100101
if err := p.pubSub.RegisterTopicValidator(topicName, validator); err != nil {
101102
return err
102103
}

rolling-shutter/p2p/p2p_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func TestStartNetworkNodeIntegration(t *testing.T) {
9292
assert.NilError(t, err)
9393
p2ps = append(p2ps, p2pHandler.P2P)
9494
fn := func(ctx context.Context, runner service.Runner) error {
95-
return p2pHandler.P2P.Run(ctx, runner, gossipTopicNames, map[string]pubsub.ValidatorEx{})
95+
return p2pHandler.P2P.Run(ctx, runner, gossipTopicNames, map[string][]pubsub.ValidatorEx{})
9696
}
9797
services[i] = service.Function{Func: fn}
9898
}

0 commit comments

Comments
 (0)