Skip to content

Commit 775f977

Browse files
committed
Abstract data services behind an interface and some code cleanup.
1 parent d978151 commit 775f977

File tree

8 files changed

+76
-59
lines changed

8 files changed

+76
-59
lines changed
Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33
using Kusto.Data;
44
using Kusto.Data.Common;
55
using Kusto.Data.Net.Client;
6-
using System.Collections.Concurrent;
76
using System.Data;
87
using System.Diagnostics;
8+
using UA_DataProcessor.Interfaces;
99

1010
namespace Opc.Ua.Data.Processor
1111
{
12-
public class ADXDataService : IDisposable
12+
public class ADXDataService : IDataService
1313
{
1414
private ICslQueryProvider _queryProvider = null;
1515

16-
public ADXDataService()
16+
public void Connect()
1717
{
1818
// connect to ADX cluster
1919
string adxClusterName = Environment.GetEnvironmentVariable("ADX_HOST");
@@ -47,8 +47,11 @@ public void Dispose()
4747
}
4848
}
4949

50-
public void RunADXQuery(string query, ConcurrentDictionary<string, object> values, bool allowMultiRow = false)
50+
public Dictionary<string, object> RunQuery(string query)
5151
{
52+
bool allowMultiRow = false;
53+
Dictionary<string, object> values = new();
54+
5255
ClientRequestProperties clientRequestProperties = new ClientRequestProperties()
5356
{
5457
ClientRequestId = Guid.NewGuid().ToString()
@@ -107,6 +110,8 @@ public void RunADXQuery(string query, ConcurrentDictionary<string, object> value
107110
{
108111
Console.WriteLine("RunADXQuery: " + ex.Message);
109112
}
113+
114+
return values;
110115
}
111116
}
112117
}
Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
using System.Net;
55
using System.Net.Http.Headers;
66
using System.Text;
7+
using UA_DataProcessor.Interfaces;
78

89
namespace Opc.Ua.Data.Processor
910
{
10-
public class DynamicsDataService : IDisposable
11+
public class DynamicsDataService : IDataService
1112
{
1213
private HttpClient _client = null;
1314

@@ -17,7 +18,7 @@ public class DynamicsDataService : IDisposable
1718
private string _tenantId = string.Empty;
1819
private string _environmentId = string.Empty;
1920

20-
public DynamicsDataService()
21+
public void Connect()
2122
{
2223
_client = new();
2324

@@ -83,8 +84,21 @@ private async Task Authorize()
8384
}
8485
}
8586

86-
public async Task<DynamicsQueryResponse> RunDynamicsQuery(DynamicsQuery query)
87+
public Dictionary<string, object> RunQuery(string query)
8788
{
89+
Dictionary<string, object> results = new();
90+
91+
string[] queryLines = query.Split(['\r', '\n'], StringSplitOptions.RemoveEmptyEntries);
92+
DynamicsQuery queryRequest = new()
93+
{
94+
tracingDirection = queryLines.Length > 0 ? queryLines[0] : string.Empty,
95+
company = queryLines.Length > 1 ? queryLines[1] : string.Empty,
96+
itemNumber = queryLines.Length > 2 ? queryLines[2] : string.Empty,
97+
batchNumber = queryLines.Length > 3 ? queryLines[3] : string.Empty,
98+
serialNumber = queryLines.Length > 4 ? queryLines[4] : string.Empty,
99+
shouldIncludeEvents = true
100+
};
101+
88102
if (!string.IsNullOrEmpty(_instanceEndpoint))
89103
{
90104
try
@@ -109,7 +123,7 @@ public async Task<DynamicsQueryResponse> RunDynamicsQuery(DynamicsQuery query)
109123
Debug.WriteLine("Bearer Token expired! Attempting to retrieve a new barer token.");
110124

111125
// re-authorize
112-
await Authorize().ConfigureAwait(false);
126+
Authorize().GetAwaiter().GetResult();
113127

114128
// re-try our data request, using the updated bearer token
115129
response = _client.Send(
@@ -131,18 +145,15 @@ public async Task<DynamicsQueryResponse> RunDynamicsQuery(DynamicsQuery query)
131145
throw new Exception(response.StatusCode.ToString());
132146
}
133147

134-
return JsonConvert.DeserializeObject<DynamicsQueryResponse>(await response.Content.ReadAsStringAsync().ConfigureAwait(false));
148+
results.Add(query, JsonConvert.DeserializeObject<DynamicsQueryResponse>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult()));
135149
}
136150
catch (Exception ex)
137151
{
138152
Console.WriteLine("RunDynamicsQuery: " + ex.Message);
139-
return null;
140-
}
141-
}
142-
else
143-
{
144-
return null;
145-
}
146-
}
153+
}
154+
}
155+
156+
return results;
157+
}
147158
}
148159
}

Interfaces/IDataService.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace UA_DataProcessor.Interfaces
2+
{
3+
public interface IDataService : IDisposable
4+
{
5+
public void Connect();
6+
7+
public Dictionary<string, object> RunQuery(string query);
8+
}
9+
}
File renamed without changes.
File renamed without changes.
Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
using Newtonsoft.Json;
33
using Newtonsoft.Json.Linq;
44
using Opc.Ua.Cloud.Client.Models;
5-
using System.Collections.Concurrent;
65
using System.Net.Http.Headers;
76
using System.Text;
87

98
namespace Opc.Ua.Data.Processor
109
{
11-
public class ProductCarbonFootprintService
10+
public class PCFProcessor
1211
{
1312
private readonly ADXDataService _adxDataService = new ADXDataService();
1413
private readonly DynamicsDataService _dynamicsDataService = new DynamicsDataService();
@@ -21,15 +20,21 @@ public class ProductCarbonFootprintService
2120
}
2221
};
2322

24-
public void GeneratePCFs()
23+
public PCFProcessor()
24+
{
25+
_adxDataService.Connect();
26+
_dynamicsDataService.Connect();
27+
}
28+
29+
public void Process()
2530
{
2631
// we have two production lines in the manufacturing ontologies production line simulation and they are connected like so:
2732
// assembly -> test -> packaging
28-
GeneratePCFForProductionLine("Munich", "48.1375", "11.575", 6);
29-
GeneratePCFForProductionLine("Seattle", "47.609722", "-122.333056", 10);
33+
CalcPCFForProductionLine("Munich", "48.1375", "11.575", 6);
34+
CalcPCFForProductionLine("Seattle", "47.609722", "-122.333056", 10);
3035
}
3136

32-
private void GeneratePCFForProductionLine(string productionLineName, string latitude, string longitude, int idealCycleTime)
37+
private void CalcPCFForProductionLine(string productionLineName, string latitude, string longitude, int idealCycleTime)
3338
{
3439
try
3540
{
@@ -39,21 +44,21 @@ private void GeneratePCFForProductionLine(string productionLineName, string lati
3944
{
4045
// check if a new product was produced (last machine in the production line, i.e. packaging, is in state 2 ("done") with a passed QA)
4146
// and get the products serial number and energy consumption at that time
42-
ConcurrentDictionary<string, object> latestProductProduced = ADXQueryForSpecificValue("packaging", productionLineName, "Status", 2);
47+
Dictionary<string, object> latestProductProduced = ADXQueryForSpecificValue("packaging", productionLineName, "Status", 2);
4348
if ((latestProductProduced != null) && (latestProductProduced.Count > 0))
4449
{
45-
ConcurrentDictionary<string, object> serialNumberResult = ADXQueryForSpecificTime("packaging", productionLineName, "ProductSerialNumber", ((DateTime)latestProductProduced["Timestamp"]).ToString("yyyy-MM-dd HH:mm:ss"), idealCycleTime);
50+
Dictionary<string, object> serialNumberResult = ADXQueryForSpecificTime("packaging", productionLineName, "ProductSerialNumber", ((DateTime)latestProductProduced["Timestamp"]).ToString("yyyy-MM-dd HH:mm:ss"), idealCycleTime);
4651
double serialNumber = (double)serialNumberResult["OPCUANodeValue"];
4752

48-
ConcurrentDictionary<string, object> timeItWasProducedPackaging = ADXQueryForSpecificValue("packaging", productionLineName, "ProductSerialNumber", serialNumber);
49-
ConcurrentDictionary<string, object> energyPackaging = ADXQueryForSpecificTime("packaging", productionLineName, "EnergyConsumption", ((DateTime)timeItWasProducedPackaging["Timestamp"]).ToString("yyyy-MM-dd HH:mm:ss"), idealCycleTime);
53+
Dictionary<string, object> timeItWasProducedPackaging = ADXQueryForSpecificValue("packaging", productionLineName, "ProductSerialNumber", serialNumber);
54+
Dictionary<string, object> energyPackaging = ADXQueryForSpecificTime("packaging", productionLineName, "EnergyConsumption", ((DateTime)timeItWasProducedPackaging["Timestamp"]).ToString("yyyy-MM-dd HH:mm:ss"), idealCycleTime);
5055

5156
// check each other machine for the time when the product with this serial number was in the machine and get its energy comsumption at that time
52-
ConcurrentDictionary<string, object> timeItWasProducedTest = ADXQueryForSpecificValue("test", productionLineName, "ProductSerialNumber", serialNumber);
53-
ConcurrentDictionary<string, object> energyTest = ADXQueryForSpecificTime("test", productionLineName, "EnergyConsumption", ((DateTime)timeItWasProducedTest["Timestamp"]).ToString("yyyy-MM-dd HH:mm:ss"), idealCycleTime);
57+
Dictionary<string, object> timeItWasProducedTest = ADXQueryForSpecificValue("test", productionLineName, "ProductSerialNumber", serialNumber);
58+
Dictionary<string, object> energyTest = ADXQueryForSpecificTime("test", productionLineName, "EnergyConsumption", ((DateTime)timeItWasProducedTest["Timestamp"]).ToString("yyyy-MM-dd HH:mm:ss"), idealCycleTime);
5459

55-
ConcurrentDictionary<string, object> timeItWasProducedAssembly = ADXQueryForSpecificValue("assembly", productionLineName, "ProductSerialNumber", serialNumber);
56-
ConcurrentDictionary<string, object> energyAssembly = ADXQueryForSpecificTime("assembly", productionLineName, "EnergyConsumption", ((DateTime)timeItWasProducedAssembly["Timestamp"]).ToString("yyyy-MM-dd HH:mm:ss"), idealCycleTime);
60+
Dictionary<string, object> timeItWasProducedAssembly = ADXQueryForSpecificValue("assembly", productionLineName, "ProductSerialNumber", serialNumber);
61+
Dictionary<string, object> energyAssembly = ADXQueryForSpecificTime("assembly", productionLineName, "EnergyConsumption", ((DateTime)timeItWasProducedAssembly["Timestamp"]).ToString("yyyy-MM-dd HH:mm:ss"), idealCycleTime);
5762

5863
// calculate the total energy consumption for the product by summing up all the machines' energy consumptions (in Ws), divide by 3600 to get seconds and multiply by the ideal cycle time (which is in seconds)
5964
double energyTotal = ((double)energyAssembly["OPCUANodeValue"] + (double)energyTest["OPCUANodeValue"] + (double)energyPackaging["OPCUANodeValue"]) / 3600 * idealCycleTime;
@@ -77,7 +82,7 @@ private void GeneratePCFForProductionLine(string productionLineName, string lati
7782
}
7883
catch (Exception ex)
7984
{
80-
Console.WriteLine("GeneratePCFForProductionLine: " + ex.Message);
85+
Console.WriteLine("CalcPCFForProductionLine: " + ex.Message);
8186
}
8287
}
8388

@@ -132,17 +137,15 @@ private float RetrieveScope3Emissions()
132137
{
133138
try
134139
{
135-
DynamicsQueryResponse response = _dynamicsDataService.RunDynamicsQuery(new DynamicsQuery() {
136-
tracingDirection = "Backward",
137-
company = Environment.GetEnvironmentVariable("DYNAMICS_COMPANY_NAME"),
138-
itemNumber = Environment.GetEnvironmentVariable("DYNAMICS_PRODUCT_NAME"),
139-
serialNumber = Environment.GetEnvironmentVariable("DYNAMICS_BATCH_NAME"),
140-
shouldIncludeEvents = true
141-
}).GetAwaiter().GetResult();
142-
143-
if (response != null)
140+
string query = "Backward" + "\r\n"
141+
+ Environment.GetEnvironmentVariable("DYNAMICS_COMPANY_NAME") + "\r\n"
142+
+ Environment.GetEnvironmentVariable("DYNAMICS_PRODUCT_NAME") + "\r\n"
143+
+ Environment.GetEnvironmentVariable("DYNAMICS_BATCH_NAME") + "\r\n";
144+
145+
Dictionary<string, object> response = _dynamicsDataService.RunQuery(query);
146+
if (response.ContainsKey(query) && (response[query] != null) && response[query] is DynamicsQueryResponse dynamicsResponse)
144147
{
145-
return FindPcf(response.root);
148+
return FindPcf(dynamicsResponse.root);
146149
}
147150
else
148151
{
@@ -191,7 +194,7 @@ private float FindPcf(ErpNode node)
191194
return 0.0f;
192195
}
193196

194-
private ConcurrentDictionary<string, object> ADXQueryForSpecificValue(string stationName, string productionLineName, string valueToQuery, double desiredValue)
197+
private Dictionary<string, object> ADXQueryForSpecificValue(string stationName, string productionLineName, string valueToQuery, double desiredValue)
195198
{
196199
string query = "opcua_metadata_lkv\r\n"
197200
+ "| where Name contains \"" + stationName + "\"\r\n"
@@ -203,13 +206,10 @@ private ConcurrentDictionary<string, object> ADXQueryForSpecificValue(string sta
203206
+ "| distinct Timestamp, OPCUANodeValue = todouble(Value)\r\n"
204207
+ "| sort by Timestamp desc";
205208

206-
ConcurrentDictionary<string, object> values = new ConcurrentDictionary<string, object>();
207-
_adxDataService.RunADXQuery(query, values);
208-
209-
return values;
209+
return _adxDataService.RunQuery(query);
210210
}
211211

212-
private ConcurrentDictionary<string, object> ADXQueryForSpecificTime(string stationName, string productionLineName, string valueToQuery, string timeToQuery, int idealCycleTime)
212+
private Dictionary<string, object> ADXQueryForSpecificTime(string stationName, string productionLineName, string valueToQuery, string timeToQuery, int idealCycleTime)
213213
{
214214
string query = "opcua_metadata_lkv\r\n"
215215
+ "| where Name contains \"" + stationName + "\"\r\n"
@@ -222,10 +222,7 @@ private ConcurrentDictionary<string, object> ADXQueryForSpecificTime(string stat
222222
+ "| where around(Timestamp, datetime(" + timeToQuery + "), " + idealCycleTime.ToString() + "s)\r\n"
223223
+ "| sort by Timestamp desc";
224224

225-
ConcurrentDictionary<string, object> values = new ConcurrentDictionary<string, object>();
226-
_adxDataService.RunADXQuery(query, values);
227-
228-
return values;
225+
return _adxDataService.RunQuery(query);
229226
}
230227
}
231228
}

Program.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,19 @@
2727
* http://opcfoundation.org/License/MIT/1.00/
2828
* ======================================================================*/
2929

30-
using Newtonsoft.Json;
31-
using Opc.Ua.Cloud.Client.Models;
32-
using System.Globalization;
33-
using System.Text;
34-
3530
[assembly: CLSCompliant(false)]
3631
namespace Opc.Ua.Data.Processor
3732
{
3833
public class Program
3934
{
4035
static async Task Main()
4136
{
42-
ProductCarbonFootprintService pcfService = new ProductCarbonFootprintService();
37+
PCFProcessor pcfProcessor = new PCFProcessor();
4338
while (true)
4439
{
4540
try
4641
{
47-
pcfService.GeneratePCFs();
42+
pcfProcessor.Process();
4843
await Task.Delay(5000).ConfigureAwait(false); // Wait 5s before next processing
4944
}
5045
catch (Exception ex)

0 commit comments

Comments
 (0)