Skip to content

Commit bcae702

Browse files
authored
Add mqtt hook for WebPubSub.AspNetCore project (Azure#45810)
1 parent 6383acc commit bcae702

25 files changed

+788
-72
lines changed

sdk/webpubsub/Microsoft.Azure.WebJobs.Extensions.WebPubSub/samples/WebPubSubTriggerFunction.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,5 +55,20 @@ public static void Run(
5555
{
5656
}
5757
#endregion
58+
59+
[FunctionName("connect")]
60+
public static WebPubSubEventResponse Run(
61+
[WebPubSubTrigger("hub", WebPubSubEventType.System, "connect")] ConnectEventRequest request,
62+
ILogger log)
63+
{
64+
if (request.ConnectionContext.ConnectionId != "attacker")
65+
{
66+
return request.CreateResponse(request.ConnectionContext.UserId, new string[] { "group1", "group2" }, "websocket-subprotocol", new string[] { "webpubsub.joinLeaveGroup.group1", "webpubsub.sendToGroup.group2" });
67+
}
68+
else
69+
{
70+
return request.CreateErrorResponse(WebPubSubErrorCode.Unauthorized, "Unauthorized connection");
71+
}
72+
}
5873
}
5974
}

sdk/webpubsub/Microsoft.Azure.WebJobs.Extensions.WebPubSub/tests/JObjectTests.cs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -228,17 +228,12 @@ public void ParseMessageResponse_InvalidEnumReturnServerError()
228228
Assert.AreEqual(HttpStatusCode.InternalServerError, result.StatusCode);
229229
}
230230

231-
[TestCase(@"""Success""")]
232-
[TestCase("0")]
233-
public async Task ParseConnectResponse_ContentMatches(string code)
231+
[TestCase]
232+
public async Task ParseConnectResponse_ContentMatches()
234233
{
235-
var test = @"{""code"":{0},""test"":""test"",""errorMessage"":""not valid user.""}";
236-
test = test.Replace("{0}", code);
234+
var test = @"{""test"":""test"",""errorMessage"":""not valid user.""}";
237235
var expected = JObject.FromObject(JsonConvert.DeserializeObject<ConnectEventResponse>(test));
238236

239-
// Safe to be serialize to enum 0 which is not read by customer.
240-
Assert.AreEqual(0, expected["code"].Value<int>());
241-
242237
var result = BuildResponse(test, RequestType.Connect);
243238
var content = await result.Content.ReadAsStringAsync();
244239
var actual = JObject.Parse(content);

sdk/webpubsub/Microsoft.Azure.WebJobs.Extensions.WebPubSub/tests/WebPubSubTriggerDispatcherTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public async Task TestProcessRequest_MessageMediaTypes(string mediaType, HttpSta
130130

131131
private static IEnumerable<object[]> TestHandleMqttConnectRequest_InProcessModelTestData = new object[][]
132132
{
133-
new object[]{ new MqttConnectEventResponse("userId",new string[] {"group1", "group2"}, new string[] {"webpubsub.joinLeaveGroup"}) { Mqtt = new() { UserProperties = new MqttUserProperty[] { new("a", "b") } } },5, HttpStatusCode.OK, "{\"mqtt\":{\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]},\"code\":0,\"userId\":\"userId\",\"groups\":[\"group1\",\"group2\"],\"subprotocol\":\"mqtt\",\"roles\":[\"webpubsub.joinLeaveGroup\"]}"},
133+
new object[]{ new MqttConnectEventResponse("userId",new string[] {"group1", "group2"}, new string[] {"webpubsub.joinLeaveGroup"}) { Mqtt = new() { UserProperties = new MqttUserProperty[] { new("a", "b") } } },5, HttpStatusCode.OK, "{\"mqtt\":{\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]},\"userId\":\"userId\",\"groups\":[\"group1\",\"group2\"],\"subprotocol\":\"mqtt\",\"roles\":[\"webpubsub.joinLeaveGroup\"]}"},
134134
new object[]{new MqttConnectEventErrorResponse(MqttV311ConnectReturnCode.NotAuthorized, "not authorized"),4, HttpStatusCode.Unauthorized, "{\"mqtt\":{\"code\":5,\"reason\":\"not authorized\",\"userProperties\":null}}", },
135135
new object[]{ CreateMqttConnectErrorResponse(MqttV500ConnectReasonCode.NotAuthorized, "not authorized", new MqttUserProperty[] {new MqttUserProperty("a", "b")}),5, HttpStatusCode.Unauthorized, "{\"mqtt\":{\"code\":135,\"reason\":\"not authorized\",\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]}}", }
136136
};
@@ -178,7 +178,7 @@ public async Task TestHandleMqttConnectRequest_InProcessModel(WebPubSubEventResp
178178

179179
private static readonly IEnumerable<object[]> TestHandleMqttConnectRequest_IsolatedProcessModelTestData = new object[][]
180180
{
181-
new object[]{ "{\"mqtt\":{\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]},\"userId\":\"userId\",\"groups\":[\"group1\",\"group2\"],\"subprotocol\":\"mqtt\",\"roles\":[\"webpubsub.joinLeaveGroup\"]}", 5, HttpStatusCode.OK, "{\"mqtt\":{\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]},\"code\":0,\"userId\":\"userId\",\"groups\":[\"group1\",\"group2\"],\"subprotocol\":\"mqtt\",\"roles\":[\"webpubsub.joinLeaveGroup\"]}"},
181+
new object[]{ "{\"mqtt\":{\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]},\"userId\":\"userId\",\"groups\":[\"group1\",\"group2\"],\"subprotocol\":\"mqtt\",\"roles\":[\"webpubsub.joinLeaveGroup\"]}", 5, HttpStatusCode.OK, "{\"mqtt\":{\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]},\"userId\":\"userId\",\"groups\":[\"group1\",\"group2\"],\"subprotocol\":\"mqtt\",\"roles\":[\"webpubsub.joinLeaveGroup\"]}"},
182182
new object[]{ "{\"mqtt\":{\"code\":5,\"reason\":\"not authorized\",\"userProperties\":null}}", 4, HttpStatusCode.Unauthorized, "{\"mqtt\":{\"code\":5,\"reason\":\"not authorized\",\"userProperties\":null}}", },
183183
new object[]{ "{\"mqtt\":{\"code\":135,\"reason\":\"reason\",\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]},\"errorMessage\":\"reason\"}", 5, HttpStatusCode.Unauthorized, "{\"mqtt\":{\"code\":135,\"reason\":\"reason\",\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]}}", }
184184
};
@@ -263,7 +263,7 @@ public async Task TestHandleMqttConnectedEvent()
263263
[TestCase]
264264
public async Task TestHandleMqttDisonnectedEvent()
265265
{
266-
var body = " {\"mqtt\":{\"initiatedByClient\":false,\"disconnectPacket\":{\"code\":128,\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]}},\"reason\":\"reason\",\"connectionContext\":null}";
266+
var body = " {\"mqtt\":{\"initiatedByClient\":false,\"disconnectPacket\":{\"code\":128,\"userProperties\":[{\"name\":\"a\",\"value\":\"b\"}]}},\"reason\":\"reason\"}";
267267
var disconnectedRequest = TestHelpers.CreateHttpRequestMessage(TestHub, WebPubSubEventType.System, "disconnected", "clientId", ValidSignature, origin: new string[] { TestOrigin }, subProtocols: new string[] { "mqtt" }, clientProtocol: WebPubSubClientProtocol.Mqtt, payload: Encoding.UTF8.GetBytes(body));
268268
disconnectedRequest.Headers.Add(Constants.Headers.CloudEvents.MqttPhysicalConnectionId, "physicalConnectionId");
269269
disconnectedRequest.Headers.Add(Constants.Headers.CloudEvents.MqttSessionId, "sessionId");

sdk/webpubsub/Microsoft.Azure.WebPubSub.AspNetCore/CHANGELOG.md

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
# Release History
22

3-
## 1.4.0-beta.1 (Unreleased)
3+
## 1.4.0 (2024-09-25)
44

55
### Features Added
6-
7-
### Breaking Changes
8-
9-
### Bugs Fixed
10-
11-
### Other Changes
6+
- Support MQTT client event handling.
127

138
## 1.3.0 (2024-04-10)
149

sdk/webpubsub/Microsoft.Azure.WebPubSub.AspNetCore/README.md

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,17 @@ For information about general Web PubSub concepts [Concepts in Azure Web PubSub]
7777

7878
### `WebPubSubHub`
7979

80-
`WebPubSubHub` is an abstract class to let users implement the subscribed Web PubSub service events. After user register the [event handler](https://docs.microsoft.com/azure/azure-web-pubsub/howto-develop-eventhandler) in service side, these events will be forwarded from service to server. And `WebPubSubHub` provides 4 methods mapping to the service events to enable users deal with these events, for example, client management, validations or working with `Azure.Messaging.WebPubSub` to broadcast the messages. See samples below for details.
80+
`WebPubSubHub` is an abstract class to let users implement the subscribed Web PubSub service events. After user register the [event handler](https://docs.microsoft.com/azure/azure-web-pubsub/howto-develop-eventhandler) in service side, these events will be forwarded from service to server. And `WebPubSubHub` provides methods mapping to the service events to enable users deal with these events, for example, client management, validations or working with `Azure.Messaging.WebPubSub` to broadcast the messages. See samples below for details.
8181

8282
> NOTE
8383
>
84-
> Among the 4 methods, `OnConnectAsync()` and `OnMessageReceivedAsync()` are blocking events that service will respect server returns. Besides the mapped correct response, server can throw exceptions whenever the request is against the server side logic. And `UnauthorizedAccessException` and `AuthenticationException` will be converted to `401Unauthorized` and rest will be converted to `500InternalServerError` along with exception message to return service. Then service will drop current client connection.
84+
> Among the those methods, `OnConnectAsync()` and `OnMessageReceivedAsync()` are blocking events that service will respect server returns. Besides the mapped correct response, server can throw exceptions whenever the request is against the server side logic. And `UnauthorizedAccessException` and `AuthenticationException` will be converted to `401Unauthorized` and rest will be converted to `500InternalServerError` along with exception message to return service. Then service will drop current client connection.
8585
8686
## Examples
8787

8888
### Handle upstream `Connect` event
8989

90-
```C# Snippet:WebPubSubHubMethods
90+
```C# Snippet:HandleConnectEvent
9191
private sealed class SampleHub : WebPubSubHub
9292
{
9393
internal WebPubSubServiceClient<SampleHub> _serviceClient;
@@ -109,6 +109,106 @@ private sealed class SampleHub : WebPubSubHub
109109
}
110110
```
111111

112+
### Handle upstream MQTT `Connect` event
113+
```C# Snippet:HandleMqttConnectEvent
114+
private sealed class SampleHub2 : WebPubSubHub
115+
{
116+
internal WebPubSubServiceClient<SampleHub> _serviceClient;
117+
118+
// Need to ensure service client is injected by call `AddServiceHub<SampleHub2>` in ConfigureServices.
119+
public SampleHub2(WebPubSubServiceClient<SampleHub> serviceClient)
120+
{
121+
_serviceClient = serviceClient;
122+
}
123+
124+
public override ValueTask<ConnectEventResponse> OnConnectAsync(ConnectEventRequest request, CancellationToken cancellationToken)
125+
{
126+
// By converting the request to MqttConnectEventRequest, you can get the MQTT specific information.
127+
if (request is MqttConnectEventRequest mqttRequest)
128+
{
129+
if (mqttRequest.Mqtt.Username != "baduser")
130+
{
131+
var response = mqttRequest.CreateMqttResponse(mqttRequest.ConnectionContext.UserId, null, null);
132+
// You can customize the user properties that will be sent to the client in the MQTT CONNACK packet.
133+
response.Mqtt.UserProperties = new List<MqttUserProperty>()
134+
{
135+
new("name", "value")
136+
};
137+
return ValueTask.FromResult(response as ConnectEventResponse);
138+
}
139+
else
140+
{
141+
var errorResponse = mqttRequest.Mqtt.ProtocolVersion switch
142+
{
143+
// You can specify the MQTT specific error code and message.
144+
MqttProtocolVersion.V311 => mqttRequest.CreateMqttV311ErrorResponse(MqttV311ConnectReturnCode.NotAuthorized, "not authorized"),
145+
MqttProtocolVersion.V500 => mqttRequest.CreateMqttV50ErrorResponse(MqttV500ConnectReasonCode.Banned, "The user is banned."),
146+
_ => throw new System.NotSupportedException("Unsupported MQTT protocol version")
147+
};
148+
// You can customize the user properties that will be sent to the client in the MQTT CONNACK packet.
149+
errorResponse.Mqtt.UserProperties = new List<MqttUserProperty>()
150+
{
151+
new("name", "value")
152+
};
153+
throw new MqttConnectionException(errorResponse);
154+
}
155+
}
156+
else
157+
{
158+
// If you don't need to handle MQTT specific logic, you can still return a general response for MQTT clients.
159+
return ValueTask.FromResult(request.CreateResponse(request.ConnectionContext.UserId, null, request.Subprotocols.FirstOrDefault(), null));
160+
}
161+
}
162+
}
163+
```
164+
165+
### Handle upstream MQTT `Connected` event
166+
```C# Snippet:HandleMqttConnectedEvent
167+
private sealed class SampleHub3 : WebPubSubHub
168+
{
169+
internal WebPubSubServiceClient<SampleHub> _serviceClient;
170+
171+
// Need to ensure service client is injected by call `AddServiceHub<SampleHub3>` in ConfigureServices.
172+
public SampleHub3(WebPubSubServiceClient<SampleHub> serviceClient)
173+
{
174+
_serviceClient = serviceClient;
175+
}
176+
177+
public override Task OnConnectedAsync(ConnectedEventRequest request)
178+
{
179+
if (request.ConnectionContext is MqttConnectionContext mqttContext)
180+
{
181+
// Have your own logic here
182+
}
183+
return Task.CompletedTask;
184+
}
185+
}
186+
```
187+
188+
189+
### Handle upstream MQTT `Disconnected` event
190+
```C# Snippet:HandleMqttDisconnectedEvent
191+
private sealed class SampleHub4 : WebPubSubHub
192+
{
193+
internal WebPubSubServiceClient<SampleHub> _serviceClient;
194+
195+
// Need to ensure service client is injected by call `AddServiceHub<SampleHub4>` in ConfigureServices.
196+
public SampleHub4(WebPubSubServiceClient<SampleHub> serviceClient)
197+
{
198+
_serviceClient = serviceClient;
199+
}
200+
201+
public override Task OnDisconnectedAsync(DisconnectedEventRequest request)
202+
{
203+
if (request is MqttDisconnectedEventRequest mqttDisconnected)
204+
{
205+
// Have your own logic here
206+
}
207+
return Task.CompletedTask;
208+
}
209+
}
210+
```
211+
112212
## Troubleshooting
113213

114214
### Setting up console logging

sdk/webpubsub/Microsoft.Azure.WebPubSub.AspNetCore/api/Microsoft.Azure.WebPubSub.AspNetCore.netcoreapp3.1.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ public static partial class WebPubSubEndpointRouteBuilderExtensions
77
}
88
namespace Microsoft.Azure.WebPubSub.AspNetCore
99
{
10+
public partial class MqttConnectionException : System.Exception
11+
{
12+
public MqttConnectionException(Microsoft.Azure.WebPubSub.Common.MqttConnectEventErrorResponse mqttErrorResponse) { }
13+
}
1014
public abstract partial class WebPubSubHub
1115
{
1216
protected WebPubSubHub() { }

sdk/webpubsub/Microsoft.Azure.WebPubSub.AspNetCore/src/Extensions/WebPubSubRequestExtensions.cs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,16 @@ internal static async Task<WebPubSubEventRequest> ReadWebPubSubEventAsync(this H
6363
case RequestType.Connect:
6464
{
6565
var content = await new StreamReader(request.Body).ReadToEndAsync().ConfigureAwait(false);
66-
var eventRequest = JsonSerializer.Deserialize<ConnectEventRequest>(content);
67-
return new ConnectEventRequest(context, eventRequest.Claims, eventRequest.Query, eventRequest.Subprotocols, eventRequest.ClientCertificates, eventRequest.Headers);
66+
if (context is MqttConnectionContext mqttContext)
67+
{
68+
var requestBody = JsonSerializer.Deserialize<MqttConnectEventRequestContent>(content);
69+
return new MqttConnectEventRequest(mqttContext, requestBody.Claims, requestBody.Query, requestBody.ClientCertificates, requestBody.Headers, requestBody.Mqtt);
70+
}
71+
else
72+
{
73+
var eventRequest = JsonSerializer.Deserialize<ConnectEventRequest>(content);
74+
return new ConnectEventRequest(context, eventRequest.Claims, eventRequest.Query, eventRequest.Subprotocols, eventRequest.ClientCertificates, eventRequest.Headers);
75+
}
6876
}
6977
case RequestType.User:
7078
{
@@ -85,8 +93,16 @@ internal static async Task<WebPubSubEventRequest> ReadWebPubSubEventAsync(this H
8593
case RequestType.Disconnected:
8694
{
8795
var content = await new StreamReader(request.Body).ReadToEndAsync().ConfigureAwait(false);
88-
var eventRequest = JsonSerializer.Deserialize<DisconnectedEventRequest>(content);
89-
return new DisconnectedEventRequest(context, eventRequest.Reason);
96+
if (context is MqttConnectionContext mqttContext)
97+
{
98+
var requestBody = JsonSerializer.Deserialize<MqttDisconnectedEventRequestContent>(content);
99+
return new MqttDisconnectedEventRequest(mqttContext, requestBody.Reason, requestBody.Mqtt);
100+
}
101+
else
102+
{
103+
var eventRequest = JsonSerializer.Deserialize<DisconnectedEventRequest>(content);
104+
return new DisconnectedEventRequest(context, eventRequest.Reason);
105+
}
90106
}
91107
default:
92108
return null;
@@ -180,6 +196,17 @@ private static bool TryParseCloudEvents(this HttpRequest request, out WebPubSubC
180196
states = request.Headers.GetFirstHeaderValueOrDefault(Constants.Headers.CloudEvents.State).DecodeConnectionStates();
181197
}
182198

199+
if (Constants.MqttWebSocketSubprotocolValue.Equals(request.Headers.GetFirstHeaderValueOrDefault(Constants.Headers.CloudEvents.Subprotocol)))
200+
{
201+
var physicalConnectionId = request.Headers[Constants.Headers.CloudEvents.MqttPhysicalConnectionId];
202+
if (physicalConnectionId.Count != 0)
203+
{
204+
var sessionId = request.Headers.GetFirstHeaderValueOrDefault(Constants.Headers.CloudEvents.MqttSessionId);
205+
connectionContext = new MqttConnectionContext(eventType, eventName, hub, connectionId, physicalConnectionId.First(), sessionId, userId, signature, origin, states, headers);
206+
return true;
207+
}
208+
}
209+
183210
connectionContext = new WebPubSubConnectionContext(eventType, eventName, hub, connectionId, userId, signature, origin, states, headers);
184211
return true;
185212
}

sdk/webpubsub/Microsoft.Azure.WebPubSub.AspNetCore/src/Internal/Constants.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ internal static class Constants
88
public static readonly char[] HeaderSeparator = { ',', ' ' };
99
public const string AllowedAllOrigins = "*";
1010

11+
public const string MqttWebSocketSubprotocolValue = "mqtt";
12+
1113
public static class ContentTypes
1214
{
1315
public const string JsonContentType = "application/json";
@@ -45,6 +47,13 @@ public static class CloudEvents
4547

4648
public const string TypeSystemPrefix = "azure.webpubsub.sys.";
4749
public const string TypeUserPrefix = "azure.webpubsub.user.";
50+
51+
public const string Subprotocol = Prefix + "subprotocol";
52+
53+
#region MQTT
54+
public const string MqttPhysicalConnectionId = Prefix + "physicalConnectionId";
55+
public const string MqttSessionId = Prefix + "sessionId";
56+
#endregion
4857
}
4958

5059
public const string WebHookRequestOrigin = "WebHook-Request-Origin";

0 commit comments

Comments
 (0)