Files
polaris_hmi/backend/main.py
2025-11-17 08:44:00 +01:00

115 lines
3.4 KiB
Python

import asyncio
from contextlib import asynccontextmanager

import msgpack
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from routers import websocket
#from globalData import node_storage, clients
import globalData as gd
import globalConfigs as gc
from opcuaHandler import connection_watcher2
# Node values received since the last broadcast, keyed by node id.
# Filled by data_consumer() and copied/cleared by broadcast_loop().
update_buffer = {}
async def data_consumer(queue):
    """Drain ``queue`` forever, keeping only the newest value per node.

    Every queue item is unpacked as a ``(nodeid, value)`` pair. The value is
    written to ``gd.node_storage`` and to the module-level ``update_buffer``.
    The coroutine never returns on its own; it ends when its task is
    cancelled.
    """
    # ``update_buffer`` is only mutated in place, never rebound, so no
    # ``global`` declaration is needed to reach the module-level dict.
    while True:
        item = await queue.get()
        tag, latest = item
        gd.node_storage[tag] = latest
        update_buffer[tag] = latest
        queue.task_done()
async def event_consumer(queue):
    """Consume events from ``queue``, record them and push them to clients.

    Every queue item is unpacked as a ``(node_id, event_data)`` pair. The
    event is merged into ``gd.event_storage`` and then sent, msgpack-encoded
    as ``{"e": {node_id: event_data}}``, to every websocket in
    ``gd.clients``.

    A send failure for one client is reported and skipped, so a single broken
    connection cannot terminate this task and stop event delivery for
    everyone else. The coroutine never returns on its own; it ends when its
    task is cancelled.
    """
    while True:
        node_id, event_data = await queue.get()

        if node_id in gd.event_storage:
            # Keep the message of the event already on record, then fold the
            # new data into the stored entry.
            # NOTE(review): event_data is used both via attribute access and
            # as an argument to .update() - confirm its type with the producer.
            event_data.Message = gd.event_storage[node_id].Message
            gd.event_storage[node_id].update(event_data)
        else:
            gd.event_storage[node_id] = event_data
        print(gd.event_storage)

        message = {"e": {node_id: event_data}}
        # Snapshot the registry: it may change while we await the sends.
        recipients = list(gd.clients)
        if recipients:
            # The payload is identical for every client, so encode it once.
            payload = msgpack.packb(message)
            for ws in recipients:
                print("Sending event:", message)
                try:
                    await ws.send_bytes(payload)
                except Exception as exc:
                    # CancelledError is a BaseException and still propagates,
                    # so task cancellation keeps working. The client registry
                    # is left untouched here; presumably the websocket router
                    # removes disconnected clients - TODO confirm.
                    print(f"Failed to send event to client {ws!r}: {exc!r}")
        queue.task_done()
async def broadcast_deltas(changes: dict):
    """Send each client the part of ``changes`` it is subscribed to.

    Args:
        changes: Mapping of tag -> new value.

    ``gd.clients`` is iterated as websocket -> subscribed tags. For every
    client the tags present in both ``changes`` and its subscription are
    msgpack-encoded as ``{"u": {...}}`` and sent; clients with no matching
    tag receive nothing. A send failure for one client is reported and does
    not prevent delivery to the remaining clients.
    """
    # Snapshot the registry: it may change while we await the sends.
    for ws, subscribed_tags in list(gd.clients.items()):
        relevant = {
            tag: val for tag, val in changes.items() if tag in subscribed_tags
        }
        if not relevant:
            continue
        payload = msgpack.packb({"u": relevant})
        try:
            await ws.send_bytes(payload)
        except Exception as exc:
            # CancelledError is a BaseException and still propagates.
            print(f"Failed to send update to client {ws!r}: {exc!r}")
async def broadcast_loop():
    """Periodically push buffered value changes to subscribed clients.

    Every ``gc.PUBLISH_INTERVAL`` seconds the module-level ``update_buffer``
    is copied and cleared. Each client in ``gd.clients`` (websocket ->
    subscribed tags) then receives the msgpack-encoded message
    ``{"u": {...}}`` containing only the changed tags it subscribed to.

    A send failure for one client is reported and skipped so that a single
    broken connection cannot terminate the loop and silence updates for all
    other clients. The coroutine never returns on its own; it ends when its
    task is cancelled.
    """
    while True:
        await asyncio.sleep(gc.PUBLISH_INTERVAL)
        if not update_buffer:
            continue

        # Copy and clear without an intervening await, so no update that
        # arrives during the sends below is lost or sent twice.
        changes = dict(update_buffer)
        update_buffer.clear()

        # Snapshot the registry: it may change while we await the sends.
        for ws, subscribed_tags in list(gd.clients.items()):
            relevant = {
                tag: val
                for tag, val in changes.items()
                if tag in subscribed_tags
            }
            if not relevant:
                continue
            payload = msgpack.packb({"u": relevant})
            try:
                await ws.send_bytes(payload)
            except Exception as exc:
                # CancelledError is a BaseException and still propagates,
                # so task cancellation keeps working.
                print(f"Failed to send update to client {ws!r}: {exc!r}")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan context: run the background tasks while the app is up.

    On startup four tasks are created: the OPC UA connection watcher, the
    node-value consumer, the event consumer and the periodic broadcast loop.
    On shutdown all of them are cancelled and awaited, and then the OPC UA
    client is disconnected.

    Args:
        app: The FastAPI application (unused; required by the lifespan API).
    """
    #await initOPCUAConnection()
    background_tasks = [
        # Background watcher for the OPC UA connection.
        asyncio.create_task(connection_watcher2()),
        # Consumer for node value-changed notifications.
        asyncio.create_task(data_consumer(gd.queue)),
        asyncio.create_task(event_consumer(gd.event_queue)),
        asyncio.create_task(broadcast_loop()),
    ]
    try:
        yield  # Application runs while this yields
    finally:
        for task in background_tasks:
            task.cancel()

        # Awaiting a cancelled task raises CancelledError. Gathering with
        # return_exceptions=True waits for *all* tasks instead of stopping at
        # the first one, so the disconnect below is always reached.
        results = await asyncio.gather(*background_tasks, return_exceptions=True)
        for result in results:
            # CancelledError is not an Exception subclass, so only genuine
            # task failures are reported here.
            if isinstance(result, Exception):
                print(f"Background task ended with an error: {result!r}")

        # Best-effort disconnect: the client may never have been created.
        client = getattr(gd, "opc_ua_client", None)
        if client is not None:
            try:
                await client.disconnect()
            except Exception as exc:
                print(f"OPC UA disconnect failed during shutdown: {exc!r}")
# ASGI application; startup and shutdown work is handled by `lifespan`.
app = FastAPI(lifespan=lifespan)

# Origins that are allowed to make cross-origin requests to this backend.
origins = [
    "http://localhost:3000",
    "http://127.0.0.1:3000",
]

_cors_settings = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],  # allow every HTTP method (GET, POST, etc.)
    "allow_headers": ["*"],  # allow every request header
}
app.add_middleware(CORSMiddleware, **_cors_settings)

app.include_router(websocket.router)
# Serve the built frontend
#app.mount("/assets", StaticFiles(directory="static/assets"), name="assets")