Skip to content

Commit 8705963

Browse files
Hub: implement direct comms with in-process instance back to hub
1 parent ac2617b commit 8705963

File tree

5 files changed

+197
-170
lines changed

5 files changed

+197
-170
lines changed

src/Certify.Core/Management/CertifyManager/CertifyManager.ManagementHub.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private async Task StartManagementHubConnection(string hubUri)
9191

9292
if (_managementServerClient != null)
9393
{
94-
_managementServerClient.OnGetCommandResult -= PerformDirectHubCommandWithResult;
94+
_managementServerClient.OnGetCommandResult -= PerformHubCommandWithResult;
9595
_managementServerClient.OnConnectionReconnecting -= _managementServerClient_OnConnectionReconnecting;
9696
}
9797

@@ -101,7 +101,7 @@ private async Task StartManagementHubConnection(string hubUri)
101101
{
102102
await _managementServerClient.ConnectAsync();
103103

104-
_managementServerClient.OnGetCommandResult += PerformDirectHubCommandWithResult;
104+
_managementServerClient.OnGetCommandResult += PerformHubCommandWithResult;
105105
_managementServerClient.OnConnectionReconnecting += _managementServerClient_OnConnectionReconnecting;
106106
}
107107
catch (Exception ex)
@@ -112,7 +112,7 @@ private async Task StartManagementHubConnection(string hubUri)
112112
}
113113
}
114114

115-
public async Task<InstanceCommandResult> PerformDirectHubCommandWithResult(InstanceCommandRequest arg)
115+
public async Task<InstanceCommandResult> PerformHubCommandWithResult(InstanceCommandRequest arg)
116116
{
117117
object val = null;
118118

src/Certify.Core/Management/CertifyManager/ICertifyManager.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Threading.Tasks;
@@ -87,7 +87,7 @@ public interface ICertifyManager
8787
Task<ActionResult> CleanupManagedChallengeRequest(ManagedChallengeRequest request);
8888

8989
Task<ActionStep> UpdateManagementHub(string url, string joiningKey);
90-
Task<InstanceCommandResult> PerformDirectHubCommandWithResult(InstanceCommandRequest arg);
90+
Task<InstanceCommandResult> PerformHubCommandWithResult(InstanceCommandRequest arg);
9191

9292
void SetDirectManagementClient(IManagementServerClient client);
9393
ManagedInstanceInfo GetManagedInstanceInfo();

src/Certify.Server/Certify.Server.Hub.Api/Services/ManagementAPI.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,16 @@ private async Task SendCommandWithNoResult(string instanceId, InstanceCommandReq
6565

6666
_mgmtStateProvider.AddAwaitedCommandRequest(cmd);
6767

68-
await _mgmtHubContext.Clients.Client(connectionId).SendCommandRequest(cmd);
68+
if (_certifyManager != null && instanceId == _mgmtStateProvider.GetManagementHubInstanceId())
69+
{
70+
// get command result directly from in-process instance
71+
await _certifyManager.PerformHubCommandWithResult(cmd);
72+
}
73+
else
74+
{
75+
76+
await _mgmtHubContext.Clients.Client(connectionId).SendCommandRequest(cmd);
77+
}
6978
}
7079

7180
private async Task<T?> PerformInstanceCommandTaskWithResult<T>(string instanceId, KeyValuePair<string, string>[] args, string commandType)
@@ -76,7 +85,7 @@ private async Task SendCommandWithNoResult(string instanceId, InstanceCommandReq
7685
if (_certifyManager != null && instanceId == _mgmtStateProvider.GetManagementHubInstanceId())
7786
{
7887
// get command result directly from in-process instance
79-
result = await _certifyManager.PerformDirectHubCommandWithResult(cmd);
88+
result = await _certifyManager.PerformHubCommandWithResult(cmd);
8089
}
8190
else
8291
{
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
using Certify.Management;
2+
using Certify.Models.Hub;
3+
using Certify.Models.Reporting;
4+
using Certify.Server.Hub.Api.SignalR.ManagementHub;
5+
6+
namespace Certify.Server.HubService.Services
7+
{
8+
public class DirectInstanceManagementHub : IInstanceManagementHub
9+
{
10+
private IInstanceManagementStateProvider _stateProvider;
11+
private ILogger<DirectInstanceManagementHub> _logger;
12+
private ICertifyManager _certifyManager;
13+
public DirectInstanceManagementHub(ILogger<DirectInstanceManagementHub> logger, IInstanceManagementStateProvider stateProvider, ICertifyManager certifyManager)
14+
{
15+
_stateProvider = stateProvider;
16+
_logger = logger;
17+
_certifyManager = certifyManager;
18+
}
19+
20+
/// <summary>
21+
/// Receive results from a previously issued command
22+
/// </summary>
23+
/// <param name="result"></param>
24+
/// <returns></returns>
25+
public Task ReceiveCommandResult(InstanceCommandResult result)
26+
{
27+
28+
result.Received = DateTimeOffset.Now;
29+
30+
// check we are awaiting this result
31+
var cmd = _stateProvider.GetAwaitedCommandRequest(result.CommandId);
32+
33+
if (cmd == null && !result.IsCommandResponse)
34+
{
35+
// message was not requested and has been sent by the instance (e.g. heartbeat)
36+
cmd = new InstanceCommandRequest { CommandId = result.CommandId, CommandType = result.CommandType };
37+
}
38+
39+
if (cmd != null)
40+
{
41+
_stateProvider.RemoveAwaitedCommandRequest(cmd.CommandId);
42+
43+
if (cmd.CommandType == ManagementHubCommands.GetInstanceInfo)
44+
{
45+
var instanceInfo = System.Text.Json.JsonSerializer.Deserialize<ManagedInstanceInfo>(result.Value);
46+
47+
if (instanceInfo != null)
48+
{
49+
50+
instanceInfo.LastReported = DateTimeOffset.Now;
51+
_stateProvider.UpdateInstanceConnectionInfo("internal", instanceInfo);
52+
53+
_logger?.LogInformation("Received instance {instanceId} {instanceTitle} for mgmt hub connection.", instanceInfo.InstanceId, instanceInfo.Title);
54+
55+
// if we don't yet have any managed items for this instance, ask for them
56+
if (!_stateProvider.HasItemsForManagedInstance(instanceInfo.InstanceId))
57+
{
58+
var request = new InstanceCommandRequest
59+
{
60+
CommandId = Guid.NewGuid(),
61+
CommandType = ManagementHubCommands.GetManagedItems
62+
};
63+
64+
IssueCommand(request);
65+
}
66+
67+
// if we dont have a status summary, ask for that
68+
if (!_stateProvider.HasStatusSummaryForManagedInstance(instanceInfo.InstanceId))
69+
{
70+
var request = new InstanceCommandRequest
71+
{
72+
CommandId = Guid.NewGuid(),
73+
CommandType = ManagementHubCommands.GetStatusSummary
74+
};
75+
76+
IssueCommand(request);
77+
}
78+
}
79+
}
80+
else
81+
{
82+
// for all other command results we need to resolve which instance id we are communicating with
83+
var instanceId = _stateProvider.GetInstanceIdForConnection("internal");
84+
result.InstanceId = instanceId;
85+
86+
if (!string.IsNullOrWhiteSpace(instanceId))
87+
{
88+
// action this message from this instance
89+
_logger?.LogInformation("Received instance command result {result}", result.CommandType);
90+
91+
if (cmd.CommandType == ManagementHubCommands.GetManagedItems)
92+
{
93+
// got items from an instance
94+
var val = System.Text.Json.JsonSerializer.Deserialize<ManagedInstanceItems>(result.Value);
95+
96+
_stateProvider.UpdateInstanceItemInfo(instanceId, val.Items);
97+
}
98+
else if (cmd.CommandType == ManagementHubCommands.GetStatusSummary && result?.Value != null)
99+
{
100+
// got status summary
101+
var val = System.Text.Json.JsonSerializer.Deserialize<StatusSummary>(result.Value);
102+
103+
_stateProvider.UpdateInstanceStatusSummary(instanceId, val);
104+
}
105+
else
106+
{
107+
// store for something else to consume
108+
if (result.IsCommandResponse)
109+
{
110+
_stateProvider.AddAwaitedCommandResult(result);
111+
}
112+
else
113+
{
114+
// item was not requested, queue for processing
115+
if (result.CommandType == ManagementHubCommands.NotificationUpdatedManagedItem)
116+
{
117+
//_uiStatusHub.Clients.All.SendAsync(Providers.StatusHubMessages.SendManagedCertificateUpdateMsg, System.Text.Json.JsonSerializer.Deserialize<Models.ManagedCertificate>(result.Value));
118+
}
119+
else if (result.CommandType == ManagementHubCommands.NotificationManagedItemRequestProgress)
120+
{
121+
//_uiStatusHub.Clients.All.SendAsync(Providers.StatusHubMessages.SendProgressStateMsg, System.Text.Json.JsonSerializer.Deserialize<Models.RequestProgressState>(result.Value));
122+
}
123+
else if (result.CommandType == ManagementHubCommands.NotificationRemovedManagedItem)
124+
{
125+
// deleted :TODO
126+
//_uiStatusHub.Clients.All.SendAsync(Providers.StatusHubMessages.SendMsg, $"Deleted item {result.Value}");
127+
}
128+
}
129+
}
130+
}
131+
else
132+
{
133+
_logger?.LogError("Received instance command result for an unknown instance {result}", result.CommandType);
134+
}
135+
}
136+
}
137+
138+
return Task.CompletedTask;
139+
}
140+
141+
public Task ReceiveInstanceMessage(InstanceMessage message)
142+
{
143+
144+
var instanceId = _stateProvider.GetInstanceIdForConnection("internal");
145+
if (instanceId != null)
146+
{
147+
// action this message from this instance
148+
_logger?.LogInformation("Received instance message {msg}", message);
149+
}
150+
else
151+
{
152+
_logger?.LogError("Received instance command result for an unknown instance {msg}", message);
153+
}
154+
155+
return Task.CompletedTask;
156+
}
157+
158+
public Task SendCommandRequest(InstanceCommandRequest cmd)
159+
{
160+
IssueCommand(cmd);
161+
162+
return Task.CompletedTask;
163+
}
164+
165+
private async void IssueCommand(InstanceCommandRequest cmd)
166+
{
167+
_stateProvider.AddAwaitedCommandRequest(cmd);
168+
169+
//
170+
var result = await _certifyManager.PerformHubCommandWithResult(cmd);
171+
if (result.IsCommandResponse)
172+
{
173+
result.CommandType = cmd.CommandType;
174+
result.CommandId = cmd.CommandId;
175+
result.InstanceId = _stateProvider.GetManagementHubInstanceId();
176+
177+
await ReceiveCommandResult(result);
178+
}
179+
}
180+
}
181+
}

0 commit comments

Comments
 (0)