"""FastAPI application entry point.

Wires the OPC UA connection watcher into the application lifespan, defines
the queue consumers and websocket broadcast helpers, and configures CORS and
the websocket router.
"""

import asyncio
import inspect
import logging
from contextlib import asynccontextmanager

import msgpack
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from routers import websocket
#from globalData import node_storage, clients
import globalData as gd
import globalConfigs as gc
from opcuaHandler import initOPCUAConnection, connection_watcher2

logger = logging.getLogger(__name__)

# Latest value per node id that has not yet been flushed by broadcast_loop().
update_buffer: dict = {}


async def data_consumer(queue: asyncio.Queue) -> None:
    """Consume ``(nodeid, val)`` pairs from *queue* forever.

    Each value is written to ``gd.node_storage`` and to ``update_buffer``;
    a later value for the same node id overwrites the earlier one.
    """
    while True:
        nodeid, val = await queue.get()
        gd.node_storage[nodeid] = val
        update_buffer[nodeid] = val
        queue.task_done()


async def event_consumer(queue: asyncio.Queue) -> None:
    """Consume ``(node_id, event_data)`` pairs from *queue* forever.

    The event is merged into ``gd.event_storage`` (keeping the ``Message`` of
    an already stored event) and then sent, msgpack-encoded under the key
    ``"e"``, to every websocket in ``gd.clients``.
    """
    while True:
        node_id, event_data = await queue.get()
        print("Got new event", event_data)

        if node_id in gd.event_storage:
            # Preserve the message of the stored event before merging.
            event_data.Message = gd.event_storage[node_id].Message
            gd.event_storage[node_id].update(event_data)
        else:
            gd.event_storage[node_id] = event_data
        print(gd.event_storage)

        # Snapshot the clients so the mapping may change while we await sends.
        recipients = list(gd.clients)
        if recipients:
            message = {"e": {node_id: event_data}}
            packed = msgpack.packb(message)  # encode once, not once per client
            for ws in recipients:
                print("Sending event:", message)
                # NOTE(review): a failing send propagates and ends this
                # consumer; confirm whether dead clients are removed elsewhere.
                await ws.send_bytes(packed)

        queue.task_done()


async def broadcast_deltas(changes: dict) -> None:
    """Send each client the subset of *changes* it is subscribed to.

    For every ``(ws, subscribed_tags)`` entry in ``gd.clients`` the tags of
    *changes* contained in ``subscribed_tags`` are msgpack-encoded under the
    key ``"u"`` and sent; clients without a relevant change get nothing.
    """
    # Snapshot the items so the mapping may change while we await sends.
    for ws, subscribed_tags in list(gd.clients.items()):
        relevant = {
            tag: val for tag, val in changes.items() if tag in subscribed_tags
        }
        if relevant:
            await ws.send_bytes(msgpack.packb({"u": relevant}))


async def broadcast_loop() -> None:
    """Flush ``update_buffer`` to the clients every ``gc.PUBLISH_INTERVAL``."""
    while True:
        await asyncio.sleep(gc.PUBLISH_INTERVAL)
        if not update_buffer:
            continue

        # Copy and clear without an intervening await so no update is lost.
        changes = dict(update_buffer)
        update_buffer.clear()
        await broadcast_deltas(changes)


async def _disconnect_opcua() -> None:
    """Best-effort disconnect of ``gd.opc_ua_client`` if one is set."""
    client = getattr(gd, "opc_ua_client", None)
    if client is None:
        return
    try:
        result = client.disconnect()
        # NOTE(review): disconnect() may be a coroutine depending on the
        # client library in use; await it only when it is awaitable.
        if inspect.isawaitable(result):
            await result
    except Exception:
        # Shutdown must not fail because the connection is already gone.
        logger.exception("Disconnecting the OPC UA client failed")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan context: start the OPC UA connection watcher.

    On shutdown every background task started here is cancelled and awaited,
    then the OPC UA client is disconnected.
    """
    #await initOPCUAConnection()

    # Launch the background watcher task for the OPC UA connection.
    background_tasks = [asyncio.create_task(connection_watcher2())]

    # Currently disabled: consumers for node value changes and events, and
    # the periodic broadcast. Append them to background_tasks when enabling
    # them so that they are cleaned up on shutdown as well.
    #background_tasks.append(asyncio.create_task(data_consumer(gd.queue)))
    #background_tasks.append(asyncio.create_task(event_consumer(gd.event_queue)))
    #background_tasks.append(asyncio.create_task(broadcast_loop()))

    try:
        yield  # Application runs while this yields
    finally:
        # Cleanup on shutdown: cancel everything first, then wait for all of
        # the tasks so one CancelledError cannot skip the remaining cleanup.
        for task in background_tasks:
            task.cancel()
        results = await asyncio.gather(*background_tasks, return_exceptions=True)
        for outcome in results:
            if isinstance(outcome, Exception):
                logger.error("Background task failed: %r", outcome)

        await _disconnect_opcua()


app = FastAPI(lifespan=lifespan)

origins = [
    "http://localhost:3000",
    "http://127.0.0.1:3000",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],  # Allow all HTTP methods (GET, POST, etc.)
    allow_headers=["*"],  # Allow all headers
)

app.include_router(websocket.router)

# Serve the built frontend
#app.mount("/assets", StaticFiles(directory="static/assets"), name="assets")