import os from asyncua import Client, ua, Node from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256 from asyncua.crypto.cert_gen import setup_self_signed_certificate from cryptography.x509.oid import ExtendedKeyUsageOID import asyncio import globalConfigs as gc import globalData as gd new_value_event = asyncio.Event() # signals when a new value arrives session_id = 0 class SubHandler: """ Subscription Handler. To receive events from server for a subscription This class is just a sample class. Whatever class having these methods can be used """ def __init__(self, queue, event_queue): self.queue = queue self.event_queue = event_queue def datachange_notification(self, node: Node, val, data): """ called for every datachange notification from server """ self.queue.put_nowait((node.nodeid.to_string(), val)) def event_notification(self, event: ua.EventNotificationList): """ called for every event notification from server """ # print("=== Event empfangen ===") # print("NodeId:", event.NodeId.Identifier) # print("Severity:", event.Severity) # print("Message:", event.Message.Text) # print("ActiveState:", getattr(event, "ActiveState/Id")) # print("ActiveStateTransTime:", getattr(event, "ActiveState/TransitionTime")) # print("ConfirmedState:", getattr(event, "ConfirmedState/Id")) # print("ConfirmedStateTransTime:", getattr(event, "ConfirmedState/TransitionTime")) # print("Retain:", event.Retain) # print("------------------------") event_data = { "Severity": event.Severity, "Message": event.Message.Text, "ActiveState": getattr(event, "ActiveState/Id"), "ConfirmedState": getattr(event, "ConfirmedState/Id"), "ActiveStateTransTime": getattr(event, "ActiveState/TransitionTime").isoformat(), "ConfirmedStateTransTime": getattr(event, "ConfirmedState/TransitionTime").isoformat(), "Retain": event.Retain } print("Got event") self.event_queue.put_nowait((event.NodeId.Identifier, event_data)) def status_change_notification(self, status: ua.StatusChangeNotification): """ called for every status change notification from server """ print("status_notification %s", status) async def connection_watcher2(): await initOPCUAConnection() connected = False while not connected: try: # In the first round we can use the normal connect await gd.opc_ua_client.connect() connected = True except Exception: connected = False print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") await asyncio.sleep(gc.RECONNECT_INTERVAL) if connected: await createSubscriptions() # Periodically check for connection while True: if connected: try: await gd.opc_ua_client.check_connection() except ua.uaerrors._auto.BadConnectionClosed: connected = False gd.opc_ua_client.disconnect_sessionless() print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") await asyncio.sleep(gc.RECONNECT_INTERVAL) else: try: await gd.opc_ua_client.connect_sessionless() result = await gd.opc_ua_client.uaclient.activate_session() print(result) connected = True except Exception: print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") await asyncio.sleep(gc.RECONNECT_INTERVAL) async def connection_watcher(): """ Background task to keep checking OPC UA client connection. Reconnects automatically if disconnected. """ while True: try: gd.opc_ua_client = Client(url=gc.OPC_UA_URL) gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI await gd.opc_ua_client.set_security( SecurityPolicyBasic256Sha256, certificate=str(gc.CERT), private_key=str(gc.PRIVATE_KEY) ) gd.opc_ua_client.set_user(gc.OPC_UA_USER) gd.opc_ua_client.set_password(gc.OPC_UA_PW) async with gd.opc_ua_client: print("Connected") #print("Creating subscriptions") #await createSubscriptions() gd.subscription_handler = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue, gd.event_queue)) event_logger_node = gd.opc_ua_client.get_node("ns=8;s=eventlogger") await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, "ns=5;i=4000"]) print("Registered events") while True: await asyncio.sleep(gc.CHECK_INTERVAL) await gd.opc_ua_client.check_connection() # Throws a exception if connection is lost except ua.uaerrors._auto.BadConnectionClosed: print("Lost connection") print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") await asyncio.sleep(gc.RECONNECT_INTERVAL) except Exception as e: print(e) break async def initOPCUAConnection(): if (not os.path.isfile(gc.CERT)) or (not os.path.isfile(gc.PRIVATE_KEY)): await setup_self_signed_certificate( gc.PRIVATE_KEY, gc.CERT, gc.CLIENT_APP_URI, gc.HOST_NAME, [ExtendedKeyUsageOID.CLIENT_AUTH], { "countryName": "DE", "stateOrProvinceName": "NRW", "localityName": "HMI backend", "organizationName": "Heisig GmbH", }, ) gd.opc_ua_client = Client(url=gc.OPC_UA_URL) gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI result = await gd.opc_ua_client.get_endpoints() print(result) gd.opc_ua_client.set_user(gc.OPC_UA_USER) gd.opc_ua_client.set_password(gc.OPC_UA_PW) await gd.opc_ua_client.set_security( SecurityPolicyBasic256Sha256, certificate=str(gc.CERT), private_key=str(gc.PRIVATE_KEY) ) async def createSubscriptions(): sub_param = ua.CreateSubscriptionParameters sub_param.RequestedPublishingInterval = gd.PUBLISH_INTERVAL sub_param.RequestedMaxKeepAliveCount = 5 sub_param.RequestedLifetimeCount = 10 sub_param.MaxNotificationsPerPublish = 0 gd.subscription_handler = await gd.opc_ua_client.create_subscription(sub_param, SubHandler(gd.queue, gd.event_queue)) # Create an event subscription event_logger_node = gd.opc_ua_client.get_node(gc.EVENT_LOGGER_NODE_ID) await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, gc.TC_EVENT_DATATYPE_NODE_ID])