diff --git a/dotnet/Azure.Iot.Operations.sln b/dotnet/Azure.Iot.Operations.sln index 08731dcfe..de55b504c 100644 --- a/dotnet/Azure.Iot.Operations.sln +++ b/dotnet/Azure.Iot.Operations.sln @@ -89,6 +89,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Iot.Operations.Connec EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PollingTelemetryConnectorTemplate", "templates\PollingTelemetryConnector\PollingTelemetryConnectorTemplate.csproj", "{698D0245-18C6-37E9-08F6-E8EADCBA0880}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SampleRpcClient", "samples\SampleRpcClient\SampleRpcClient\SampleRpcClient.csproj", "{8931AAF6-627F-4598-B059-D51102F28F88}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SampleRpcServer", "samples\SampleRpcServer\SampleRpcServer\SampleRpcServer.csproj", "{9DC14638-1801-40B9-9FFC-2F883B1C7CDB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -227,6 +231,14 @@ Global {698D0245-18C6-37E9-08F6-E8EADCBA0880}.Debug|Any CPU.Build.0 = Debug|Any CPU {698D0245-18C6-37E9-08F6-E8EADCBA0880}.Release|Any CPU.ActiveCfg = Release|Any CPU {698D0245-18C6-37E9-08F6-E8EADCBA0880}.Release|Any CPU.Build.0 = Release|Any CPU + {8931AAF6-627F-4598-B059-D51102F28F88}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8931AAF6-627F-4598-B059-D51102F28F88}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8931AAF6-627F-4598-B059-D51102F28F88}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8931AAF6-627F-4598-B059-D51102F28F88}.Release|Any CPU.Build.0 = Release|Any CPU + {9DC14638-1801-40B9-9FFC-2F883B1C7CDB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9DC14638-1801-40B9-9FFC-2F883B1C7CDB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9DC14638-1801-40B9-9FFC-2F883B1C7CDB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9DC14638-1801-40B9-9FFC-2F883B1C7CDB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -270,6 +282,8 @@ Global {4992415C-ABB2-2284-D6CD-8FE01910092F} = {4DDB0B28-1830-4B6F-8A10-FB37874D4D57} {C527ECCE-FBE1-4FBC-9987-F94BACC904F2} = {CE9A7664-594B-43EB-AA48-606B88411FC9} {698D0245-18C6-37E9-08F6-E8EADCBA0880} = {4636D1F8-ACA6-4138-8EC0-1FF71CAF3A3B} + {8931AAF6-627F-4598-B059-D51102F28F88} = {0F2D6563-6660-4FE7-A936-E15CBFBE3BC0} + {9DC14638-1801-40B9-9FFC-2F883B1C7CDB} = {0F2D6563-6660-4FE7-A936-E15CBFBE3BC0} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {F2687BCD-45E3-4094-9656-BF183C816BA0} diff --git a/dotnet/samples/SampleRpcClient/SampleRpcClient/PayloadObject.cs b/dotnet/samples/SampleRpcClient/SampleRpcClient/PayloadObject.cs new file mode 100644 index 000000000..8d5cb3ebf --- /dev/null +++ b/dotnet/samples/SampleRpcClient/SampleRpcClient/PayloadObject.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json.Serialization; +using System.Threading.Tasks; + +namespace SampleRpcClient +{ + public class PayloadObject + { + [JsonPropertyName("SomeField")] + public string? SomeField { get; set; } + + [JsonPropertyName("OtherField")] + public string? OtherField { get; set; } + } +} diff --git a/dotnet/samples/SampleRpcClient/SampleRpcClient/Program.cs b/dotnet/samples/SampleRpcClient/SampleRpcClient/Program.cs new file mode 100644 index 000000000..f28524a52 --- /dev/null +++ b/dotnet/samples/SampleRpcClient/SampleRpcClient/Program.cs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Diagnostics; +using Azure.Iot.Operations.Mqtt.Session; +using Azure.Iot.Operations.Protocol; +using Azure.Iot.Operations.Protocol.Connection; +using Microsoft.Extensions.Configuration; +using SampleRpcClient; + +string commandName = "someCommandName"; + +var configuration = new ConfigurationBuilder() + .AddJsonFile("appsettings.json") + .AddEnvironmentVariables() + .AddCommandLine(args) + .Build(); + +var mqttDiag = Convert.ToBoolean(configuration["mqttDiag"]); +if (mqttDiag) Trace.Listeners.Add(new ConsoleTraceListener()); +await using MqttSessionClient mqttClient = new(new MqttSessionClientOptions { EnableMqttLogging = mqttDiag }); +ApplicationContext applicationContext = new(); + +await using SampleCommandInvoker rpcInvoker = new(applicationContext, mqttClient, commandName, new Utf8JsonSerializer()); + +await mqttClient.ConnectAsync(MqttConnectionSettings.FromEnvVars()); + +var payload = new PayloadObject() +{ + SomeField = "someValue", + OtherField = "someOtherValue" +}; + +PayloadObject response = (await rpcInvoker.InvokeCommandAsync(payload)).Response; +Console.WriteLine("Received RPC response"); diff --git a/dotnet/samples/SampleRpcClient/SampleRpcClient/SampleCommandInvoker.cs b/dotnet/samples/SampleRpcClient/SampleRpcClient/SampleCommandInvoker.cs new file mode 100644 index 000000000..6b1d85fed --- /dev/null +++ b/dotnet/samples/SampleRpcClient/SampleRpcClient/SampleCommandInvoker.cs @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Azure.Iot.Operations.Protocol; +using Azure.Iot.Operations.Protocol.RPC; + +namespace SampleRpcClient +{ + [CommandTopic("rpc/command-samples/{commandName}")] + public class SampleCommandInvoker : CommandInvoker + { + public SampleCommandInvoker(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, string commandName, IPayloadSerializer serializer) : base(applicationContext, mqttClient, commandName, serializer) + { + TopicTokenMap["commandName"] = commandName; + } + } +} diff --git a/dotnet/samples/SampleRpcClient/SampleRpcClient/SampleRpcClient.csproj b/dotnet/samples/SampleRpcClient/SampleRpcClient/SampleRpcClient.csproj new file mode 100644 index 000000000..c1cb0f026 --- /dev/null +++ b/dotnet/samples/SampleRpcClient/SampleRpcClient/SampleRpcClient.csproj @@ -0,0 +1,24 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + + + + + + + PreserveNewest + + + + diff --git a/dotnet/samples/SampleRpcClient/SampleRpcClient/Utf8JsonSerializer.cs b/dotnet/samples/SampleRpcClient/SampleRpcClient/Utf8JsonSerializer.cs new file mode 100644 index 000000000..00d5ac22a --- /dev/null +++ b/dotnet/samples/SampleRpcClient/SampleRpcClient/Utf8JsonSerializer.cs @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Buffers; +using System.Text.Json; +using System.Text.Json.Serialization; +using Azure.Iot.Operations.Protocol; +using Azure.Iot.Operations.Protocol.Models; + +namespace SampleRpcClient; + +public class EmptyJson +{ +} + +public class Utf8JsonSerializer : IPayloadSerializer +{ + protected static readonly JsonSerializerOptions _jsonSerializerOptions = new() + { + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }; + + public const string ContentType = "application/json"; + + public const MqttPayloadFormatIndicator PayloadFormatIndicator = MqttPayloadFormatIndicator.CharacterData; + + public T FromBytes(ReadOnlySequence payload, string? contentType, MqttPayloadFormatIndicator payloadFormatIndicator) + where T : class + { + if (contentType != null && contentType != ContentType) + { + throw new AkriMqttException($"Content type {contentType} is not supported by this implementation; only {ContentType} is accepted.") + { + Kind = AkriMqttErrorKind.HeaderInvalid, + HeaderName = "Content Type", + HeaderValue = contentType, + IsShallow = false, + IsRemote = false, + }; + } + + try + { + if (payload.IsEmpty) + { + if (typeof(T) != typeof(EmptyJson)) + { + throw AkriMqttException.GetPayloadInvalidException(); + } + + return (new EmptyJson() as T)!; + } + + Utf8JsonReader reader = new(payload); + return JsonSerializer.Deserialize(ref reader, _jsonSerializerOptions)!; + } + catch (Exception) + { + throw AkriMqttException.GetPayloadInvalidException(); + } + } + + public SerializedPayloadContext ToBytes(T? payload) + where T : class + { + try + { + if (typeof(T) == typeof(EmptyJson)) + { + return new(ReadOnlySequence.Empty, ContentType, PayloadFormatIndicator); + } + + return new(new(JsonSerializer.SerializeToUtf8Bytes(payload, _jsonSerializerOptions)), ContentType, PayloadFormatIndicator); + } + catch (Exception) + { + throw AkriMqttException.GetPayloadInvalidException(); + } + } +} diff --git a/dotnet/samples/SampleRpcClient/SampleRpcClient/appsettings.json b/dotnet/samples/SampleRpcClient/SampleRpcClient/appsettings.json new file mode 100644 index 000000000..a7f28df92 --- /dev/null +++ b/dotnet/samples/SampleRpcClient/SampleRpcClient/appsettings.json @@ -0,0 +1,3 @@ +{ + "mqttDiag": true +} diff --git a/dotnet/samples/SampleRpcServer/SampleRpcServer/PayloadObject.cs b/dotnet/samples/SampleRpcServer/SampleRpcServer/PayloadObject.cs new file mode 100644 index 000000000..325b23865 --- /dev/null +++ b/dotnet/samples/SampleRpcServer/SampleRpcServer/PayloadObject.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json.Serialization; +using System.Threading.Tasks; + +namespace SampleRpcServer +{ + public class PayloadObject + { + [JsonPropertyName("SomeField")] + public string? SomeField { get; set; } + + [JsonPropertyName("OtherField")] + public string? OtherField { get; set; } + } +} diff --git a/dotnet/samples/SampleRpcServer/SampleRpcServer/Program.cs b/dotnet/samples/SampleRpcServer/SampleRpcServer/Program.cs new file mode 100644 index 000000000..bda2f8ff2 --- /dev/null +++ b/dotnet/samples/SampleRpcServer/SampleRpcServer/Program.cs @@ -0,0 +1,49 @@ +using System.Diagnostics; +using Azure.Iot.Operations.Mqtt.Session; +using Azure.Iot.Operations.Protocol; +using Azure.Iot.Operations.Protocol.Connection; +using Microsoft.Extensions.Configuration; +using SampleRpcServer; + +string commandName = "someCommandName"; + +var configuration = new ConfigurationBuilder() + .AddJsonFile("appsettings.json") + .AddEnvironmentVariables() + .AddCommandLine(args) + .Build(); + +var mqttDiag = Convert.ToBoolean(configuration["mqttDiag"]); +if (mqttDiag) Trace.Listeners.Add(new ConsoleTraceListener()); +await using MqttSessionClient mqttClient = new(new MqttSessionClientOptions { EnableMqttLogging = mqttDiag }); +ApplicationContext applicationContext = new(); + +await mqttClient.ConnectAsync(MqttConnectionSettings.FromEnvVars()); + +await using SampleCommandExecutor rpcExecutor = new(applicationContext, mqttClient, commandName, new Utf8JsonSerializer()) +{ + OnCommandReceived = (request, cancellationToken) => + { + Console.WriteLine("Received RPC request"); + + var responsePayload = new PayloadObject() + { + SomeField = request.Request.SomeField, + OtherField = request.Request.OtherField, + }; + + // Echo the payload back to the sender + return Task.FromResult(new Azure.Iot.Operations.Protocol.RPC.ExtendedResponse() + { + Response = responsePayload + }); + } +}; + +await rpcExecutor.StartAsync(); + +await Task.Delay(TimeSpan.FromMinutes(1)); + + + + diff --git a/dotnet/samples/SampleRpcServer/SampleRpcServer/SampleCommandExecutor.cs b/dotnet/samples/SampleRpcServer/SampleRpcServer/SampleCommandExecutor.cs new file mode 100644 index 000000000..75d3d4101 --- /dev/null +++ b/dotnet/samples/SampleRpcServer/SampleRpcServer/SampleCommandExecutor.cs @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Azure.Iot.Operations.Protocol; +using Azure.Iot.Operations.Protocol.RPC; + +namespace SampleRpcServer +{ + [CommandTopic("rpc/command-samples/{commandName}")] + public class SampleCommandExecutor : CommandExecutor + { + public SampleCommandExecutor(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, string commandName, IPayloadSerializer serializer) : base(applicationContext, mqttClient, commandName, serializer) + { + TopicTokenMap["commandName"] = commandName; + } + } +} diff --git a/dotnet/samples/SampleRpcServer/SampleRpcServer/SampleRpcServer.csproj b/dotnet/samples/SampleRpcServer/SampleRpcServer/SampleRpcServer.csproj new file mode 100644 index 000000000..4d3f73fd4 --- /dev/null +++ b/dotnet/samples/SampleRpcServer/SampleRpcServer/SampleRpcServer.csproj @@ -0,0 +1,22 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + + + + + + PreserveNewest + + + diff --git a/dotnet/samples/SampleRpcServer/SampleRpcServer/Utf8JsonSerializer.cs b/dotnet/samples/SampleRpcServer/SampleRpcServer/Utf8JsonSerializer.cs new file mode 100644 index 000000000..3fe576f18 --- /dev/null +++ b/dotnet/samples/SampleRpcServer/SampleRpcServer/Utf8JsonSerializer.cs @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Buffers; +using System.Text.Json; +using System.Text.Json.Serialization; +using Azure.Iot.Operations.Protocol; +using Azure.Iot.Operations.Protocol.Models; + +namespace SampleRpcServer; + +public class EmptyJson +{ +} + +public class Utf8JsonSerializer : IPayloadSerializer +{ + protected static readonly JsonSerializerOptions _jsonSerializerOptions = new() + { + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }; + + public const string ContentType = "application/json"; + + public const MqttPayloadFormatIndicator PayloadFormatIndicator = MqttPayloadFormatIndicator.CharacterData; + + public T FromBytes(ReadOnlySequence payload, string? contentType, MqttPayloadFormatIndicator payloadFormatIndicator) + where T : class + { + if (contentType != null && contentType != ContentType) + { + throw new AkriMqttException($"Content type {contentType} is not supported by this implementation; only {ContentType} is accepted.") + { + Kind = AkriMqttErrorKind.HeaderInvalid, + HeaderName = "Content Type", + HeaderValue = contentType, + IsShallow = false, + IsRemote = false, + }; + } + + try + { + if (payload.IsEmpty) + { + if (typeof(T) != typeof(EmptyJson)) + { + throw AkriMqttException.GetPayloadInvalidException(); + } + + return (new EmptyJson() as T)!; + } + + Utf8JsonReader reader = new(payload); + return JsonSerializer.Deserialize(ref reader, _jsonSerializerOptions)!; + } + catch (Exception) + { + throw AkriMqttException.GetPayloadInvalidException(); + } + } + + public SerializedPayloadContext ToBytes(T? payload) + where T : class + { + try + { + if (typeof(T) == typeof(EmptyJson)) + { + return new(ReadOnlySequence.Empty, ContentType, PayloadFormatIndicator); + } + + return new(new(JsonSerializer.SerializeToUtf8Bytes(payload, _jsonSerializerOptions)), ContentType, PayloadFormatIndicator); + } + catch (Exception) + { + throw AkriMqttException.GetPayloadInvalidException(); + } + } +} diff --git a/dotnet/samples/SampleRpcServer/SampleRpcServer/appsettings.json b/dotnet/samples/SampleRpcServer/SampleRpcServer/appsettings.json new file mode 100644 index 000000000..a7f28df92 --- /dev/null +++ b/dotnet/samples/SampleRpcServer/SampleRpcServer/appsettings.json @@ -0,0 +1,3 @@ +{ + "mqttDiag": true +}