Read values and subscription give different values #1925

@lounis-benamar-suez

Hi, I'm not sure whether this is a bug. We have a use case that requires a subscription to stay alive for a very long time. We already handle lost connections and similar issues, but with exactly the same configuration as UAExpert (subscription and monitored item settings), the subscription misses many values, and sometimes nothing changes for a whole day. I also tried calling the read_values method in parallel with the subscription, and it sometimes returns very old values. I really don't know what is happening. I forgot to mention that we monitor 900 nodes.

Example of polling and subscription reporting different values (see nodeId-1234):

time|value|node_id|source
2026-01-27 20:33:39.315072|True|ns=14;nodeId-1234|subscription
2026-01-27 20:33:39.315398|2026-01-27 20:33:39.312882+00:00|i=2258|subscription
2026-01-27 20:33:44.314623|2026-01-27 20:33:44.265685+00:00|i=2258|subscription
2026-01-27 20:33:46.804887|2026-01-27 20:33:46.593481+00:00|i=2258|polling
2026-01-27 20:33:46.805230|False|ns=14;nodeId-1234|polling
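
With 900 nodes, divergences like the one above are easier to spot mechanically. The following is only a minimal sketch, assuming the pipe-delimited layout shown above. `find_mismatches` and its `ignore` parameter are hypothetical names for illustration, not part of asyncua. The server clock node i=2258 is ignored because its value differs on every read by design.

```python
import csv
import io

# Sample rows copied from the log excerpt above.
SAMPLE = """\
time|value|node_id|source
2026-01-27 20:33:39.315072|True|ns=14;nodeId-1234|subscription
2026-01-27 20:33:39.315398|2026-01-27 20:33:39.312882+00:00|i=2258|subscription
2026-01-27 20:33:44.314623|2026-01-27 20:33:44.265685+00:00|i=2258|subscription
2026-01-27 20:33:46.804887|2026-01-27 20:33:46.593481+00:00|i=2258|polling
2026-01-27 20:33:46.805230|False|ns=14;nodeId-1234|polling
"""


def find_mismatches(text, ignore=("i=2258",)):
    """Return {node_id: (subscription_value, polling_value)} for nodes
    whose latest subscription value differs from the latest polled value."""
    latest = {}  # (node_id, source) -> (time, value)
    for row in csv.DictReader(io.StringIO(text), delimiter="|"):
        key = (row["node_id"], row["source"])
        # Timestamps in this fixed format sort correctly as plain strings.
        if key not in latest or row["time"] > latest[key][0]:
            latest[key] = (row["time"], row["value"])

    mismatches = {}
    for (node_id, source), (_, value) in latest.items():
        if source != "subscription" or node_id in ignore:
            continue
        polled = latest.get((node_id, "polling"))
        if polled is not None and polled[1] != value:
            mismatches[node_id] = (value, polled[1])
    return mismatches


print(find_mismatches(SAMPLE))
```

On the excerpt above this reports ns=14;nodeId-1234, whose last subscription value is True while the later poll returned False.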

Example of a blocked subscription (using the OPC server time node, i=2258):

2026-01-26 20:49:41.355702|2026-01-26 20:49:41.305494+00:00|i=2258
2026-01-26 20:49:44.758193|2026-01-26 20:49:44.699704+00:00|i=2258
2026-01-27 08:40:52.209451|2026-01-27 08:40:52.153587+00:00|i=2258
2026-01-27 08:40:54.759477|2026-01-27 08:40:54.675261+00:00|i=2258
2026-01-27 08:41:03.838510|2026-01-27 08:41:03.794363+00:00|i=2258
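
The stall in this excerpt can be located automatically by looking at the distance between consecutive receive times. This is a rough sketch, assuming the same pipe-delimited layout with the receive time in the first column. `find_gaps` and the 60-second threshold are hypothetical choices, not something taken from asyncua.

```python
from datetime import datetime, timedelta

# Rows copied from the blocked-subscription excerpt above.
SAMPLE = """\
2026-01-26 20:49:41.355702|2026-01-26 20:49:41.305494+00:00|i=2258
2026-01-26 20:49:44.758193|2026-01-26 20:49:44.699704+00:00|i=2258
2026-01-27 08:40:52.209451|2026-01-27 08:40:52.153587+00:00|i=2258
2026-01-27 08:40:54.759477|2026-01-27 08:40:54.675261+00:00|i=2258
2026-01-27 08:41:03.838510|2026-01-27 08:41:03.794363+00:00|i=2258
"""


def find_gaps(text, max_gap=timedelta(seconds=60)):
    """Return (previous, current, gap) for consecutive rows whose
    receive times are further apart than max_gap."""
    times = [
        datetime.fromisoformat(line.split("|")[0])
        for line in text.splitlines()
        if line.strip()
    ]
    return [(a, b, b - a) for a, b in zip(times, times[1:]) if b - a > max_gap]


for previous, current, gap in find_gaps(SAMPLE):
    print(f"no notification between {previous} and {current} ({gap})")
```

For the rows above, the only gap found is the one between 2026-01-26 20:49:44 and 2026-01-27 08:40:52, a little under 12 hours.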

The code is very simple:

class DataChangeHandler:
    def datachange_notification(self, node, val, data):
        try:
            state_queue.put_nowait({
                "node_id": node.nodeid.to_string(),
                "value": val,
                "source_timestamp": data.monitored_item.Value.SourceTimestamp,
                "origin": "subscription",
            })
        except Exception:
            logger.exception("Failed to enqueue subscription value")

    def event_notification(self, event):
        logger.info("Event: %s", event)

    def status_change_notification(self, status):
        logger.info(
            f"Notification status changed. Name: {status.Status.name}, Code: {status.Status.value}, Message: {status.Status.doc}")


async def main_async():
    handler = DataChangeHandler()
    # Keep a reference so the background task is not garbage-collected.
    worker_task = asyncio.create_task(state_worker())
    skip = False
    opcua_client = None

    while True:
        try:
            if skip and opcua_client:
                await opcua_client.client.check_connection()  # ConnectionError('Connection is closed')
                await asyncio.sleep(10)
                continue

            opcua_client = AsyncOpcuaClient(url, username, password)
            await opcua_client.connect()

            if not state or not nodes:
                for node_id in NODE_IDS_TO_SUBSCRIBE:
                    node_instance = opcua_client.client.get_node(node_id)
                    nodes.append(node_instance)
                    state[node_id] = {
                        'instance': node_instance,
                        'source_timestamp': None,
                        'last_value': None,
                    }

            uaParameters = ua.CreateSubscriptionParameters()
            uaParameters.RequestedPublishingInterval = 5000
            uaParameters.RequestedLifetimeCount = 2400
            uaParameters.RequestedMaxKeepAliveCount = 10  # self.get_keepalive_count(period)
            uaParameters.Priority = 255

            await opcua_client.create_subscription(uaParameters, handler)
            await opcua_client.subscribe_data_changes(NODE_IDS_TO_SUBSCRIBE)
            await opcua_client.client.check_connection()  # ConnectionError('Connection is closed')
            skip = True

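        except Exception as e:  # ConnectionError is already a subclass of Exception
            logger.exception("Error while monitoring alarms: %s", e)
            skip = False

            try:
                # The client may still be None if construction failed.
                if opcua_client is not None:
                    await opcua_client.disconnect()
            except Exception as disconnect_error:
                logger.error("Unable to disconnect from opcua server: %s", disconnect_error)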

            logger.info('Prepare to renew the subscription process to the OPC UA server in 11 seconds.')
            await asyncio.sleep(10)

        finally:
            await asyncio.sleep(1)

The state_worker() coroutine simply takes the items from state_queue and writes them to a file asynchronously with asyncio.
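
The loop above only calls check_connection(), which may not notice a subscription that stays connected but stops delivering data. One possible workaround is a watchdog on the time of the last notification. The following is a minimal sketch using only the standard library. `StallWatchdog`, `touch` and `is_stalled` are hypothetical names, and the 60-second timeout is an assumption.

```python
import time


class StallWatchdog:
    """Remembers when the last notification arrived and reports a stall."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self._timeout_s = timeout_s
        self._clock = clock  # injectable so the class can be tested
        self._last = clock()

    def touch(self):
        """Call this whenever a data change notification is received."""
        self._last = self._clock()

    def is_stalled(self):
        """True if no notification arrived within the timeout."""
        return self._clock() - self._last > self._timeout_s


# Hypothetical timeout: 12 publishing intervals of 5 s each.
watchdog = StallWatchdog(timeout_s=60)
watchdog.touch()
print(watchdog.is_stalled())
```

In the code above, touch() would be called from datachange_notification, and the `if skip and opcua_client:` branch could raise ConnectionError when is_stalled() returns True so that the existing except block recreates the subscription. Since the server time node i=2258 is subscribed and changes continuously, a long silence on it is a reasonable stall signal.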

MonitoredItem settings:

handles: List[int] = await self.subscription._subscribe(
    nodes=nodes,
    # attr=ua.AttributeIds.Value,
    mfilter=ua.DataChangeFilter(ua.DataChangeTrigger.StatusValueTimestamp),
    queuesize=1,
    # monitoring: ua.MonitoringMode = ua.MonitoringMode.Reporting,
    sampling_interval=2000
)
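
For reference, the timing implied by the requested settings can be worked out as follows. This is a sketch of the arithmetic only: `subscription_timing` is a hypothetical helper, and the server may revise every requested value, so the effective numbers are the revised ones returned when the subscription and monitored items are created.

```python
def subscription_timing(publishing_ms, keepalive_count, lifetime_count, sampling_ms):
    """Derive the timing implied by the requested subscription parameters."""
    return {
        # Longest silence before the server must send a keep-alive message.
        "keepalive_s": publishing_ms * keepalive_count / 1000,
        # Time without available publish requests before the server
        # deletes the subscription.
        "lifetime_s": publishing_ms * lifetime_count / 1000,
        # Samples taken per publishing cycle; with queuesize=1 only one
        # of them is kept for each notification.
        "samples_per_publish": publishing_ms / sampling_ms,
    }


timing = subscription_timing(
    publishing_ms=5000, keepalive_count=10, lifetime_count=2400, sampling_ms=2000
)
print(timing)
```

With the values from this issue, that gives a keep-alive at least every 50 seconds, a lifetime of 12000 seconds (200 minutes), and 2.5 samples per publishing cycle. With queuesize=1, changes that happen between two publishes are therefore collapsed into a single reported value.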

In the debug logs I can see a stable state of the program:

INFO - Publish callback called with result...
DEBUG - publish []
DEBUG - Sending: PublishRequest(TypeId...

I noticed that if I recreate the subscription with a different "PublishingInterval", it works again, but the problem comes back after a while.
