diff --git a/globalConfigs.py b/globalConfigs.py index f098d7f..78a2cc5 100644 --- a/globalConfigs.py +++ b/globalConfigs.py @@ -9,20 +9,16 @@ HOST_NAME = socket.gethostname() CLIENT_APP_URI = f"urn:{HOST_NAME}:client:hmi" # OPC UA settings -OPC_UA_URL = "opc.tcp://192.168.42.1:4840" -OPC_UA_USER = "hmi" -OPC_UA_PW = "hmi" -#OPC_UA_URL = "opc.tcp://192.168.56.101:4840" -#OPC_UA_USER = "twincat" -#OPC_UA_PW = "twincat" +OPC_UA_URL = "opc.tcp://192.168.0.148:4840" +OPC_UA_USER = "Administrator" +OPC_UA_PW = "11sep8819" NAMESPACE = "urn:BeckhoffAutomation:Ua:PLC1" -NS_IDX = 0 + +EVENT_LOGGER_NODE_ID = "ns=8;s=eventlogger" +TC_EVENT_DATATYPE_NODE_ID = "ns=5;i=4000" CHECK_INTERVAL = 1.0 # seconds RECONNECT_INTERVAL = 5.0 # seconds # Publishing interval to the clients (rate limiting) -PUBLISH_INTERVAL = 0.1 # 200 ms - -# Path to the nodes list -NODE_IDS_FILE_PATH = Path(CERT_BASE / f"nodeList.txt") \ No newline at end of file +PUBLISH_INTERVAL = 0.2 # 200 ms \ No newline at end of file diff --git a/main.py b/main.py index 5943979..e555aa2 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,6 @@ -from fastapi import FastAPI #, HTTPException +from fastapi import FastAPI from routers import websocket from fastapi.middleware.cors import CORSMiddleware -#from starlette import status -#from fastapi.staticfiles import StaticFiles -#from models import SemiAutoControl, SemiAutoBatteryStatus import asyncio import msgpack @@ -11,7 +8,7 @@ import msgpack #from globalData import node_storage, clients import globalData as gd import globalConfigs as gc -from opcuaHandler import initOPCUAConnection, connection_watcher +from opcuaHandler import initOPCUAConnection, connection_watcher2 update_buffer = {} @@ -23,6 +20,7 @@ async def data_consumer(queue): update_buffer[nodeid] = val queue.task_done() + async def event_consumer(queue): while True: node_id, event_data = await queue.get() @@ -40,6 +38,7 @@ async def event_consumer(queue): await ws.send_bytes(msgpack.packb({"e": {node_id: event_data}})) queue.task_done() + async def broadcast_deltas(changes: dict): payload = msgpack.packb({"u": changes}) for ws, subscribed_tags in list(gd.clients.items()): @@ -47,6 +46,7 @@ async def broadcast_deltas(changes: dict): if relevant: await ws.send_bytes(msgpack.packb({"u": relevant})) + async def broadcast_loop(): global update_buffer while True: @@ -68,15 +68,15 @@ async def lifespan(app: FastAPI): """ FastAPI lifespan context — creates the client and starts the watcher.loading="warning" """ - await initOPCUAConnection() + #await initOPCUAConnection() # Launch the background watcher task for the opc ua connection - watcher_task = asyncio.create_task(connection_watcher()) + watcher_task = asyncio.create_task(connection_watcher2()) # Lunch data consumer task for node value changed events - consumer_task = asyncio.create_task(data_consumer(gd.queue)) - event_consumer_task = asyncio.create_task(event_consumer(gd.event_queue)) - broadcast_task = asyncio.create_task(broadcast_loop()) + #consumer_task = asyncio.create_task(data_consumer(gd.queue)) + #event_consumer_task = asyncio.create_task(event_consumer(gd.event_queue)) + #broadcast_task = asyncio.create_task(broadcast_loop()) yield # Application runs while this yields @@ -91,6 +91,7 @@ async def lifespan(app: FastAPI): await consumer_task await broadcast_task await event_consumer_task + gd.opc_ua_client.disconnect() except asyncio.CancelledError: pass @@ -99,7 +100,6 @@ app = FastAPI(lifespan=lifespan) origins = [ "http://localhost:3000", "http://127.0.0.1:3000", - # Fügen Sie hier weitere erforderlich Ursprünge hinzu ] app.add_middleware( CORSMiddleware, @@ -109,7 +109,6 @@ app.add_middleware( allow_headers=["*"], # Erlaubt alle Header ) app.include_router(websocket.router) -#app.include_router(semiAuto.router) # Serve the built frontend -#app.mount("/assets", StaticFiles(directory="static/assets"), name="assets") +#app.mount("/assets", StaticFiles(directory="static/assets"), name="assets") \ No newline at end of file diff --git a/opcuaHandler.py b/opcuaHandler.py index 2fb3765..93b4555 100644 --- a/opcuaHandler.py +++ b/opcuaHandler.py @@ -13,6 +13,8 @@ import globalData as gd new_value_event = asyncio.Event() # signals when a new value arrives +session_id = 0 + class SubHandler: """ Subscription Handler. To receive events from server for a subscription @@ -64,25 +66,9 @@ class SubHandler: """ print("status_notification %s", status) -# def load_node_ids(): -# if not gc.NODE_IDS_FILE_PATH.exists(): -# return [] - -# with open(gc.NODE_IDS_FILE_PATH, "r", encoding="utf-8") as f: -# node_ids = [line.strip() for line in f if line.strip() and not line.startswith("#")] - -# if not node_ids: -# raise ValueError(f"No node IDs found in {gc.NODE_IDS_FILE_PATH}") -# return node_ids - -# async def createSubscriptions(): - -# subscription = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue)) -# node_ids = load_node_ids() -# nodes = [gd.opc_ua_client.get_node(f"{nid}") for nid in node_ids] -# await subscription.subscribe_data_change(nodes) async def connection_watcher2(): + await initOPCUAConnection() connected = False @@ -93,35 +79,32 @@ async def connection_watcher2(): await gd.opc_ua_client.connect() connected = True except Exception: - print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") - await asyncio.sleep(gc.RECONNECT_INTERVAL) - - - # Create subscriptions - gd.subscription_handler = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue, gd.event_queue)) - - # Create an event subscription - event_logger_node = gd.opc_ua_client.get_node("ns=8;s=eventlogger") - await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, "ns=5;i=4000"]) - - # Periodically check for connection - state = 0 - while True: - match (state): - case 0: - try: - await gd.opc_ua_client.connect_sessionless() - await gd.opc_ua_client.create_session() - except Exception: - - - await asyncio.sleep(gc.CHECK_INTERVAL) - try: - await gd.opc_ua_client.check_connection() - except ua.uaerrors._auto.BadConnectionClosed: connected = False print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") await asyncio.sleep(gc.RECONNECT_INTERVAL) + + if connected: + await createSubscriptions() + + # Periodically check for connection + while True: + if connected: + try: + await gd.opc_ua_client.check_connection() + except ua.uaerrors._auto.BadConnectionClosed: + connected = False + gd.opc_ua_client.disconnect_sessionless() + print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") + await asyncio.sleep(gc.RECONNECT_INTERVAL) + else: + try: + await gd.opc_ua_client.connect_sessionless() + result = await gd.opc_ua_client.uaclient.activate_session() + print(result) + connected = True + except Exception: + print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") + await asyncio.sleep(gc.RECONNECT_INTERVAL) @@ -182,10 +165,29 @@ async def initOPCUAConnection(): gd.opc_ua_client = Client(url=gc.OPC_UA_URL) gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI + + result = await gd.opc_ua_client.get_endpoints() + print(result) + + gd.opc_ua_client.set_user(gc.OPC_UA_USER) + gd.opc_ua_client.set_password(gc.OPC_UA_PW) await gd.opc_ua_client.set_security( SecurityPolicyBasic256Sha256, certificate=str(gc.CERT), private_key=str(gc.PRIVATE_KEY) ) - gd.opc_ua_client.set_user(gc.OPC_UA_USER) - gd.opc_ua_client.set_password(gc.OPC_UA_PW) \ No newline at end of file + + + +async def createSubscriptions(): + sub_param = ua.CreateSubscriptionParameters + sub_param.RequestedPublishingInterval = gd.PUBLISH_INTERVAL + sub_param.RequestedMaxKeepAliveCount = 5 + sub_param.RequestedLifetimeCount = 10 + sub_param.MaxNotificationsPerPublish = 0 + + gd.subscription_handler = await gd.opc_ua_client.create_subscription(sub_param, SubHandler(gd.queue, gd.event_queue)) + + # Create an event subscription + event_logger_node = gd.opc_ua_client.get_node(gc.EVENT_LOGGER_NODE_ID) + await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, gc.TC_EVENT_DATATYPE_NODE_ID])