You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm currently facing a problem implementing an opc-ua client service in form of a fastAPI app.
In fact, everything is working well to subscribe and unsubscribe to the server nodes, but I can't figure out a way to catch and handle the exceptions coming from the _monitor_server_loop method of asyncua.Client.
FastAPI app code
import asyncio
import json
import logging
import os
import time
# from asyncua import Client
from lysr_opc.CustomClient import AsyncuaClient as Client
from confluent_kafka.admin import AdminClient, NewTopic
from fastapi import FastAPI
from lysr_opc.opcua_browser import OPCUABrowser
from lysr_opc.opcua_subscriber import OPCUASubscriber
from lysr_opc.utils import convert_nodeid_to_string
from lysr_opc.config import OPCUA_SERVER_URL, KAFKA_URL, LOG_LEVEL
from lysr_opc.utils import get_logger
api_logger = get_logger(LOG_LEVEL)
logging.getLogger("asyncua").setLevel(logging.WARNING)
# OPCUA_SERVER_URL = os.environ.get('OPC_SERVER_URL')
# KAFKA_BROKER = os.environ.get('KAFKA_BROKER')
# DEVICE_ID = os.environ.get('DEVICE_ID')
app = FastAPI()
subscription = None
subscribers = {}
client = Client(OPCUA_SERVER_URL)
subscriber = None
def handle_exception(context):
# context["message"] will always be there; but context["exception"] may not
msg = context.get("exception", context["message"])
api_logger.exception(f"!!!!!!!!!!!!!!!!!! Caught exception: {msg}")
@app.on_event("startup")
async def startup_event():
global client, subscriber
try:
await client.connect()
api_logger.info("Client connected !")
except Exception as e:
if isinstance(e, TimeoutError):
api_logger.exception(f"Error : {type(e).__name__}, unable to connect to server, verify address.")
else:
api_logger.exception(f"Error : {type(e).__name__}, {e}")
try:
subscriber = OPCUASubscriber(client)
except Exception as e:
api_logger.exception(f"Error : {type(e).__name__}, {e}")
return {"status": f"Error : {type(e).__name__}, {e}"}
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
@app.on_event("shutdown")
async def shutdown_event():
global client, subscriber
await subscriber.kill()
await client.disconnect()
api_logger.info("Client disconnected !")
# Methods used for debug purpose
@app.get("/createTopic/{nodeid}")
async def create_topic(nodeid: str):
# With confluent-kafka
admin_kafka = AdminClient({'bootstrap.servers': KAFKA_URL})
if admin_kafka.list_topics().topics.get(convert_nodeid_to_string(nodeid)) is None:
ret = f"Topic {nodeid} already existing."
api_logger.info(ret)
return ret
new_topic = NewTopic(convert_nodeid_to_string(nodeid), num_partitions=1, replication_factor=1)
fs = admin_kafka.create_topics([new_topic])
ret = ""
for topic, f in fs.items():
try:
f.result() # The result itself is None
ret = f"Topic {topic} created"
api_logger.info(ret)
except Exception as e:
ret = f"Failed to create topic {topic}: {e}"
api_logger.info(ret)
return ret
@app.get("/getRunningThreads")
async def get_running_threads():
import threading
for thread in threading.enumerate():
api_logger.info(f"Thread name: {thread.name} | Thread ID: {thread.ident}")
# return threading.enumerate()
# End of debug methods
@app.get("/browseServer")
async def execute_browse_command():
browser = OPCUABrowser(client)
api_logger.info("Starting to design the browsing tree...")
start = time.time()
tree = await browser.get_tree(["0:Objects"])
end = time.time()
api_logger.info(f"Tree design established in {end - start} seconds !")
browser.export_json(output_file="../tree_light.json")
api_logger.info("Tree design exported !")
return json.dumps(tree, indent=2)
@app.post("/subscribe/{nodeid}")
async def subscribe_to_node(nodeid: str):
global subscriber
try:
status = await subscriber.subscribe_node(nodeid)
except Exception as e:
api_logger.exception(f"Error : {type(e).__name__}, {e}")
return {"status": f"Error : {type(e).__name__}, {e}"}
return status
@app.post("/subscribeMultiple")
async def subscribe_to_multiple_nodes(nodeids: list[str]):
global subscriber
res = await subscriber.subscribe_nodes(nodeids)
return {"status": res}
@app.post("/unsubscribe/{nodeid}")
async def unsubscribe_to_node(nodeid: str):
global subscriber
try:
status = await subscriber.unsubscribe(nodeid)
except Exception as e:
api_logger.exception(f"Error : {type(e).__name__}, {e}")
return {"status": f"Error : {type(e).__name__}, {e}"}
return status
@app.post("/unsubscribeMultiple")
async def unsubscribe_to_multiple_nodes(nodeids: list[str]):
global subscribers
res = await subscriber.unsubscribe_nodes(nodeids)
return {"status": res}
# @app.get("/getStatus")
# async def get_status():
# global subscriber
# return subscriber.get_status()
The OPCUASubscriber.subscribe_node(nodeid) code :
async def subscribe_node(self, nodeid):
if nodeid in self.subscriptionsStates.keys():
if self.subscriptionsStates[nodeid].status == SubscriptionStatus.SUBSCRIPTION_FAILED:
self.logger.info(f"Subscription for nodeid {nodeid} previously failed, retrying...")
elif self.subscriptionsStates[nodeid].status == SubscriptionStatus.ACTIVE:
self.logger.info(f"Subscription already existing for node {nodeid}")
return self.subscriptionsStates[nodeid]
try:
found_node = self.client.get_node(nodeid=nodeid)
found_node_name = await found_node.read_browse_name()
self.node_map[nodeid] = found_node_name.Name
if self.sub is None:
self.handler = SubHandler(self.node_map, producer=self.producer)
self.sub = await self.client.create_subscription(period=1000,handler=self.handler)
if self.handler is not None:
self.handler.update_node_map(self.node_map)
# self.subs[nodeid] = sub
monitored_item = await self.sub.subscribe_data_change(found_node)
self.monitored_items[nodeid] = monitored_item
self.subscriptionsStates[nodeid] = SubScriptionState(nodeid=nodeid, message="Subscription success", status=SubscriptionStatus.ACTIVE)
except Exception as e:
self.subscriptionsStates[nodeid] = SubScriptionState(nodeid=nodeid, message=f"Error : {type(e).__name__}, {e}", status=SubscriptionStatus.SUBSCRIPTION_FAILED)
self.logger.exception(f"Error : {type(e).__name__}, {e}")
return self.subscriptionsStates[nodeid]
I guess the fact that it isn't used inside a "with" statement or an infinite loop is the problem to catch the exceptions launched by an asyncio task inside the client... Has anyone ever faced this problem or has a solution ?
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Hello everyone,
I'm currently facing a problem implementing an opc-ua client service in form of a fastAPI app.
In fact, everything is working well to subscribe and unsubscribe to the server nodes, but I can't figure out a way to catch and handle the exceptions coming from the _monitor_server_loop method of asyncua.Client.
FastAPI app code
The OPCUASubscriber.subscribe_node(nodeid) code :
I guess the fact that it isn't used inside a "with" statement or an infinite loop is the problem to catch the exceptions launched by an asyncio task inside the client... Has anyone ever faced this problem or has a solution ?
Beta Was this translation helpful? Give feedback.
All reactions