"""Crawl an OPC UA server and export SCADA value nodes with their units.

Connects to the configured server with Basic256Sha256 security, walks the
address space starting at the root node, collects every matching ``rValue``
node together with the value of its sibling ``sUnit`` node, and writes the
result to ``data_list.csv``.
"""

import asyncio

from asyncua import Client, Node
from asyncua import ua
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256

url = "opc.tcp://10.103.32.50:4840"
# NOTE(review): this URI is declared but never used; presumably it is the
# namespace that SCADA_NAMESPACE_INDEX refers to -- confirm.
namespace = "urn:BeckhoffAutomation:Ua:PLC1"
cert = "uaexpert.der"
key = "uaexpert_key.pem"

# Only variables whose NodeId lives in this namespace index are considered.
SCADA_NAMESPACE_INDEX = 4


async def crawl(node: Node, client):
    """Recursively walk *node* and print data for matching variables.

    Object children are descended into; Variable children whose NodeId is in
    namespace index ``SCADA_NAMESPACE_INDEX`` are passed to ``WriteNodeData``.
    ``main`` does not call this function.

    Args:
        node: Node whose children are visited.
        client: Client used to resolve each child into a node.
    """
    for child_id in await node.get_children():
        ch = client.get_node(child_id)
        # Read the node class once per child: each read is a server request.
        node_class = await ch.read_node_class()
        if node_class == ua.NodeClass.Object:
            await crawl(ch, client)
        elif node_class == ua.NodeClass.Variable:
            if ch.nodeid.NamespaceIndex == SCADA_NAMESPACE_INDEX:
                await WriteNodeData(ch)


async def getAllScadaStructures(node: Node, client, node_list: list) -> list:
    """Collect CSV lines for ``rValue`` nodes found below *node*.

    Object children are searched recursively. For a Variable child in
    namespace index ``SCADA_NAMESPACE_INDEX`` whose display name is
    ``rValue`` and whose parent NodeId does not contain ``.stSetpoint``, the
    value of ``<parent>.sUnit`` is read and one line of the form
    ``"<parent>.rValue", "<unit>"`` is appended.

    Args:
        node: Node whose children are visited.
        client: Client used to resolve children and the unit node.
        node_list: Accumulator list; it is appended to in place.

    Returns:
        The same accumulator list that was passed in.
    """
    for child_id in await node.get_children():
        ch = client.get_node(child_id)
        node_class = await ch.read_node_class()

        if node_class == ua.NodeClass.Object:
            node_list = await getAllScadaStructures(ch, client, node_list)
            continue
        if node_class != ua.NodeClass.Variable:
            continue
        if ch.nodeid.NamespaceIndex != SCADA_NAMESPACE_INDEX:
            continue

        display_name = await ch.read_display_name()
        if display_name.Text != "rValue":
            continue

        parent_node = await ch.get_parent()
        parent_nodeid = parent_node.nodeid.to_string()
        if ".stSetpoint" in parent_nodeid:
            continue

        unit_node = client.get_node(f"{parent_nodeid}.sUnit")
        unit_node_value = await unit_node.read_value()
        node_list.append(f'"{parent_nodeid}.rValue", "{unit_node_value}"')
        # NOTE(review): the remaining siblings of this node are skipped once
        # an entry has been recorded. The source this was reconstructed from
        # had lost its indentation, so confirm this early return belongs to
        # the innermost branch.
        return node_list

    return node_list


async def WriteNodeData(node: Node):
    """Print a quoted, comma-separated line describing *node*.

    Nodes with display name ``rValue`` are printed with the fixed data type
    ``float32``; nodes with display name ``sUnit`` are printed with their
    current value. Any other node prints nothing.

    Args:
        node: Variable node to describe.
    """
    display_name = await node.read_display_name()
    node_id = node.nodeid.to_string()

    if display_name.Text == "rValue":
        data_type = "float32"
        print(f'"{node_id}", "{display_name.Text}" ,"{data_type}"')
    elif display_name.Text == "sUnit":
        value = await node.read_value()
        print(f'"{node_id}", "{value}"')


async def main():
    """Connect to the server, crawl it and write ``data_list.csv``."""
    print(f"Connecting to {url} ...")
    client = Client(url=url)
    # NOTE(review): credentials are hard-coded in the source; consider
    # loading them from the environment or a config file.
    client.set_user("telegraf")
    client.set_password("telegraf")
    await client.set_security(
        SecurityPolicyBasic256Sha256,
        certificate=cert,
        private_key=key,
        server_certificate="CP-86768C.der",
    )

    # The session is opened on entering the context manager, so success is
    # reported only once inside it.
    async with client:
        print("Connected!")
        root = client.get_root_node()

        print("Crawling opc-ua ...")
        node_list = await getAllScadaStructures(root, client, [])
        print(f"Crawling finished! Found {len(node_list)} entries")

        # Explicit encoding keeps non-ASCII unit strings portable across
        # platforms with different default encodings.
        with open("data_list.csv", "w", encoding="utf-8") as f:
            f.write('"NodeId", "Einheit"\n')
            f.writelines(f"{line}\n" for line in node_list)
        print("Finished writing file")


if __name__ == "__main__":
    asyncio.run(main())