Files
Uniper_Sphinx_Docu/get_scada_list.py
2025-05-19 13:51:06 +02:00

81 lines
3.0 KiB
Python

import asyncio
from asyncua import Client, Node
from asyncua import ua
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
# Endpoint URL handed to the asyncua Client in main().
url = "opc.tcp://10.103.32.50:4840"
# Presumably the namespace URI of the PLC; it is not referenced by the code
# visible in this file (the crawlers compare against a numeric namespace index).
namespace = "urn:BeckhoffAutomation:Ua:PLC1"
# Client certificate and private-key file names passed to
# client.set_security() in main().
cert = "uaexpert.der"
key = "uaexpert_key.pem"
async def crawl(node: "Node", client, namespace_index: int = 4):
    """Recursively walk the children of *node* and print matching variables.

    Object children are descended into; Variable children whose NodeId lives
    in ``namespace_index`` are handed to ``WriteNodeData``. Children of any
    other node class are ignored.

    Args:
        node: Node whose children are visited.
        client: Connected asyncua client used to resolve child nodes.
        namespace_index: Namespace index a variable must have to be printed.
            Defaults to 4.
    """
    for child in await node.get_children():
        ch = client.get_node(child)
        # Read the node class once per child instead of once per branch;
        # each awaited read is presumably a separate server request.
        node_class = await ch.read_node_class()
        if node_class == ua.NodeClass.Object:
            await crawl(ch, client, namespace_index)
        elif (
            node_class == ua.NodeClass.Variable
            and ch.nodeid.NamespaceIndex == namespace_index
        ):
            await WriteNodeData(ch)
async def getAllScadaStructures(
    node: "Node", client, node_list: list, namespace_index: int = 4
) -> list:
    """Collect ``rValue`` variables and their units below *node*.

    Object children are searched recursively. For a Variable child in
    ``namespace_index`` whose display name is ``rValue`` and whose parent
    NodeId does not contain ``.stSetpoint``, the sibling ``sUnit`` node is
    read and one CSV-style line ``"<parent>.rValue", "<unit>"`` is appended
    to *node_list*.

    Args:
        node: Node whose children are searched.
        client: Connected asyncua client used to resolve nodes.
        node_list: Accumulator list; it is extended in place.
        namespace_index: Namespace index a variable must have to be
            considered. Defaults to 4.

    Returns:
        The accumulator list, including any entries found below *node*.
    """
    for child in await node.get_children():
        ch = client.get_node(child)
        node_class = await ch.read_node_class()

        if node_class == ua.NodeClass.Object:
            node_list = await getAllScadaStructures(
                ch, client, node_list, namespace_index
            )
            continue
        if node_class != ua.NodeClass.Variable:
            continue
        if ch.nodeid.NamespaceIndex != namespace_index:
            continue

        display_name = await ch.read_display_name()
        if display_name.Text != "rValue":
            continue

        parent_node = await ch.get_parent()
        parent_nodeid = parent_node.nodeid.to_string()
        # Values that belong to a setpoint structure are deliberately left out.
        if ".stSetpoint" in parent_nodeid:
            continue

        # The unit is addressed by string NodeId next to the value.
        unit_node = client.get_node(f"{parent_nodeid}.sUnit")
        unit_value = await unit_node.read_value()
        node_list.append(f'"{parent_nodeid}.rValue", "{unit_value}"')
        # NOTE(review): returning here stops scanning the remaining siblings
        # of this rValue, so any sibling objects are not crawled - confirm
        # this early exit is intended.
        return node_list
    return node_list
async def WriteNodeData(node: "Node"):
    """Print one CSV-style line describing *node*, based on its display name.

    * ``rValue``: prints the NodeId, the display name and the fixed data
      type ``float32``.
    * ``sUnit``: reads the node's value and prints the NodeId and that value.
    * Any other display name: prints nothing.

    Args:
        node: Variable node to describe.
    """
    display_name = await node.read_display_name()
    node_id = node.nodeid.to_string()
    name = display_name.Text

    if name == "rValue":
        data_type = "float32"
        print(f"\"{node_id}\", \"{name}\" ,\"{data_type}\"")
    elif name == "sUnit":
        # Only unit nodes have their value read.
        value = await node.read_value()
        print(f"\"{node_id}\", \"{value}\"")
async def main():
    """Crawl the OPC UA server and write the found nodes to ``data_list.csv``.

    Builds a client for the module-level ``url`` with user/password
    authentication and Basic256Sha256 security, collects the entries returned
    by ``getAllScadaStructures`` starting at the root node, and writes them
    below a ``"NodeId", "Einheit"`` header line.
    """
    print(f"Connecting to {url} ...")
    client = Client(url=url)
    # NOTE(review): credentials are hard-coded; consider loading them from
    # the environment or a configuration file.
    client.set_user("telegraf")
    client.set_password("telegraf")
    await client.set_security(
        SecurityPolicyBasic256Sha256,
        certificate=str(cert),
        private_key=str(key),
        server_certificate="CP-86768C.der",
    )

    # The session is opened when the context manager is entered, so the
    # success message is printed only after that point.
    async with client:
        print("Connected!")
        root = client.get_root_node()
        # `await crawl(root, client)` can be used here instead to print every
        # matching variable rather than collecting the list.
        print("Crawling opc-ua ...")
        node_list = await getAllScadaStructures(root, client, [])
    print(f"Crawling finished! Found {len(node_list)} entries")

    # Explicit UTF-8 so unit strings containing non-ASCII characters are
    # written the same way on every platform.
    with open("data_list.csv", "w", encoding="utf-8") as f:
        f.write("\"NodeId\", \"Einheit\"\n")
        f.writelines(f"{line}\n" for line in node_list)
    print("Finished writing file")
# Run the crawler only when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())