import os from asyncua import Client, ua, Node from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256 from asyncua.crypto.cert_gen import setup_self_signed_certificate from cryptography.x509.oid import ExtendedKeyUsageOID import asyncio import globalConfigs as gc import globalData as gd new_value_event = asyncio.Event() # signals when a new value arrives class SubHandler: """ Subscription Handler. To receive events from server for a subscription This class is just a sample class. Whatever class having these methods can be used """ def __init__(self, queue, event_queue): self.queue = queue self.event_queue = event_queue def datachange_notification(self, node: Node, val, data): """ called for every datachange notification from server """ self.queue.put_nowait((node.nodeid.to_string(), val)) def event_notification(self, event: ua.EventNotificationList): """ called for every event notification from server """ # print("=== Event empfangen ===") # print("NodeId:", event.NodeId.Identifier) # print("Severity:", event.Severity) # print("Message:", event.Message.Text) # print("ActiveState:", getattr(event, "ActiveState/Id")) # print("ActiveStateTransTime:", getattr(event, "ActiveState/TransitionTime")) # print("ConfirmedState:", getattr(event, "ConfirmedState/Id")) # print("ConfirmedStateTransTime:", getattr(event, "ConfirmedState/TransitionTime")) # print("Retain:", event.Retain) # print("------------------------") event_data = { "Severity": event.Severity, "Message": event.Message.Text, "ActiveState": getattr(event, "ActiveState/Id"), "ConfirmedState": getattr(event, "ConfirmedState/Id"), "ActiveStateTransTime": getattr(event, "ActiveState/TransitionTime").isoformat(), "ConfirmedStateTransTime": getattr(event, "ConfirmedState/TransitionTime").isoformat(), "Retain": event.Retain } print("Got event") self.event_queue.put_nowait((event.NodeId.Identifier, event_data)) def status_change_notification(self, status: ua.StatusChangeNotification): """ called for every status change notification from server """ print("status_notification %s", status) # def load_node_ids(): # if not gc.NODE_IDS_FILE_PATH.exists(): # return [] # with open(gc.NODE_IDS_FILE_PATH, "r", encoding="utf-8") as f: # node_ids = [line.strip() for line in f if line.strip() and not line.startswith("#")] # if not node_ids: # raise ValueError(f"No node IDs found in {gc.NODE_IDS_FILE_PATH}") # return node_ids # async def createSubscriptions(): # subscription = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue)) # node_ids = load_node_ids() # nodes = [gd.opc_ua_client.get_node(f"{nid}") for nid in node_ids] # await subscription.subscribe_data_change(nodes) async def connection_watcher2(): await initOPCUAConnection() connected = False while not connected: try: # In the first round we can use the normal connect await gd.opc_ua_client.connect() connected = True except Exception: print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") await asyncio.sleep(gc.RECONNECT_INTERVAL) # Create subscriptions gd.subscription_handler = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue, gd.event_queue)) # Create an event subscription event_logger_node = gd.opc_ua_client.get_node("ns=8;s=eventlogger") await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, "ns=5;i=4000"]) # Periodically check for connection state = 0 while True: match (state): case 0: try: await gd.opc_ua_client.connect_sessionless() await gd.opc_ua_client.create_session() except Exception: await asyncio.sleep(gc.CHECK_INTERVAL) try: await gd.opc_ua_client.check_connection() except ua.uaerrors._auto.BadConnectionClosed: connected = False print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") await asyncio.sleep(gc.RECONNECT_INTERVAL) async def connection_watcher(): """ Background task to keep checking OPC UA client connection. Reconnects automatically if disconnected. """ while True: try: gd.opc_ua_client = Client(url=gc.OPC_UA_URL) gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI await gd.opc_ua_client.set_security( SecurityPolicyBasic256Sha256, certificate=str(gc.CERT), private_key=str(gc.PRIVATE_KEY) ) gd.opc_ua_client.set_user(gc.OPC_UA_USER) gd.opc_ua_client.set_password(gc.OPC_UA_PW) async with gd.opc_ua_client: print("Connected") #print("Creating subscriptions") #await createSubscriptions() gd.subscription_handler = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue, gd.event_queue)) event_logger_node = gd.opc_ua_client.get_node("ns=8;s=eventlogger") await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, "ns=5;i=4000"]) print("Registered events") while True: await asyncio.sleep(gc.CHECK_INTERVAL) await gd.opc_ua_client.check_connection() # Throws a exception if connection is lost except ua.uaerrors._auto.BadConnectionClosed: print("Lost connection") print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") await asyncio.sleep(gc.RECONNECT_INTERVAL) except Exception as e: print(e) break async def initOPCUAConnection(): if (not os.path.isfile(gc.CERT)) or (not os.path.isfile(gc.PRIVATE_KEY)): await setup_self_signed_certificate( gc.PRIVATE_KEY, gc.CERT, gc.CLIENT_APP_URI, gc.HOST_NAME, [ExtendedKeyUsageOID.CLIENT_AUTH], { "countryName": "DE", "stateOrProvinceName": "NRW", "localityName": "HMI backend", "organizationName": "Heisig GmbH", }, ) gd.opc_ua_client = Client(url=gc.OPC_UA_URL) gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI await gd.opc_ua_client.set_security( SecurityPolicyBasic256Sha256, certificate=str(gc.CERT), private_key=str(gc.PRIVATE_KEY) ) gd.opc_ua_client.set_user(gc.OPC_UA_USER) gd.opc_ua_client.set_password(gc.OPC_UA_PW)