@@ -75,8 +75,8 @@ func NewSubscriber[H header.Header[H]](
75
75
}, nil
76
76
}
77
77
78
- // Start starts the Subscriber, registering a topic validator for the "header-sub"
79
- // topic and joining it .
78
+ // Start starts the Subscriber and joins the instance's topic. SetVerifier must
79
+ // be called separately to ensure a validator is mounted on the topic .
80
80
func (s * Subscriber [H ]) Start (context.Context ) (err error ) {
81
81
log .Infow ("joining topic" , "topic ID" , s .pubsubTopicID )
82
82
s .topic , err = s .pubsub .Join (s .pubsubTopicID , pubsub .WithTopicMessageIdFn (s .msgID ))
@@ -85,14 +85,15 @@ func (s *Subscriber[H]) Start(context.Context) (err error) {
85
85
86
86
// Stop closes the topic and unregisters its validator.
87
87
func (s * Subscriber [H ]) Stop (context.Context ) error {
88
- err := s .pubsub .UnregisterTopicValidator (s .pubsubTopicID )
89
- if err != nil {
90
- log .Warnf ("unregistering validator: %s" , err )
88
+ regErr := s .pubsub .UnregisterTopicValidator (s .pubsubTopicID )
89
+ if regErr != nil {
90
+ // do not return this error as it is non-critical and usually
91
+ // means that a validator was not mounted.
92
+ log .Warnf ("unregistering validator: %s" , regErr )
91
93
}
92
94
93
- err = errors .Join (err , s .topic .Close ())
94
- err = errors .Join (err , s .metrics .Close ())
95
- return err
95
+ err := s .topic .Close ()
96
+ return errors .Join (err , s .metrics .Close ())
96
97
}
97
98
98
99
// SetVerifier set given verification func as Header PubSub topic validator
0 commit comments