Rewrite to reconnect handler v2

This commit is contained in:
Matthias Heisig
2025-11-14 18:10:22 +01:00
parent a681c6da6b
commit 0695935010
3 changed files with 66 additions and 69 deletions

View File

@@ -9,20 +9,16 @@ HOST_NAME = socket.gethostname()
CLIENT_APP_URI = f"urn:{HOST_NAME}:client:hmi" CLIENT_APP_URI = f"urn:{HOST_NAME}:client:hmi"
# OPC UA settings # OPC UA settings
OPC_UA_URL = "opc.tcp://192.168.42.1:4840" OPC_UA_URL = "opc.tcp://192.168.0.148:4840"
OPC_UA_USER = "hmi" OPC_UA_USER = "Administrator"
OPC_UA_PW = "hmi" OPC_UA_PW = "11sep8819"
#OPC_UA_URL = "opc.tcp://192.168.56.101:4840"
#OPC_UA_USER = "twincat"
#OPC_UA_PW = "twincat"
NAMESPACE = "urn:BeckhoffAutomation:Ua:PLC1" NAMESPACE = "urn:BeckhoffAutomation:Ua:PLC1"
NS_IDX = 0
EVENT_LOGGER_NODE_ID = "ns=8;s=eventlogger"
TC_EVENT_DATATYPE_NODE_ID = "ns=5;i=4000"
CHECK_INTERVAL = 1.0 # seconds CHECK_INTERVAL = 1.0 # seconds
RECONNECT_INTERVAL = 5.0 # seconds RECONNECT_INTERVAL = 5.0 # seconds
# Publishing interval to the clients (rate limiting) # Publishing interval to the clients (rate limiting)
PUBLISH_INTERVAL = 0.1 # 200 ms PUBLISH_INTERVAL = 0.2 # 200 ms
# Path to the nodes list
NODE_IDS_FILE_PATH = Path(CERT_BASE / f"nodeList.txt")

25
main.py
View File

@@ -1,9 +1,6 @@
from fastapi import FastAPI #, HTTPException from fastapi import FastAPI
from routers import websocket from routers import websocket
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
#from starlette import status
#from fastapi.staticfiles import StaticFiles
#from models import SemiAutoControl, SemiAutoBatteryStatus
import asyncio import asyncio
import msgpack import msgpack
@@ -11,7 +8,7 @@ import msgpack
#from globalData import node_storage, clients #from globalData import node_storage, clients
import globalData as gd import globalData as gd
import globalConfigs as gc import globalConfigs as gc
from opcuaHandler import initOPCUAConnection, connection_watcher from opcuaHandler import initOPCUAConnection, connection_watcher2
update_buffer = {} update_buffer = {}
@@ -23,6 +20,7 @@ async def data_consumer(queue):
update_buffer[nodeid] = val update_buffer[nodeid] = val
queue.task_done() queue.task_done()
async def event_consumer(queue): async def event_consumer(queue):
while True: while True:
node_id, event_data = await queue.get() node_id, event_data = await queue.get()
@@ -40,6 +38,7 @@ async def event_consumer(queue):
await ws.send_bytes(msgpack.packb({"e": {node_id: event_data}})) await ws.send_bytes(msgpack.packb({"e": {node_id: event_data}}))
queue.task_done() queue.task_done()
async def broadcast_deltas(changes: dict): async def broadcast_deltas(changes: dict):
payload = msgpack.packb({"u": changes}) payload = msgpack.packb({"u": changes})
for ws, subscribed_tags in list(gd.clients.items()): for ws, subscribed_tags in list(gd.clients.items()):
@@ -47,6 +46,7 @@ async def broadcast_deltas(changes: dict):
if relevant: if relevant:
await ws.send_bytes(msgpack.packb({"u": relevant})) await ws.send_bytes(msgpack.packb({"u": relevant}))
async def broadcast_loop(): async def broadcast_loop():
global update_buffer global update_buffer
while True: while True:
@@ -68,15 +68,15 @@ async def lifespan(app: FastAPI):
""" """
FastAPI lifespan context — creates the client and starts the watcher.loading="warning" FastAPI lifespan context — creates the client and starts the watcher.loading="warning"
""" """
await initOPCUAConnection() #await initOPCUAConnection()
# Launch the background watcher task for the opc ua connection # Launch the background watcher task for the opc ua connection
watcher_task = asyncio.create_task(connection_watcher()) watcher_task = asyncio.create_task(connection_watcher2())
# Lunch data consumer task for node value changed events # Lunch data consumer task for node value changed events
consumer_task = asyncio.create_task(data_consumer(gd.queue)) #consumer_task = asyncio.create_task(data_consumer(gd.queue))
event_consumer_task = asyncio.create_task(event_consumer(gd.event_queue)) #event_consumer_task = asyncio.create_task(event_consumer(gd.event_queue))
broadcast_task = asyncio.create_task(broadcast_loop()) #broadcast_task = asyncio.create_task(broadcast_loop())
yield # Application runs while this yields yield # Application runs while this yields
@@ -91,6 +91,7 @@ async def lifespan(app: FastAPI):
await consumer_task await consumer_task
await broadcast_task await broadcast_task
await event_consumer_task await event_consumer_task
gd.opc_ua_client.disconnect()
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
@@ -99,7 +100,6 @@ app = FastAPI(lifespan=lifespan)
origins = [ origins = [
"http://localhost:3000", "http://localhost:3000",
"http://127.0.0.1:3000", "http://127.0.0.1:3000",
# Fügen Sie hier weitere erforderlich Ursprünge hinzu
] ]
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
@@ -109,7 +109,6 @@ app.add_middleware(
allow_headers=["*"], # Erlaubt alle Header allow_headers=["*"], # Erlaubt alle Header
) )
app.include_router(websocket.router) app.include_router(websocket.router)
#app.include_router(semiAuto.router)
# Serve the built frontend # Serve the built frontend
#app.mount("/assets", StaticFiles(directory="static/assets"), name="assets") #app.mount("/assets", StaticFiles(directory="static/assets"), name="assets")

View File

@@ -13,6 +13,8 @@ import globalData as gd
new_value_event = asyncio.Event() # signals when a new value arrives new_value_event = asyncio.Event() # signals when a new value arrives
session_id = 0
class SubHandler: class SubHandler:
""" """
Subscription Handler. To receive events from server for a subscription Subscription Handler. To receive events from server for a subscription
@@ -64,25 +66,9 @@ class SubHandler:
""" """
print("status_notification %s", status) print("status_notification %s", status)
# def load_node_ids():
# if not gc.NODE_IDS_FILE_PATH.exists():
# return []
# with open(gc.NODE_IDS_FILE_PATH, "r", encoding="utf-8") as f:
# node_ids = [line.strip() for line in f if line.strip() and not line.startswith("#")]
# if not node_ids:
# raise ValueError(f"No node IDs found in {gc.NODE_IDS_FILE_PATH}")
# return node_ids
# async def createSubscriptions():
# subscription = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue))
# node_ids = load_node_ids()
# nodes = [gd.opc_ua_client.get_node(f"{nid}") for nid in node_ids]
# await subscription.subscribe_data_change(nodes)
async def connection_watcher2(): async def connection_watcher2():
await initOPCUAConnection() await initOPCUAConnection()
connected = False connected = False
@@ -93,35 +79,32 @@ async def connection_watcher2():
await gd.opc_ua_client.connect() await gd.opc_ua_client.connect()
connected = True connected = True
except Exception: except Exception:
print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
await asyncio.sleep(gc.RECONNECT_INTERVAL)
# Create subscriptions
gd.subscription_handler = await gd.opc_ua_client.create_subscription(250, SubHandler(gd.queue, gd.event_queue))
# Create an event subscription
event_logger_node = gd.opc_ua_client.get_node("ns=8;s=eventlogger")
await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, "ns=5;i=4000"])
# Periodically check for connection
state = 0
while True:
match (state):
case 0:
try:
await gd.opc_ua_client.connect_sessionless()
await gd.opc_ua_client.create_session()
except Exception:
await asyncio.sleep(gc.CHECK_INTERVAL)
try:
await gd.opc_ua_client.check_connection()
except ua.uaerrors._auto.BadConnectionClosed:
connected = False connected = False
print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds") print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
await asyncio.sleep(gc.RECONNECT_INTERVAL) await asyncio.sleep(gc.RECONNECT_INTERVAL)
if connected:
await createSubscriptions()
# Periodically check for connection
while True:
if connected:
try:
await gd.opc_ua_client.check_connection()
except ua.uaerrors._auto.BadConnectionClosed:
connected = False
gd.opc_ua_client.disconnect_sessionless()
print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
await asyncio.sleep(gc.RECONNECT_INTERVAL)
else:
try:
await gd.opc_ua_client.connect_sessionless()
result = await gd.opc_ua_client.uaclient.activate_session()
print(result)
connected = True
except Exception:
print(f"Reconnecting in {gc.RECONNECT_INTERVAL} seconds")
await asyncio.sleep(gc.RECONNECT_INTERVAL)
@@ -182,10 +165,29 @@ async def initOPCUAConnection():
gd.opc_ua_client = Client(url=gc.OPC_UA_URL) gd.opc_ua_client = Client(url=gc.OPC_UA_URL)
gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI gd.opc_ua_client.application_uri = gc.CLIENT_APP_URI
result = await gd.opc_ua_client.get_endpoints()
print(result)
gd.opc_ua_client.set_user(gc.OPC_UA_USER)
gd.opc_ua_client.set_password(gc.OPC_UA_PW)
await gd.opc_ua_client.set_security( await gd.opc_ua_client.set_security(
SecurityPolicyBasic256Sha256, SecurityPolicyBasic256Sha256,
certificate=str(gc.CERT), certificate=str(gc.CERT),
private_key=str(gc.PRIVATE_KEY) private_key=str(gc.PRIVATE_KEY)
) )
gd.opc_ua_client.set_user(gc.OPC_UA_USER)
gd.opc_ua_client.set_password(gc.OPC_UA_PW)
async def createSubscriptions():
sub_param = ua.CreateSubscriptionParameters
sub_param.RequestedPublishingInterval = gd.PUBLISH_INTERVAL
sub_param.RequestedMaxKeepAliveCount = 5
sub_param.RequestedLifetimeCount = 10
sub_param.MaxNotificationsPerPublish = 0
gd.subscription_handler = await gd.opc_ua_client.create_subscription(sub_param, SubHandler(gd.queue, gd.event_queue))
# Create an event subscription
event_logger_node = gd.opc_ua_client.get_node(gc.EVENT_LOGGER_NODE_ID)
await gd.subscription_handler.subscribe_events(event_logger_node, evtypes=[ua.ObjectIds.BaseEventType, gc.TC_EVENT_DATATYPE_NODE_ID])