Skip to content

Commit ad3b8e8

Browse files
committed
Add adapter and example folder
1 parent 3dc4c09 commit ad3b8e8

File tree

2 files changed

+401
-0
lines changed

2 files changed

+401
-0
lines changed

lewis/adapters/opcua.py

Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
# -*- coding: utf-8 -*-
2+
# *********************************************************************
3+
# lewis - a library for creating hardware device simulators
4+
# Copyright (C) 2016-2021 European Spallation Source ERIC
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
#
11+
# This program is distributed in the hope that it will be useful,
12+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
# GNU General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
# **
19+
20+
"""
21+
This module provides components to expose a Device via a OPCUA Protocol. The following resources
22+
were used as guidelines and references for implementing the protocol:
23+
24+
- https://opcfoundation.org/wp-content/uploads/2014/05/OPC-UA_Security_EN.pdf
25+
- https://www.opc-router.com/what-is-opc-ua/
26+
- https://www.unified-automation.com/
27+
- https://github.com/bashwork/pymodbus
28+
29+
.. note::
30+
31+
For an example how Modbus can be used in the current implementation, please look
32+
at lewis/examples/modbus_device.
33+
"""
34+
35+
import asyncio
36+
import inspect
37+
import threading
38+
import time
39+
from typing import Any, Dict, Optional, List
40+
from asyncua import Server, Node, ua
41+
from asyncua.common.methods import uamethod
42+
43+
from lewis.core.adapters import Adapter
44+
from lewis.core.devices import InterfaceBase
45+
from lewis.core.logging import has_log
46+
47+
@has_log
48+
class OPCUAAdapter(Adapter):
49+
"""
50+
Adapter for exposing a device via OPCUA.
51+
52+
This adapter creates an OPCUA server that exposes the device's
53+
properties and methods as OPCUA nodes. It handles mapping device properties to OPCUA
54+
noes and translating method calls from OPCUA to device method calls.
55+
56+
:param options: Configuration options for the adapter.
57+
"""
58+
59+
default_options = {
60+
'port' : 4840, #Default OPCUA port
61+
'server_name': 'Lewis OPCUA Server',
62+
'uri': 'urn:lewis:opcua',
63+
'update_interval': 0.1, #Interval for updating variables in seconds
64+
'exclude_properties': [], #Properties excluded from exposure
65+
'read_only_properties': [], #Properties that should be read-only
66+
'security_mode' : 'None', #Security mode options: None, Sign, SignAndEncrypt
67+
'security_policy' : 'None', #Security policy options: None, Basic128Rsa15, Basic256, Basic256Sha256
68+
'certificate' : None, #Path to certificate
69+
'private_key' : None, #Path to private key file
70+
}
71+
72+
protocol = 'opcua'
73+
74+
def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
75+
"""Initialise the OPCUA adapter with given options."""
76+
super(OPCUAAdapter, self).__init__(options)
77+
78+
#Init member variables
79+
self._server = None
80+
self._running = False
81+
self._nodes = {}
82+
self._update_thread = None
83+
self._stop_event = threading.Event()
84+
85+
#Track property values to detect changes
86+
self._property_values = {}
87+
88+
def start_server(self) -> None:
89+
"""
90+
Start the OPCUA server.
91+
92+
This method initialises the OPCUA server, creates the address space,
93+
populates it with nodes that represent device properties and methods,
94+
and starts the server.
95+
"""
96+
97+
if self._running:
98+
return
99+
100+
#Create server
101+
self._server = Server()
102+
103+
#Setup server parameters
104+
endpoint = F"opc.tcp://0.0.0.0:{self._options.port}"
105+
self._server.set_endpoint(endpoint)
106+
self._server.set_server_name(self._options.server_name)
107+
108+
#Configure security if specified
109+
if (self._option.security_mode != 'None' and
110+
self._options.security_policy != 'None' and
111+
self._options.certificate and
112+
self._options.private_key):
113+
self._server.load_certificate(self._options.certificate)
114+
self._server.load_private_key(self._options.private_key)
115+
116+
#Apply security settings
117+
security_string = f"{self._options.security_policy}, {self._options.security_mode}"
118+
self._server.set_security_policy([security_string])
119+
120+
#Setup namespace
121+
uri = self._option.uri
122+
idx = self._server.register_namespace(uri)
123+
124+
#Create node to store the device
125+
objects = self._server.get_objects_node()
126+
device_node = objects.add_object(idx, "Device")
127+
128+
#Add properties as variables and methods
129+
if self.interface:
130+
self._add_properties(idx, device_node)
131+
self._add_methods(idx, device_node)
132+
133+
#Start the server
134+
self._server.start()
135+
self._running = True
136+
137+
#Start the update thread for periodic property updates
138+
self._stop_event.clear()
139+
self._update_thread = threading.Thread(
140+
target=self._update_variables,
141+
daemon=True
142+
)
143+
self._update_thread.start()
144+
145+
self.log.info(f"OPCUA Server started on {endpoint}")
146+
147+
148+
def _add_properties(self, idx: int, device_node: Node) -> None:
149+
"""
150+
Add device properties as OPCUA variables.
151+
152+
:param idx: Namespace index
153+
:param device_node: Device node to add the properties to
154+
"""
155+
156+
for property in dir(self.interface):
157+
#Skip the excluded properties, internal properties, and methods
158+
if (property in self._options.exclude_properties or
159+
property.startswith('_') or
160+
callable(getattr(self.interface, property))):
161+
continue
162+
163+
#Get property value
164+
value = getattr(self.interface, property)
165+
166+
#Determine if the property is writable
167+
writeable = property not in self._options.read_only_properties
168+
169+
#Determine data type
170+
data_type = self._get_ua_data_type(value)
171+
172+
#Create the variable node
173+
var = device_node.add_variable(
174+
idx,
175+
property,
176+
value,
177+
data_type
178+
)
179+
var.set_writeable(writeable)
180+
181+
#Store the node for updates
182+
self._nodes[property] = var
183+
184+
#Store initial value
185+
self._property_values[property] = value
186+
187+
#If writeable, set up callback to handle writes
188+
if writeable:
189+
#definte write callback that updates the device
190+
def make_callback(property_name):
191+
def write_callback(node, val):
192+
with self.device_lock:
193+
setattr(self.interface, property_name, val)
194+
return True
195+
return write_callback
196+
197+
#set the callback
198+
var.set_value_callback = make_callback(property)
199+
200+
201+
def _add_methods(self, idx:int, device_node: Node) -> None:
202+
"""
203+
Add device methods as OPCUA methods.
204+
205+
:param idx: Namespace index
206+
:param device_node: Device node to add the methods to
207+
"""
208+
209+
for method_name in dir(self.interface):
210+
#Skip properties and internal/special methods
211+
if(not callable(getattr(self.interface, method_name)) or
212+
method_name.startswith('_')):
213+
continue
214+
215+
#Get the method
216+
method = getattr(self.interface, method_name)
217+
218+
#Get info about the method's parameters
219+
try:
220+
signature = inspect.signature(method)
221+
222+
#Create input argument descriptions
223+
inputs = []
224+
for param_name, param in signature.parameters.items():
225+
if param_name == 'self':
226+
continue
227+
228+
#Add input argument
229+
inputs.append(ua.Argument(
230+
name=param_name,
231+
data_type=ua.NodeId(ua.ObjectIds.Variant),
232+
value_rank=-1,
233+
array_dimensions=[],
234+
description=""
235+
))
236+
237+
outputs = [
238+
ua.Argument(
239+
name="Result",
240+
data_type=ua.NodeId(ua.ObjectIds.Variant),
241+
value_rank=-1,
242+
array_dimensions=[],
243+
description=""
244+
)
245+
]
246+
247+
#Create a wrapper to call the device method
248+
def method_wrapper(parent, *args):
249+
with self.device_lock:
250+
result = getattr(self.interface, method_name)(*args)
251+
return [result] if result is not None else []
252+
253+
device_node.add_method(
254+
idx,
255+
method_name,
256+
method_wrapper,
257+
inputs,
258+
outputs
259+
)
260+
except Exception as e:
261+
self.log.warning(f"Failed to add method {method_name}: {e}")
262+
263+
def _get_ua_data_type(self, value: Any) -> ua.VariantType:
264+
"""
265+
Determine the OPCUA data type for a given value.
266+
267+
:param value: The value to determine the data type for
268+
:return: OPCUA Variant Type
269+
"""
270+
271+
if isinstance(value, bool):
272+
return ua.VariantType.Boolean
273+
elif isinstance(value, int):
274+
return ua.VariantType.Int64
275+
elif isinstance(value, float):
276+
return ua.VariantType.Double
277+
elif isinstance(value, str):
278+
return ua.VariantType.String
279+
elif isinstance(value, list):
280+
# For lists, use a more specific type if possible
281+
if all(isinstance(x, bool) for x in value):
282+
return ua.VariantType.Boolean
283+
elif all(isinstance(x, int) for x in value):
284+
return ua.VariantType.Int64
285+
elif all(isinstance(x, float) for x in value or isinstance(x, int) for x in value):
286+
return ua.VariantType.Double
287+
else:
288+
return ua.VariantType.Variant
289+
else:
290+
# Default to variant for complex types
291+
return ua.VariantType.Variant
292+
293+
294+
def stop_server(self):
295+
"""
296+
Stop the OPCUA server.
297+
298+
This method stops the update thread and shuts down the OPCUA server.
299+
"""
300+
301+
if not self._running:
302+
return
303+
304+
#Stop the update thread
305+
self._stop_event.set()
306+
if self._update_thread:
307+
self._update_thread.join(timeout=2.0)
308+
self._update_thread = None
309+
310+
#Stop the server
311+
if self._server:
312+
self._server.stop()
313+
self._server = None
314+
315+
self._running = False
316+
self._nodes = {}
317+
self._property_values = {}
318+
319+
self.log.info("OPCUA server stopped")
320+
321+
@property
322+
def is_running(self) -> bool:
323+
"""
324+
Check if the OPCUA server is running.
325+
326+
:return: True if server running, False otherwise
327+
"""
328+
329+
return self._running
330+
331+
def handle(self, cycle_delay: float = 0.1) -> None:
332+
"""
333+
Handle OPCUA operations.
334+
335+
This method is called periodically by Lewis. For OPCUA, most of the
336+
handling is done by the server thread, so this method mainly waits.
337+
338+
:param cycle_delay: Approximate time to spend handling requests
339+
"""
340+
341+
#Most handling is done by the OPCUA server itself
342+
if self._running and self.interface:
343+
time.sleep(min(cycle_delay, self._options.update_interval))
344+
345+
def _update_variables(self) -> None:
346+
"""
347+
Update OPCUA variables with current device values.
348+
349+
This method runs in a separate thread and periodically updates the OPCUA
350+
variables with the current values from the device.
351+
"""
352+
353+
while not self._stop_event.is_set() and self._running and self.interface:
354+
#Update variables that have changed
355+
with self.device_lock:
356+
for property, node in self._nodes.items():
357+
if hasattr(self.interface, property):
358+
current_value = getattr(self.interface, property)
359+
360+
#check if the value has changed
361+
if property not in self._property_values or self._property_values[property] != current_value:
362+
try:
363+
node.set_value(current_value)
364+
self._property_values[property] = current_value
365+
except Exception as e:
366+
self.log.warning(f"Failed to update node {property}: {e}")
367+
368+
self._stop_event.wait(self._options.update_interval)
369+

0 commit comments

Comments
 (0)