Skip to content

Commit b979cda

Browse files
authored
Merge pull request #1 from rido-min/worker
Use Worker
2 parents 0a18594 + 0fa4cda commit b979cda

File tree

3 files changed

+68
-99
lines changed

3 files changed

+68
-99
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using Microsoft.Azure.Devices.Client;
2+
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
3+
using System.Text;
4+
5+
namespace SampleModule;
6+
7+
internal class ModuleBackgroundService : BackgroundService
8+
{
9+
private int _counter;
10+
private ModuleClient? _moduleClient;
11+
private CancellationToken _cancellationToken;
12+
private readonly ILogger<ModuleBackgroundService> _logger;
13+
14+
public ModuleBackgroundService(ILogger<ModuleBackgroundService> logger) => _logger = logger;
15+
16+
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
17+
{
18+
_cancellationToken = cancellationToken;
19+
MqttTransportSettings mqttSetting = new(TransportType.Mqtt_Tcp_Only);
20+
ITransportSettings[] settings = { mqttSetting };
21+
22+
// Open a connection to the Edge runtime
23+
_moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
24+
25+
// Reconnect is not implented because we'll let docker restart the process when the connection is lost
26+
_moduleClient.SetConnectionStatusChangesHandler((status, reason) =>
27+
_logger.LogWarning("Connection changed: Status: {status} Reason: {reason}", status, reason));
28+
29+
await _moduleClient.OpenAsync(cancellationToken);
30+
31+
_logger.LogInformation("IoT Hub module client initialized.");
32+
33+
// Register callback to be called when a message is received by the module
34+
await _moduleClient.SetInputMessageHandlerAsync("input1", ProcessMessageAsync, null, cancellationToken);
35+
}
36+
37+
async Task<MessageResponse> ProcessMessageAsync(Message message, object userContext)
38+
{
39+
int counterValue = Interlocked.Increment(ref _counter);
40+
41+
byte[] messageBytes = message.GetBytes();
42+
string messageString = Encoding.UTF8.GetString(messageBytes);
43+
_logger.LogInformation("Received message: {counterValue}, Body: [{messageString}]", counterValue, messageString);
44+
45+
if (!string.IsNullOrEmpty(messageString))
46+
{
47+
using var pipeMessage = new Message(messageBytes);
48+
foreach (var prop in message.Properties)
49+
{
50+
pipeMessage.Properties.Add(prop.Key, prop.Value);
51+
}
52+
await _moduleClient!.SendEventAsync("output1", pipeMessage, _cancellationToken);
53+
54+
_logger.LogInformation("Received message sent");
55+
}
56+
return MessageResponse.Completed;
57+
}
58+
}
Lines changed: 5 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,7 @@
1-
namespace SampleModule
2-
{
3-
using Microsoft.Azure.Devices.Client;
4-
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
5-
using System;
6-
using System.Runtime.Loader;
7-
using System.Text;
8-
using System.Threading;
9-
using System.Threading.Tasks;
1+
using SampleModule;
102

11-
class Program
12-
{
13-
static int counter;
3+
IHost host = Host.CreateDefaultBuilder(args)
4+
.ConfigureServices(services =>services.AddHostedService<ModuleBackgroundService>())
5+
.Build();
146

15-
static void Main()
16-
{
17-
Init().Wait();
18-
19-
// Wait until the app unloads or is cancelled
20-
var cts = new CancellationTokenSource();
21-
AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel();
22-
Console.CancelKeyPress += (sender, cpe) => cts.Cancel();
23-
WhenCancelled(cts.Token).Wait();
24-
}
25-
26-
/// <summary>
27-
/// Handles cleanup operations when app is cancelled or unloads
28-
/// </summary>
29-
public static Task WhenCancelled(CancellationToken cancellationToken)
30-
{
31-
var tcs = new TaskCompletionSource<bool>();
32-
cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tcs);
33-
return tcs.Task;
34-
}
35-
36-
/// <summary>
37-
/// Initializes the ModuleClient and sets up the callback to receive
38-
/// messages containing temperature information
39-
/// </summary>
40-
static async Task Init()
41-
{
42-
MqttTransportSettings mqttSetting = new (TransportType.Mqtt_Tcp_Only);
43-
ITransportSettings[] settings = { mqttSetting };
44-
45-
// Open a connection to the Edge runtime
46-
ModuleClient ioTHubModuleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
47-
await ioTHubModuleClient.OpenAsync();
48-
Console.WriteLine("IoT Hub module client initialized.");
49-
50-
// Register callback to be called when a message is received by the module
51-
await ioTHubModuleClient.SetInputMessageHandlerAsync("input1", PipeMessage, ioTHubModuleClient);
52-
}
53-
54-
/// <summary>
55-
/// This method is called whenever the module is sent a message from the EdgeHub.
56-
/// It just pipe the messages without any change.
57-
/// It prints all the incoming messages.
58-
/// </summary>
59-
static async Task<MessageResponse> PipeMessage(Message message, object userContext)
60-
{
61-
int counterValue = Interlocked.Increment(ref counter);
62-
63-
if (userContext is not ModuleClient moduleClient)
64-
{
65-
throw new InvalidOperationException("UserContext doesn't contain " + "expected values");
66-
}
67-
68-
byte[] messageBytes = message.GetBytes();
69-
string messageString = Encoding.UTF8.GetString(messageBytes);
70-
Console.WriteLine($"Received message: {counterValue}, Body: [{messageString}]");
71-
72-
if (!string.IsNullOrEmpty(messageString))
73-
{
74-
using var pipeMessage = new Message(messageBytes);
75-
foreach (var prop in message.Properties)
76-
{
77-
pipeMessage.Properties.Add(prop.Key, prop.Value);
78-
}
79-
await moduleClient.SendEventAsync("output1", pipeMessage);
80-
81-
Console.WriteLine("Received message sent");
82-
}
83-
return MessageResponse.Completed;
84-
}
85-
}
86-
}
7+
host.Run();
Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,16 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk.Worker">
22
<PropertyGroup>
3-
<OutputType>Exe</OutputType>
43
<TargetFramework>net6.0</TargetFramework>
5-
</PropertyGroup>
6-
7-
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Debug|net6.0|AnyCPU'">
8-
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
9-
<TreatSpecificWarningsAsErrors />
4+
<Nullable>enable</Nullable>
5+
<ImplicitUsings>enable</ImplicitUsings>
106
</PropertyGroup>
117

128
<ItemGroup>
139
<ProjectCapability Include="AzureIoTEdgeModule" />
1410
</ItemGroup>
1511

1612
<ItemGroup>
17-
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.*" />
18-
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.*" />
19-
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.*" />
20-
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.*" />
21-
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="6.*" />
22-
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="6.*" />
23-
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.*" />
24-
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
13+
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.41.3" />
14+
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
2515
</ItemGroup>
2616
</Project>

0 commit comments

Comments
 (0)