Microsoft Fabric is an all-in-one analytics solution that covers everything from data movement to data science, analytics, and business intelligence. It offers a comprehensive suite of services, including data lake, data engineering, and data integration, all in one place. You don't even need an Azure subscription for it, let alone deploy or manage any apps or services. You can get started with Microsoft Fabric here.
To configure Microsoft Fabric for production line data, you need at least one OPC UA server integrated into your production line that supports OPC UA PubSub. Alternatively, you can use an OPC UA Client/Server to OPC UA PubSub adapter app like UA Cloud Publisher, which is used in this reference solution. Then follow these steps:
- Log into Microsoft Fabric here.
- Create an Eventhouse by clicking Create -> See all -> Eventhouse and give it a name, e.g. opcua. Click Create.
- Under KQL Database and Database details, activate the setting OneLake availability. This will enable sharing your OPC UA time-series data from your production line within your organization via OneLake in Parquet file format. Click Done.
Create the tables you need for ingesting the OPC UA PubSub data by clicking Explore your data, deleting the sample data in the text box, entering the following Kusto commands, and then clicking on each command and clicking Run:
// Landing table: raw OPC UA telemetry, stored in a single dynamic column
.create table opcua_raw (
    payload: dynamic
)

// Intermediate table: target for the unbatched OPC UA PubSub messages
.create table opcua_intermediate (
    DataSetWriterID: string,
    Timestamp: datetime,
    Payload: dynamic
)

// Final OPC UA telemetry table: one name/value pair per row
.create table opcua_telemetry (
    DataSetWriterID: string,
    Timestamp: datetime,
    Name: string,
    Value: dynamic
)

// Landing table: raw OPC UA metadata, stored in a single dynamic column
.create table opcua_metadata_raw (
    payload: dynamic
)

// Final OPC UA metadata table (the landing table for metadata is opcua_metadata_raw)
.create table opcua_metadata (
    DataSetWriterID: string,
    Timestamp: datetime,
    Name: string,
    Type: string,
    DisplayName: string,
    Workcell: string,
    Line: string,
    Area: string,
    Site: string,
    Enterprise: string,
    NamespaceUri: string,
    NodeId: string
)
Then run each of the following Kusto commands:
// Function that expands the raw OPC UA telemetry: emits one row per entry of payload.Messages,
// keeping the entry's DataSetWriterId, Timestamp and Payload
.create-or-alter function OPCUARawExpand() {
    opcua_raw
    | mv-expand msg = payload.Messages
    | where msg != ''
    | project
        DataSetWriterID = tostring(msg.DataSetWriterId),
        Timestamp = todatetime(msg.Timestamp),
        Payload = todynamic(msg.Payload)
}
// Function that expands each OPC UA dataset: for every expanded Payload entry, the first key
// becomes Name and that entry's "Value" property becomes Value
.create-or-alter function OPCUADatasetExpand() {
    opcua_intermediate
    | mv-apply Payload on (
        extend fieldName = tostring(bag_keys(Payload)[0])
        | extend field = Payload[fieldName]
        | project Name = fieldName, Value = todynamic(field.Value)
    )
}
// Function that expands the raw OPC UA metadata into the column layout of the opcua_metadata table.
// The hierarchy fields (Workcell, Line, Area, Site, Enterprise), NamespaceUri and NodeId are parsed
// out of payload.MetaData.Name; Type and DisplayName are taken from the first metadata field only.
.create-or-alter function OPCUAMetaDataExpand() {
    opcua_metadata_raw
    | parse tostring(payload.MetaData.Name) with * ":" Workcell "." Line "." Area "." Site "." Enterprise ";nsu=" NamespaceUri ";" NodeId
    | project
        // The output column must be spelled DataSetWriterID (not DataSetWriterId) so that the
        // function schema matches the opcua_metadata table this function feeds via update policy.
        DataSetWriterID = tostring(payload.DataSetWriterId),
        Timestamp = todatetime(payload.Timestamp),
        Name = tostring(payload.MetaData.Name),
        Type = tostring(payload.MetaData.Fields[0].Description),
        DisplayName = tostring(payload.MetaData.Fields[0].Name),
        Workcell,
        Line,
        Area,
        Site,
        Enterprise,
        NamespaceUri,
        NodeId
}
// Materialized view holding the last known value (LKV) of the metadata:
// the most recent opcua_metadata row for each (Name, DataSetWriterID) pair
.create materialized-view opcua_metadata_lkv on table opcua_metadata {
    opcua_metadata
    | summarize arg_max(Timestamp, *) by Name, DataSetWriterID
}
Then run each of the following Kusto commands:
// JSON ingestion mapping: store each incoming telemetry document whole in opcua_raw.payload
.create-or-alter table opcua_raw ingestion json mapping 'opcua_mapping'
    '[{"column":"payload","path":"$","datatype":"dynamic"}]'

// Update policy: rows arriving in opcua_raw are run through OPCUARawExpand() into opcua_intermediate
.alter table opcua_intermediate policy update
    @'[{"Source": "opcua_raw", "Query": "OPCUARawExpand()", "IsEnabled": "True"}]'

// Update policy: rows arriving in opcua_intermediate are run through OPCUADatasetExpand() into opcua_telemetry
.alter table opcua_telemetry policy update
    @'[{"Source": "opcua_intermediate", "Query": "OPCUADatasetExpand()", "IsEnabled": "True"}]'

// JSON ingestion mapping: store each incoming metadata document whole in opcua_metadata_raw.payload
.create-or-alter table opcua_metadata_raw ingestion json mapping 'opcua_metadata_mapping'
    '[{"column":"payload","path":"$","datatype":"dynamic"}]'

// Update policy: rows arriving in opcua_metadata_raw are run through OPCUAMetaDataExpand() into opcua_metadata
.alter table opcua_metadata policy update
    @'[{"Source": "opcua_metadata_raw", "Query": "OPCUAMetaDataExpand()", "IsEnabled": "True"}]'
- Create an Eventstream by clicking Create -> See all -> Eventstream and give it a name (e.g. eventstream_opcua_telemetry). Click Create. This component will receive the OPC UA PubSub production line telemetry and send it to your KQL database.
- Click New source and select Custom App and give it a name (e.g. opcua_telemetry). Click Add. In the Information box, click on Connection string-primary key and copy it. You will need it soon when configuring UA Cloud Publisher.
- Create another Eventstream by clicking Create -> See all -> Eventstream and give it a name (e.g. eventstream_opcua_metadata). Click Create. This component will receive the OPC UA PubSub production line metadata and send it to your KQL database.
- Click New source and select Custom App and give it a name (e.g. opcua_metadata). Click Add. In the Information box, click on Connection string-primary key and copy it. You will need it soon when configuring UA Cloud Publisher.
You can either follow the steps for connecting your own production line described here or you can modify the configuration of the UA Cloud Publisher setup in the production line simulation provided in this repository, for example for the Munich production line. For the latter, follow these steps:
- Log into the VM deployed with this reference solution, open an Administrator PowerShell window and run Get-AksEdgeNodeAddr as well as kubectl get services -n munich.
- Open a browser on the VM and enter the IP address and port retrieved for UA Cloud Publisher in the previous step in the address field (e.g. http://192.168.0.2:30356) to access the UA Cloud Publisher UI.
- In the UA Cloud Publisher UI, click Configuration and enter the Connection string-primary key from the opcua_telemetry custom app you copied earlier into the Broker Password field, enter $ConnectionString into the Broker Username field, enter the EntityPath into the Broker Message Topic field (the entity path is contained at the end of the connection string and starts with "es_") and the name of your custom app into the Broker URL field (the custom app name is contained within the connection string, starts with eventstream- and ends with .servicebus.windows.net).
- Select the checkbox Use Alternative Broker For OPC UA Metadata Messages and enter the Connection string-primary key from the opcua_metadata custom app you copied earlier into the Alternative Broker Password field, enter 9093 in the Alternative Broker Port field, enter $ConnectionString into the Alternative Broker Username field, enter the EntityPath into the Broker Metadata Topic field (the entity path is contained at the end of the connection string and starts with "es_") and the name of your custom app into the Alternative Broker URL field (the custom app name is contained within the connection string, starts with eventstream- and ends with .servicebus.windows.net).
- Set the Metadata Send Interval in Seconds to 3000.
- Click Apply at the top of the configuration page.
- In the UA Cloud Publisher UI, click Diagnostics and verify that you have a connection to Microsoft Fabric (Connected to broker(s) is set to True).
- Back in Microsoft Fabric, select your workspace, click on your eventstream_opcua_telemetry event stream and select Open Eventstream.
- Click New destination and select KQL Database and give it a name (e.g. kql_db_opcua_telemetry). Under Workspace, select your Fabric workspace (e.g. My workspace) and under KQL Database, select the KQL database you set up earlier. Click Add and configure.
- In the Ingest data popup window, under Table, select Existing table, select opcua_raw and click Next: Source. Leave everything the way it is and click Next: Schema. Wait a few seconds for the ingested data to be made available. Under Data format, select JSON. Under Mapping name, select Use existing mapping and select opcua_mapping. Click Next: Summary. Click Close.
- Select your workspace, click on your eventstream_opcua_metadata event stream and select Open Eventstream.
- Click New destination and select KQL Database and give it a name (e.g. kql_db_opcua_metadata). Under Workspace, select your Fabric workspace (e.g. My workspace) and under KQL Database, select the KQL database you set up earlier. Click Add and configure.
- In the Ingest data popup window, under Table, select Existing table, select opcua_metadata_raw and click Next: Source. Leave everything the way it is and click Next: Schema. Wait a few seconds for the ingested data to be made available. Under Data format, select JSON. Under Mapping name, select Use existing mapping and select opcua_metadata_mapping. Click Next: Summary. Click Close.
To share your OPC UA data via OneLake, create a Lakehouse by clicking Create -> See all -> Lakehouse and give it a name, e.g. opcua_lake. Click Create.
- Click New shortcut, select Microsoft OneLake, select your KQL database, expand Tables and select opcua_telemetry.
- Click New shortcut, select Microsoft OneLake, select your KQL database, expand Tables and select opcua_metadata.
Click on your workspace and select Lineage view to see the entire flow of OPC UA data you have just set up in Microsoft Fabric.
Click on your KQL Database and select Open KQL Database followed by Explore your data. Delete the sample queries and enter the following query in the text box:
// Plot the last hour of one telemetry node of the Munich assembly station as a line chart.
let _startTime = ago(1h);
let _endTime = now();
// Use the last-known-value view rather than the raw opcua_metadata table: the raw table holds one
// row per received metadata message, so joining against it repeats every telemetry point once
// per stored copy of the metadata.
opcua_metadata_lkv
| where Name contains "assembly"
| where Name contains "munich"
| join kind=inner (
    opcua_telemetry
    // NOTE(review): the plotted column is called "energy" but the node selected here is "Status";
    // confirm which node is intended.
    | where Name == "Status"
    | where Timestamp > _startTime and Timestamp < _endTime
) on DataSetWriterID
| extend energy = todouble(Value)
// Timestamp1 is the telemetry-side Timestamp (renamed by the join because both sides have one)
| project Timestamp1, energy
| sort by Timestamp1 desc
| render linechart
// Returns the timestamp of the most recent telemetry row (within the last 5 minutes) where the node
// named valueToQuery equals desiredValue, for the station whose metadata Name contains both
// stationName and productionLineName. Returns no rows if there is no match.
.create-or-alter function QuerySpecificValue(stationName: string, productionLineName: string, valueToQuery: string, desiredValue: real) {
    opcua_metadata_lkv
    | where Name contains stationName
    | where Name contains productionLineName
    | join kind = inner (
        opcua_telemetry
        | where Name == valueToQuery
        // Value is a dynamic column: cast it explicitly so the comparison with the real-typed
        // parameter is a numeric one regardless of how the value was stored.
        | where toreal(Value) == desiredValue
        | where Timestamp > ago(5m)
    ) on DataSetWriterID
    // Timestamp1 is the telemetry-side Timestamp (renamed by the join)
    | top 1 by Timestamp1 desc
    | project Timestamp1
}
// Returns the Value of the node named valueToQuery whose telemetry timestamp lies within
// idealCycleTime of timeToQuery (latest such row, last 5 minutes only), for the station whose
// metadata Name contains both stationName and productionLineName.
.create-or-alter function QuerySpecificTime(stationName: string, productionLineName: string, valueToQuery: string, timeToQuery: datetime, idealCycleTime: timespan) {
    opcua_metadata_lkv
    | where Name contains stationName
    | where Name contains productionLineName
    | join kind = inner (
        opcua_telemetry
        | where Name == valueToQuery
        | where Timestamp > ago(5m)
    ) on DataSetWriterID
    // Timestamp1 is the telemetry-side Timestamp (renamed by the join)
    | where around(Timestamp1, timeToQuery, idealCycleTime)
    | top 1 by Timestamp1 desc
    | project Value
}
// Computes the energy (in Wh) used to produce the most recently finished part on a production line.
// productionLineName: substring identifying the production line in the metadata Name
// idealCycleTime: cycle time of a station; used both as the time-matching tolerance and as the
//                 duration over which each station's power reading is integrated
// Prints the part's serial number, the time it was finished and its total energy.
.create-or-alter function EnergyPerPart(productionLineName: string, idealCycleTime: timespan) {
    // check if a new part was produced (last machine in the production line, i.e. packaging, is in state 2 ("done") with a passed QA)
    // and get the part's serial number and energy consumption at that time
    // (desiredValue is declared as real, so pass a real literal rather than the string "2")
    let timeLatestProductWasProduced = toscalar(QuerySpecificValue("packaging", productionLineName, "Status", 2.0));
    let serialNumber = toscalar(QuerySpecificTime("packaging", productionLineName, "ProductSerialNumber", timeLatestProductWasProduced, idealCycleTime));
    //
    let timePartWasProducedPackaging = timeLatestProductWasProduced;
    let energyPackaging = toscalar(QuerySpecificTime("packaging", productionLineName, "EnergyConsumption", timePartWasProducedPackaging, idealCycleTime));
    //
    // check each other machine for the time when the product with this serial number was in the machine and get its energy consumption at that time
    // (serialNumber comes back as dynamic; convert it to the real type QuerySpecificValue expects)
    let timePartWasProducedTest = toscalar(QuerySpecificValue("test", productionLineName, "ProductSerialNumber", toreal(serialNumber)));
    let energyTest = toscalar(QuerySpecificTime("test", productionLineName, "EnergyConsumption", timePartWasProducedTest, idealCycleTime));
    //
    let timePartWasProducedAssembly = toscalar(QuerySpecificValue("assembly", productionLineName, "ProductSerialNumber", toreal(serialNumber)));
    let energyAssembly = toscalar(QuerySpecificTime("assembly", productionLineName, "EnergyConsumption", timePartWasProducedAssembly, idealCycleTime));
    //
    // calculate the total energy consumption for the product by summing up all the machines' energy consumptions (in kW), multiply by 1000 to get Watts and then multiply by the ideal cycle time in seconds divided by 3600 to get Wh
    // (idealCycleTime / 1s yields the total number of seconds; format_timespan(..., "s") would only return the seconds component, 0-59)
    let totalenergy = (todouble(energyAssembly) + todouble(energyTest) + todouble(energyPackaging)) * 1000 * (idealCycleTime / 1s) / 3600;
    print serialNumber, timeLatestProductWasProduced, totalenergy
}