Files
opcua_fastapi_backend/opcuaHandler.py
2025-11-14 18:10:22 +01:00

194 lines
6.9 KiB
Python

import os
from asyncua import Client, ua, Node
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from asyncua.crypto.cert_gen import setup_self_signed_certificate
from cryptography.x509.oid import ExtendedKeyUsageOID
import asyncio
import globalConfigs as gc
import globalData as gd
new_value_event = asyncio.Event() # signals when a new value arrives
session_id = 0
class SubHandler:
"""
Subscription Handler. To receive events from server for a subscription
This class is just a sample class. Whatever class having these methods can be used
"""
def __init__(self, queue, event_queue):
self.queue = queue
self.event_queue = event_queue
def datachange_notification(self, node: Node, val, data):
"""
called for every datachange notification from server
"""
self.queue.put_nowait((node.nodeid.to_string(), val))
def event_notification(self, event: ua.EventNotificationList):
"""
called for every event notification from server
"""
# print("=== Event empfangen ===")
# print("NodeId:", event.NodeId.Identifier)
# print("Severity:", event.Severity)
# print("Message:", event.Message.Text)
# print("ActiveState:", getattr(event, "ActiveState/Id"))
# print("ActiveStateTransTime:", getattr(event, "ActiveState/TransitionTime"))
# print("ConfirmedState:", getattr(event, "ConfirmedState/Id"))
# print("ConfirmedStateTransTime:", getattr(event, "ConfirmedState/TransitionTime"))
# print("Retain:", event.Retain)
# print("------------------------")
event_data = {
"Severity": event.Severity,
"Message": event.Message.Text,
"ActiveState": getattr(event, "ActiveState/Id"),
"ConfirmedState": getattr(event, "ConfirmedState/Id"),
"ActiveStateTransTime": getattr(event, "ActiveState/TransitionTime").isoformat(),
"ConfirmedStateTransTime": getattr(event, "ConfirmedState/TransitionTime").isoformat(),
"Retain": event.Retain
}
print("Got event")
self.event_queue.put_nowait((event.NodeId.Identifier, event_data))
def status_change_notification(self, status: ua.StatusChangeNotification):
"""
called for every status change notification from server
"""
print("status_notification %s", status)
async def connection_watcher2():
await initOPCUAConnection()
connected = False
while not connected:
try:
# In the first round we can use the normal connect
await gd.opc_ua_client.connect()
connected = True
except Exception:
connected = False
print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
await asyncio.sleep(gc.RECONNECT_INTERVAL)
if connected:
await createSubscriptions()
# Periodically check for connection
while True:
if connected:
try:
await gd.opc_ua_client.check_connection()
except ua.uaerrors._auto.BadConnectionClosed:
connected = False
gd.opc_ua_client.disconnect_sessionless()
print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
await asyncio.sleep(gc.RECONNECT_INTERVAL)
else:
try:
await gd.opc_ua_client.connect_sessionless()
result = await gd.opc_ua_client.uaclient.activate_session()
print(result)
connected = True
except Exception:
print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
await asyncio.sleep(gc.RECONNECT_INTERVAL)
async def connection_watcher():
"""
Background task to keep checking OPC UA client connection.
Reconnects automatically if disconnected.
"""
while True:
try:
gd.opc_ua_client = Client(url=gc.OPC_UA_URL)
gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI
await gd.opc_ua_client.set_security(
SecurityPolicyBasic256Sha256,
certificate=str(gc.CERT),
private_key=str(gc.PRIVATE_KEY)
)
gd.opc_ua_client.set_user(gc.OPC_UA_USER)
gd.opc_ua_client.set_password(gc.OPC_UA_PW)
async with gd.opc_ua_client:
print("Connected")
#print("Creating subscriptions")
#await createSubscriptions()
gd.subscription_handler = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue, gd.event_queue))
event_logger_node = gd.opc_ua_client.get_node("ns=8;s=eventlogger")
await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, "ns=5;i=4000"])
print("Registered events")
while True:
await asyncio.sleep(gc.CHECK_INTERVAL)
await gd.opc_ua_client.check_connection() # Throws a exception if connection is lost
except ua.uaerrors._auto.BadConnectionClosed:
print("Lost connection")
print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
await asyncio.sleep(gc.RECONNECT_INTERVAL)
except Exception as e:
print(e)
break
async def initOPCUAConnection():
if (not os.path.isfile(gc.CERT)) or (not os.path.isfile(gc.PRIVATE_KEY)):
await setup_self_signed_certificate(
gc.PRIVATE_KEY,
gc.CERT,
gc.CLIENT_APP_URI,
gc.HOST_NAME,
[ExtendedKeyUsageOID.CLIENT_AUTH],
{
"countryName": "DE",
"stateOrProvinceName": "NRW",
"localityName": "HMI backend",
"organizationName": "Heisig GmbH",
},
)
gd.opc_ua_client = Client(url=gc.OPC_UA_URL)
gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI
result = await gd.opc_ua_client.get_endpoints()
print(result)
gd.opc_ua_client.set_user(gc.OPC_UA_USER)
gd.opc_ua_client.set_password(gc.OPC_UA_PW)
await gd.opc_ua_client.set_security(
SecurityPolicyBasic256Sha256,
certificate=str(gc.CERT),
private_key=str(gc.PRIVATE_KEY)
)
async def createSubscriptions():
sub_param = ua.CreateSubscriptionParameters
sub_param.RequestedPublishingInterval = gd.PUBLISH_INTERVAL
sub_param.RequestedMaxKeepAliveCount = 5
sub_param.RequestedLifetimeCount = 10
sub_param.MaxNotificationsPerPublish = 0
gd.subscription_handler = await gd.opc_ua_client.create_subscription(sub_param, SubHandler(gd.queue, gd.event_queue))
# Create an event subscription
event_logger_node = gd.opc_ua_client.get_node(gc.EVENT_LOGGER_NODE_ID)
await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, gc.TC_EVENT_DATATYPE_NODE_ID])