Skip to content

Commit ad2ff36

Browse files
[Go] Make registered handlers non-blocking (#1008)
1 parent b6d56b3 commit ad2ff36

File tree

9 files changed

+162
-36
lines changed

9 files changed

+162
-36
lines changed

go/mqtt/API.md

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import "github.com/Azure/iot-operations-sdks/go/mqtt"
99
- [Constants](<#constants>)
1010
- [func IsTopicFilterMatch\(topicFilter, topicName string\) bool](<#IsTopicFilterMatch>)
1111
- [func RandomClientID\(\) string](<#RandomClientID>)
12+
- [type AIOBrokerFeatureError](<#AIOBrokerFeatureError>)
13+
- [func \(e \*AIOBrokerFeatureError\) Error\(\) string](<#AIOBrokerFeatureError.Error>)
1214
- [type Ack](<#Ack>)
1315
- [type ClientState](<#ClientState>)
1416
- [type ClientStateError](<#ClientStateError>)
@@ -33,8 +35,8 @@ import "github.com/Azure/iot-operations-sdks/go/mqtt"
3335
- [func \(e \*FatalConnackError\) Error\(\) string](<#FatalConnackError.Error>)
3436
- [type FatalDisconnectError](<#FatalDisconnectError>)
3537
- [func \(e \*FatalDisconnectError\) Error\(\) string](<#FatalDisconnectError.Error>)
36-
- [type InvalidAIOBrokerFeature](<#InvalidAIOBrokerFeature>)
37-
- [func \(e \*InvalidAIOBrokerFeature\) Error\(\) string](<#InvalidAIOBrokerFeature.Error>)
38+
- [type HandlerPanicError](<#HandlerPanicError>)
39+
- [func \(e \*HandlerPanicError\) Error\(\) string](<#HandlerPanicError.Error>)
3840
- [type InvalidArgumentError](<#InvalidArgumentError>)
3941
- [func \(e \*InvalidArgumentError\) Error\(\) string](<#InvalidArgumentError.Error>)
4042
- [func \(e \*InvalidArgumentError\) Unwrap\(\) error](<#InvalidArgumentError.Unwrap>)
@@ -127,6 +129,26 @@ func RandomClientID() string
127129

128130
RandomClientID generates a random valid MQTT client ID. This should never be used in production \(as it fully invalidates all session guarantees\) but can be useful in testing to prevent parallel tests from conflicting.
129131

132+
<a name="AIOBrokerFeatureError"></a>
133+
## type [AIOBrokerFeatureError](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/errors.go#L159-L161>)
134+
135+
AIOBrokerFeatureError indicates that a feature specific to the AIO Broker was used when AIO Broker features were explicitly disabled.
136+
137+
```go
138+
type AIOBrokerFeatureError struct {
139+
// contains filtered or unexported fields
140+
}
141+
```
142+
143+
<a name="AIOBrokerFeatureError.Error"></a>
144+
### func \(\*AIOBrokerFeatureError\) [Error](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/errors.go#L163>)
145+
146+
```go
147+
func (e *AIOBrokerFeatureError) Error() string
148+
```
149+
150+
151+
130152
<a name="Ack"></a>
131153
## type [Ack](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/alias.go#L30>)
132154

@@ -376,22 +398,22 @@ func (e *FatalDisconnectError) Error() string
376398

377399

378400

379-
<a name="InvalidAIOBrokerFeature"></a>
380-
## type [InvalidAIOBrokerFeature](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/errors.go#L159-L161>)
401+
<a name="HandlerPanicError"></a>
402+
## type [HandlerPanicError](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/errors.go#L172-L174>)
381403

382-
InvalidAIOBrokerFeature indicates that a feature specific to the AIO Broker was used when AIO Broker features were explicitly disabled.
404+
HandlerPanicError indicates that a user\-provided handler panicked. This error will never be returned, only logged.
383405

384406
```go
385-
type InvalidAIOBrokerFeature struct {
407+
type HandlerPanicError struct {
386408
// contains filtered or unexported fields
387409
}
388410
```
389411

390-
<a name="InvalidAIOBrokerFeature.Error"></a>
391-
### func \(\*InvalidAIOBrokerFeature\) [Error](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/errors.go#L163>)
412+
<a name="HandlerPanicError.Error"></a>
413+
### func \(\*HandlerPanicError\) [Error](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/errors.go#L176>)
392414

393415
```go
394-
func (e *InvalidAIOBrokerFeature) Error() string
416+
func (e *HandlerPanicError) Error() string
395417
```
396418

397419

@@ -572,7 +594,7 @@ func (c *SessionClient) RegisterConnectEventHandler(handler ConnectEventHandler)
572594
RegisterConnectEventHandler registers a handler to a list of handlers that are called synchronously in registration order whenever the session client successfully establishes an MQTT connection. Note that since the handler gets called synchronously, handlers should not block for an extended period of time to avoid blocking the session client.
573595

574596
<a name="SessionClient.RegisterDisconnectEventHandler"></a>
575-
### func \(\*SessionClient\) [RegisterDisconnectEventHandler](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/connect.go#L31-L33>)
597+
### func \(\*SessionClient\) [RegisterDisconnectEventHandler](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/connect.go#L38-L40>)
576598

577599
```go
578600
func (c *SessionClient) RegisterDisconnectEventHandler(handler DisconnectEventHandler) func()
@@ -581,7 +603,7 @@ func (c *SessionClient) RegisterDisconnectEventHandler(handler DisconnectEventHa
581603
RegisterDisconnectEventHandler registers a handler to a list of handlers that are called synchronously in registration order whenever the session client detects a disconnection from the MQTT server. Note that since the handler gets called synchronously, handlers should not block for an extended period of time to avoid blocking the session client.
582604

583605
<a name="SessionClient.RegisterFatalErrorHandler"></a>
584-
### func \(\*SessionClient\) [RegisterFatalErrorHandler](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/connect.go#L39-L41>)
606+
### func \(\*SessionClient\) [RegisterFatalErrorHandler](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/connect.go#L53-L55>)
585607

586608
```go
587609
func (c *SessionClient) RegisterFatalErrorHandler(handler func(error)) func()
@@ -599,7 +621,7 @@ func (c *SessionClient) RegisterMessageHandler(handler MessageHandler) func()
599621
RegisterMessageHandler registers a message handler on this client. Returns a callback to remove the message handler.
600622

601623
<a name="SessionClient.Start"></a>
602-
### func \(\*SessionClient\) [Start](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/connect.go#L48>)
624+
### func \(\*SessionClient\) [Start](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/connect.go#L69>)
603625

604626
```go
605627
func (c *SessionClient) Start() error
@@ -608,7 +630,7 @@ func (c *SessionClient) Start() error
608630
Start the session client, spawning any necessary background goroutines. In order to terminate the session client and clean up any running goroutines, Stop\(\) must be called after calling Start\(\).
609631

610632
<a name="SessionClient.Stop"></a>
611-
### func \(\*SessionClient\) [Stop](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/connect.go#L73>)
633+
### func \(\*SessionClient\) [Stop](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/connect.go#L94>)
612634

613635
```go
614636
func (c *SessionClient) Stop() error
@@ -617,7 +639,7 @@ func (c *SessionClient) Stop() error
617639
Stop the session client, terminating any pending operations and cleaning up background goroutines.
618640

619641
<a name="SessionClient.Subscribe"></a>
620-
### func \(\*SessionClient\) [Subscribe](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/subscribe.go#L75-L79>)
642+
### func \(\*SessionClient\) [Subscribe](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/subscribe.go#L80-L84>)
621643

622644
```go
623645
func (c *SessionClient) Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (*Ack, error)
@@ -626,7 +648,7 @@ func (c *SessionClient) Subscribe(ctx context.Context, topic string, opts ...Sub
626648
Subscribe to the given topic.
627649

628650
<a name="SessionClient.Unsubscribe"></a>
629-
### func \(\*SessionClient\) [Unsubscribe](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/subscribe.go#L118-L122>)
651+
### func \(\*SessionClient\) [Unsubscribe](<https://github.com/Azure/iot-operations-sdks/blob/main/go/mqtt/subscribe.go#L123-L127>)
630652

631653
```go
632654
func (c *SessionClient) Unsubscribe(ctx context.Context, topic string, opts ...UnsubscribeOption) (*Ack, error)

go/mqtt/connect.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,14 @@ import (
2020
func (c *SessionClient) RegisterConnectEventHandler(
2121
handler ConnectEventHandler,
2222
) func() {
23-
return c.connectEventHandlers.AppendEntry(handler)
23+
return c.connectEventHandlers.AppendEntry(func(ce *ConnectEvent) {
24+
defer func() {
25+
if e := recover(); e != nil {
26+
c.log.Error(context.Background(), &HandlerPanicError{e})
27+
}
28+
}()
29+
handler(ce)
30+
})
2431
}
2532

2633
// RegisterDisconnectEventHandler registers a handler to a list of handlers that
@@ -31,15 +38,29 @@ func (c *SessionClient) RegisterConnectEventHandler(
3138
func (c *SessionClient) RegisterDisconnectEventHandler(
3239
handler DisconnectEventHandler,
3340
) func() {
34-
return c.disconnectEventHandlers.AppendEntry(handler)
41+
return c.disconnectEventHandlers.AppendEntry(func(de *DisconnectEvent) {
42+
defer func() {
43+
if e := recover(); e != nil {
44+
c.log.Error(context.Background(), &HandlerPanicError{e})
45+
}
46+
}()
47+
handler(de)
48+
})
3549
}
3650

3751
// RegisterFatalErrorHandler registers a handler that is called in a goroutine
3852
// if the session client terminates due to a fatal error.
3953
func (c *SessionClient) RegisterFatalErrorHandler(
4054
handler func(error),
4155
) func() {
42-
return c.fatalErrorHandlers.AppendEntry(handler)
56+
return c.fatalErrorHandlers.AppendEntry(func(err error) {
57+
defer func() {
58+
if e := recover(); e != nil {
59+
c.log.Error(context.Background(), &HandlerPanicError{e})
60+
}
61+
}()
62+
handler(err)
63+
})
4364
}
4465

4566
// Start the session client, spawning any necessary background goroutines. In
@@ -256,7 +277,7 @@ func (c *SessionClient) signalConnection(
256277
)
257278

258279
for handler := range c.connectEventHandlers.All() {
259-
handler(event)
280+
go handler(event)
260281
}
261282
}
262283

@@ -280,7 +301,7 @@ func (c *SessionClient) signalDisconnection(
280301
}
281302

282303
for handler := range c.disconnectEventHandlers.All() {
283-
handler(event)
304+
go handler(event)
284305
}
285306
}
286307

go/mqtt/errors.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,25 @@ func (*PublishQueueFullError) Error() string {
154154
return "publish queue full"
155155
}
156156

157-
// InvalidAIOBrokerFeature indicates that a feature specific to the AIO Broker
158-
// was used when AIO Broker features were explicitly disabled.
159-
type InvalidAIOBrokerFeature struct {
157+
// AIOBrokerFeatureError indicates that a feature specific to the AIO Broker was
158+
// used when AIO Broker features were explicitly disabled.
159+
type AIOBrokerFeatureError struct {
160160
feature string
161161
}
162162

163-
func (e *InvalidAIOBrokerFeature) Error() string {
163+
func (e *AIOBrokerFeatureError) Error() string {
164164
return fmt.Sprintf(
165165
"%s was used with AIO Broker features disabled",
166166
e.feature,
167167
)
168168
}
169+
170+
// HandlerPanicError indicates that a user-provided handler panicked. This error
171+
// will never be returned, only logged.
172+
type HandlerPanicError struct {
173+
panic any
174+
}
175+
176+
func (e *HandlerPanicError) Error() string {
177+
return fmt.Sprintf("panic in user-provided handler: %v", e.panic)
178+
}

go/mqtt/features.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ func WithPersist() PublishOption {
2222
func (o *SessionClientOptions) checkFeatures(p map[string]string) error {
2323
if o.DisableAIOBrokerFeatures && p != nil {
2424
if _, ok := p[AIOPersistence]; ok {
25-
return &InvalidAIOBrokerFeature{
26-
feature: AIOPersistence,
27-
}
25+
return &AIOBrokerFeatureError{AIOPersistence}
2826
}
2927
}
3028
return nil

go/mqtt/subscribe.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (c *SessionClient) makeOnPublishReceived(
2929
var willAck sync.WaitGroup
3030
for handler := range c.messageHandlers.All() {
3131
willAck.Add(1)
32-
handler(buildMessage(packet, sync.OnceFunc(willAck.Done)))
32+
go handler(buildMessage(packet, sync.OnceFunc(willAck.Done)))
3333
}
3434

3535
if packet.QoS > 0 {
@@ -63,6 +63,11 @@ func (c *SessionClient) makeOnPublishReceived(
6363
func (c *SessionClient) RegisterMessageHandler(handler MessageHandler) func() {
6464
ctx, cancel := context.WithCancel(context.Background())
6565
done := c.messageHandlers.AppendEntry(func(msg *Message) {
66+
defer func() {
67+
if e := recover(); e != nil {
68+
c.log.Error(context.Background(), &HandlerPanicError{e})
69+
}
70+
}()
6671
handler(ctx, msg)
6772
})
6873
return sync.OnceFunc(func() {

go/test/integration/protocol/command_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestCommand(t *testing.T) {
2222
defer listeners.Close()
2323

2424
enc := protocol.JSON[string]{}
25-
topic := "prefix/{ex:token}/suffix"
25+
topic := "command/{ex:token}/suffix"
2626
value := "test"
2727

2828
executor, err := protocol.NewCommandExecutor(
@@ -80,7 +80,7 @@ func TestCommandError(t *testing.T) {
8080

8181
req := protocol.Empty{}
8282
res := protocol.JSON[string]{}
83-
topic := "topic"
83+
topic := "command-error"
8484

8585
executor, err := protocol.NewCommandExecutor(
8686
app, server, req, res, topic,
@@ -119,7 +119,7 @@ func TestCommandManualError(t *testing.T) {
119119

120120
req := protocol.Empty{}
121121
res := protocol.JSON[string]{}
122-
topic := "topic"
122+
topic := "command-manual-error"
123123

124124
executor, err := protocol.NewCommandExecutor(
125125
app, server, req, res, topic,

go/test/integration/protocol/greeter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestSayHello(t *testing.T) {
2626

2727
encReq := protocol.JSON[envoy.HelloRequest]{}
2828
encRes := protocol.JSON[envoy.HelloResponse]{}
29-
topic := "prefix/{ex:token}/suffix"
29+
topic := "say-hello/{ex:token}/suffix"
3030

3131
executor, err := protocol.NewCommandExecutor(
3232
app, server, encReq, encRes, topic,
@@ -88,7 +88,7 @@ func TestSayHelloWithDelay(t *testing.T) {
8888
defer listeners.Close()
8989
encReq := protocol.JSON[envoy.HelloWithDelayRequest]{}
9090
encRes := protocol.JSON[envoy.HelloResponse]{}
91-
topic := "prefix/{ex:token}/suffix"
91+
topic := "say-hello-with-delay/{ex:token}/suffix"
9292
executor, err := protocol.NewCommandExecutor(
9393
app, server, encReq, encRes, topic,
9494
func(
@@ -163,7 +163,7 @@ func TestSayHelloWithDelayZeroThrows(t *testing.T) {
163163

164164
encReq := protocol.JSON[envoy.HelloWithDelayRequest]{}
165165
encRes := protocol.JSON[envoy.HelloResponse]{}
166-
topic := "prefix/{ex:token}/suffix"
166+
topic := "say-hello-with-delay-zero-throws/{ex:token}/suffix"
167167

168168
executor, err := protocol.NewCommandExecutor(
169169
app,

go/test/integration/protocol/telemetry_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestTelemetry(t *testing.T) {
1818
defer done()
1919

2020
enc := protocol.Custom{}
21-
topic := "prefix/{token}/suffix"
21+
topic := "telemetry/{token}/suffix"
2222
value := protocol.Data{
2323
Payload: []byte("value"),
2424
ContentType: "custom/type",
@@ -61,6 +61,6 @@ func TestTelemetry(t *testing.T) {
6161
cloudEvent, err := protocol.CloudEventFromTelemetry(res)
6262
require.NoError(t, err)
6363
require.Equal(t, "https://contoso.com", cloudEvent.Source.String())
64-
require.Equal(t, "prefix/test/suffix", cloudEvent.Subject)
64+
require.Equal(t, "telemetry/test/suffix", cloudEvent.Subject)
6565
require.Equal(t, value.ContentType, cloudEvent.DataContentType)
6666
}

0 commit comments

Comments
 (0)