Files
opcua_fastapi_backend/opcuaHandler.py
2025-11-08 13:56:28 +01:00

191 lines
6.9 KiB
Python

import os
from asyncua import Client, ua, Node
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from asyncua.crypto.cert_gen import setup_self_signed_certificate
from cryptography.x509.oid import ExtendedKeyUsageOID
import asyncio
import globalConfigs as gc
import globalData as gd
# Module-level asyncio.Event meant to signal that a new value has arrived.
# NOTE(review): nothing in this module sets or awaits it; presumably other
# modules import it — confirm before removing or renaming.
new_value_event = asyncio.Event()
class SubHandler:
    """
    Subscription handler that forwards server notifications into queues.

    An instance is passed to ``create_subscription``; the ``*_notification``
    methods below are then invoked for incoming notifications. They are plain
    (non-async) methods, so they only use the non-blocking ``put_nowait`` of
    the queues they were given.
    """

    def __init__(self, queue, event_queue):
        """
        Remember the queues that notifications are pushed into.

        :param queue: object with ``put_nowait``; receives
            ``(node_id_string, value)`` tuples for data changes
        :param event_queue: object with ``put_nowait``; receives
            ``(event_node_identifier, event_data_dict)`` tuples for events
        """
        self.queue = queue
        self.event_queue = event_queue

    def datachange_notification(self, node: "Node", val, data):
        """
        Called for every datachange notification from server.

        Enqueues the node id (as string) together with the new value. The raw
        ``data`` argument is not used.
        """
        self.queue.put_nowait((node.nodeid.to_string(), val))

    @staticmethod
    def _iso_or_none(timestamp):
        """Return ``timestamp.isoformat()``, or ``None`` if no timestamp is set."""
        return None if timestamp is None else timestamp.isoformat()

    def event_notification(self, event: "ua.EventNotificationList"):
        """
        Called for every event notification from server.

        Flattens the event fields of interest into a plain dict and enqueues
        it together with the identifier of the event's node id.

        Transition times are converted to ISO-8601 strings. A transition time
        that is ``None`` is passed through as ``None`` instead of raising
        ``AttributeError``, so such an event is still delivered.
        """
        event_data = {
            "Severity": event.Severity,
            "Message": event.Message.Text,
            "ActiveState": getattr(event, "ActiveState/Id"),
            "ConfirmedState": getattr(event, "ConfirmedState/Id"),
            "ActiveStateTransTime": self._iso_or_none(
                getattr(event, "ActiveState/TransitionTime")
            ),
            "ConfirmedStateTransTime": self._iso_or_none(
                getattr(event, "ConfirmedState/TransitionTime")
            ),
            "Retain": event.Retain,
        }
        print("Got event")
        self.event_queue.put_nowait((event.NodeId.Identifier, event_data))

    def status_change_notification(self, status: "ua.StatusChangeNotification"):
        """
        Called for every status change notification from server.

        Only prints the status.
        """
        # print() does not apply printf-style placeholders, so format explicitly.
        print(f"status_notification {status}")
# def load_node_ids():
# if not gc.NODE_IDS_FILE_PATH.exists():
# return []
# with open(gc.NODE_IDS_FILE_PATH, "r", encoding="utf-8") as f:
# node_ids = [line.strip() for line in f if line.strip() and not line.startswith("#")]
# if not node_ids:
# raise ValueError(f"No node IDs found in {gc.NODE_IDS_FILE_PATH}")
# return node_ids
# async def createSubscriptions():
# subscription = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue))
# node_ids = load_node_ids()
# nodes = [gd.opc_ua_client.get_node(f"{nid}") for nid in node_ids]
# await subscription.subscribe_data_change(nodes)
async def connection_watcher2():
    """
    Variant of the connection watcher that configures the client through
    ``initOPCUAConnection()`` and reconnects by hand.

    Flow as written: configure the client, retry ``connect()`` until it
    succeeds, create the subscription and register for events once, then loop
    forever calling ``connect_sessionless()`` / ``create_session()`` followed
    by ``check_connection()``. The function never returns.

    NOTE(review): this looks like work in progress — read the review notes in
    the body before relying on it.
    """
    await initOPCUAConnection()
    connected = False
    # Initial connect: keep retrying until connect() stops raising.
    while not connected:
        try:
            # In the first round we can use the normal connect
            await gd.opc_ua_client.connect()
            connected = True
        except Exception:
            # NOTE(review): the exception is discarded, so the reason for the
            # failed connect is never shown.
            print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
            await asyncio.sleep(gc.RECONNECT_INTERVAL)
    # Create subscriptions
    # (first argument 250 is presumably the publishing period in ms — confirm
    # against the asyncua documentation)
    gd.subscription_handler = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue, gd.event_queue))
    # Create an event subscription
    event_logger_node = gd.opc_ua_client.get_node("ns=8;s=eventlogger")
    await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, "ns=5;i=4000"])
    # Periodically check for connection
    # NOTE(review): `state` is never reassigned, so `case 0` runs on every
    # iteration and the match acts as an unconditional block.
    state = 0
    while True:
        match (state):
            case 0:
                # NOTE(review): this re-opens a channel and creates a session on
                # every iteration, even while the existing connection is fine,
                # and the subscription above is not re-created afterwards.
                try:
                    await gd.opc_ua_client.connect_sessionless()
                    await gd.opc_ua_client.create_session()
                except Exception:
                    await asyncio.sleep(gc.CHECK_INTERVAL)
        # NOTE(review): the nesting of this `try` relative to the `match` is
        # ambiguous in the reviewed copy; because `state` is always 0 it runs on
        # every iteration either way. When neither block raises, the loop
        # repeats immediately without any sleep.
        try:
            await gd.opc_ua_client.check_connection()
        except ua.uaerrors._auto.BadConnectionClosed:
            # NOTE(review): `connected` is not read again after the first loop,
            # so this assignment has no effect.
            connected = False
            print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
            await asyncio.sleep(gc.RECONNECT_INTERVAL)
async def connection_watcher():
    """
    Background task to keep checking OPC UA client connection.
    Reconnects automatically if disconnected.

    Each round builds a fresh ``Client`` in ``gd.opc_ua_client``, connects
    (``async with``), registers the event subscription and then polls
    ``check_connection()`` every ``gc.CHECK_INTERVAL`` seconds.

    A closed connection and transport-level failures (server unreachable,
    connection refused/reset, connect timeout) are retried after
    ``gc.RECONNECT_INTERVAL`` seconds. Any other exception is printed and ends
    the watcher, as before.
    """
    # Failures that mean "link down / server not reachable" and are worth
    # retrying. OSError covers ConnectionError and ConnectionRefusedError;
    # asyncio.TimeoutError is listed separately because it is only an OSError
    # subclass from Python 3.11 on.
    retryable = (
        ua.uaerrors._auto.BadConnectionClosed,
        OSError,
        asyncio.TimeoutError,
    )

    while True:
        try:
            gd.opc_ua_client = Client(url=gc.OPC_UA_URL)
            gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI
            await gd.opc_ua_client.set_security(
                SecurityPolicyBasic256Sha256,
                certificate=str(gc.CERT),
                private_key=str(gc.PRIVATE_KEY),
            )
            gd.opc_ua_client.set_user(gc.OPC_UA_USER)
            gd.opc_ua_client.set_password(gc.OPC_UA_PW)

            async with gd.opc_ua_client:
                print("Connected")
                gd.subscription_handler = await gd.opc_ua_client.create_subscription(
                    250, SubHandler(gd.queue, gd.event_queue)
                )
                event_logger_node = gd.opc_ua_client.get_node("ns=8;s=eventlogger")
                await gd.subscription_handler.subscribe_events(
                    event_logger_node,
                    evtypes=[ua.ObjectIds.BaseEventType, "ns=5;i=4000"],
                )
                print("Registered events")
                while True:
                    await asyncio.sleep(gc.CHECK_INTERVAL)
                    # Throws an exception if the connection is lost
                    await gd.opc_ua_client.check_connection()
        except retryable as exc:
            if not isinstance(exc, ua.uaerrors._auto.BadConnectionClosed):
                # Keep the underlying transport error visible.
                print(exc)
            print("Lost connection")
            print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
            await asyncio.sleep(gc.RECONNECT_INTERVAL)
        except Exception as e:
            # Unexpected (non-connection) failure: report it and stop watching.
            print(e)
            break
async def initOPCUAConnection():
    """
    Prepare ``gd.opc_ua_client`` for a secured, authenticated connection.

    If the certificate or the private key file is missing, a self-signed
    client certificate is generated first. Afterwards a new ``Client`` is
    stored in ``gd.opc_ua_client`` and configured with application URI,
    Basic256Sha256 security and user credentials. The client is only
    configured here; no connection is opened.
    """
    credentials_present = os.path.isfile(gc.CERT) and os.path.isfile(gc.PRIVATE_KEY)
    if not credentials_present:
        # Subject fields written into the generated certificate.
        subject_fields = {
            "countryName": "DE",
            "stateOrProvinceName": "NRW",
            "localityName": "HMI backend",
            "organizationName": "Heisig GmbH",
        }
        key_usages = [ExtendedKeyUsageOID.CLIENT_AUTH]
        await setup_self_signed_certificate(
            gc.PRIVATE_KEY,
            gc.CERT,
            gc.CLIENT_APP_URI,
            gc.HOST_NAME,
            key_usages,
            subject_fields,
        )

    # Publish the client first, then configure it in place.
    gd.opc_ua_client = Client(url=gc.OPC_UA_URL)
    gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI

    certificate_path = str(gc.CERT)
    private_key_path = str(gc.PRIVATE_KEY)
    await gd.opc_ua_client.set_security(
        SecurityPolicyBasic256Sha256,
        certificate=certificate_path,
        private_key=private_key_path,
    )

    gd.opc_ua_client.set_user(gc.OPC_UA_USER)
    gd.opc_ua_client.set_password(gc.OPC_UA_PW)