import logging

import msgpack
from asyncua import ua
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

import globalData as gd
from models import WriteNodeCommand

logger = logging.getLogger(__name__)

router = APIRouter()


def _as_node_ids(value):
    """Normalize a client-supplied node-id payload into a set of ids.

    A single scalar id (str/bytes/int) becomes a one-element set; any other
    iterable is converted element-wise; ``None`` yields an empty set.
    """
    if value is None:
        return set()
    if isinstance(value, (str, bytes, int)):
        # A bare string must not be split into characters by set()/update().
        return {value}
    return set(value)


async def _handle_subscribe(ws: WebSocket, requested) -> None:
    """Register node ids for *ws*, subscribe new ones, send a snapshot."""
    node_ids = _as_node_ids(requested)
    gd.clients[ws].update(node_ids)

    # Only nodes that are requested AND not yet subscribed need a new
    # monitored item. NOTE(review): gd.subscribed_nodes is not updated in
    # this module; presumably the subscription handler maintains it -- confirm.
    new_ids = node_ids.difference(gd.subscribed_nodes)
    if new_ids:
        nodes = [gd.opc_ua_client.get_node(f"{nid}") for nid in new_ids]
        await gd.subscription_handler.subscribe_data_change(nodes)

    # Snapshot of every cached value/event that is not None, so the client
    # can render a state before the first data-change notification arrives.
    initial_values = {
        nid: val
        for nid in gd.node_storage
        if (val := gd.node_storage.get(nid)) is not None
    }
    initial_events = {
        nid: val
        for nid in gd.event_storage
        if (val := gd.event_storage.get(nid)) is not None
    }
    payload = {"initial": {"values": initial_values, "events": initial_events}}
    await ws.send_bytes(msgpack.packb(payload))


@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    """Serve one WebSocket client speaking msgpack-encoded binary messages.

    Each received message is expected to be a mapping with one of the keys:

    * ``"subscribe"``   -- node id(s) to add to this client's set.
    * ``"unsubscribe"`` -- node id(s) to remove from this client's set.
    * ``"command"``     -- a write command forwarded to ``handle_command``.

    The client is registered in ``gd.clients`` on accept and is always
    removed again when the connection ends, regardless of the reason.
    """
    await ws.accept()
    gd.clients[ws] = set()
    try:
        while True:
            # Receive binary data from the client.
            raw = await ws.receive_bytes()
            data = msgpack.unpackb(raw, raw=False)
            if not isinstance(data, dict):
                logger.warning("Ignoring non-mapping websocket message: %r", data)
                continue

            if "subscribe" in data:
                await _handle_subscribe(ws, data["subscribe"])
            elif "unsubscribe" in data:
                # set.pop() takes no argument; remove the given ids and
                # silently ignore ids the client was never subscribed to.
                gd.clients[ws].difference_update(_as_node_ids(data["unsubscribe"]))
            elif "command" in data:
                await handle_command(data["command"])
    except WebSocketDisconnect:
        # Normal termination: the peer closed the connection.
        pass
    except Exception:
        # Any other failure still ends the session (as before), but is now
        # visible in the logs instead of being swallowed.
        logger.exception("WebSocket session terminated by an error")
    finally:
        gd.clients.pop(ws, None)


async def handle_command(cmd):
    """Validate a write command and write its value to the OPC UA node.

    ``cmd`` is validated with ``WriteNodeCommand``; the target node is looked
    up via ``cmd["nodeId"]``. Values for ``Float`` nodes are coerced with
    ``float()`` before being wrapped in a ``ua.DataValue``.

    Errors (validation, lookup, conversion, write) are logged and not raised,
    so a bad command does not terminate the caller's WebSocket session.
    """
    try:
        wnc = WriteNodeCommand.model_validate(cmd)
        node = gd.opc_ua_client.get_node(cmd["nodeId"])
        variant_type = ua.VariantType(wnc.nodeType)

        value = wnc.nodeValue
        if variant_type == ua.VariantType.Float:
            value = float(value)

        dv = ua.DataValue(ua.Variant(Value=value, VariantType=variant_type))
        await node.write_value(value=dv)
    except Exception:
        logger.exception("Failed to execute write command: %r", cmd)