tests/ide_tests/opcua_browse.sikuli/opcua_service.bash
changeset 3820 46f3ca3f0157
parent 3718 7841b651d601
equal deleted inserted replaced
3819:4582f0fcf4c4 3820:46f3ca3f0157
     6 exec $BEREMIZPYTHONPATH - << EOF
     6 exec $BEREMIZPYTHONPATH - << EOF
     7 
     7 
     8 import sys
     8 import sys
     9 import os
     9 import os
    10 import time
    10 import time
       
    11 import asyncio
    11 
    12 
    12 from opcua import ua, Server
    13 from asyncua import Server
    13 
    14 
    14 server = Server()
    15 async def main():
    15 host = os.environ.get("OPCUA_DEFAULT_HOST", "127.0.0.1")
    16     server = Server()
    16 endpoint = "opc.tcp://"+host+":4840/freeopcua/server/"
    17     host = os.environ.get("OPCUA_DEFAULT_HOST", "127.0.0.1")
    17 server.set_endpoint(endpoint)
    18     endpoint = "opc.tcp://"+host+":4840/freeopcua/server/"
       
    19     await server.init()
       
    20     server.set_endpoint(endpoint)
    18 
    21 
    19 uri = "http://beremiz.github.io"
    22     uri = "http://beremiz.github.io"
    20 idx = server.register_namespace(uri)
    23     idx = await server.register_namespace(uri)
    21 
    24 
    22 objects = server.get_objects_node()
    25     objects = server.get_objects_node()
    23 
    26 
    24 testobj = objects.add_object(idx, "TestObject")
    27     testobj = await objects.add_object(idx, "TestObject")
    25 testvarout = testobj.add_variable(idx, "TestOut", 1.2)
    28     testvarout = await testobj.add_variable(idx, "TestOut", 1.2)
    26 testvar = testobj.add_variable(idx, "TestIn", 5.6)
    29     testvar = await testobj.add_variable(idx, "TestIn", 5.6)
    27 testvar.set_writable()
    30     await testvar.set_writable()
    28 
    31 
    29 server.start()
    32     await server.start()
       
    33     try:
       
    34         while True:
       
    35             await asyncio.sleep(1)
       
    36             inval = await testvar.get_value()
       
    37             print(inval)
       
    38             await testvarout.set_value(inval*2)
       
    39             sys.stdout.flush()
       
    40     finally:
       
    41         await server.stop()
    30 
    42 
    31 try:
    43 asyncio.run(main())
    32     while True:
    44 
    33         time.sleep(1)
       
    34         inval=testvar.get_value()
       
    35         print inval
       
    36         testvarout.set_value(inval*2)
       
    37         sys.stdout.flush()
       
    38 finally:
       
    39     server.stop()
       
    40 EOF
    45 EOF