tests/ide_tests/opcua_browse_encrypted.sikuli/opcua_service.bash
changeset 3820 46f3ca3f0157
parent 3718 7841b651d601
equal deleted inserted replaced
3819:4582f0fcf4c4 3820:46f3ca3f0157
     1 #!/bin/bash
     1 #!/bin/bash
     2 
     2 
       
     3 set -x -e
       
     4 
     3 echo "Instant encrypted OPC-UA server for test"
     5 echo "Instant encrypted OPC-UA server for test"
       
     6 
       
     7 rm -f my_cert.pem my_cert.der my_private_key.pem
     4 
     8 
     5 yes "" | openssl req -x509 -newkey rsa:2048 -keyout my_private_key.pem -out my_cert.pem \
     9 yes "" | openssl req -x509 -newkey rsa:2048 -keyout my_private_key.pem -out my_cert.pem \
     6         -days 355 -nodes -addext "subjectAltName = URI:urn:example.org:FreeOpcUa:python-opcua"
    10         -days 355 -nodes -addext "subjectAltName = URI:urn:example.org:FreeOpcUa:python-opcua"
     7 openssl x509 -outform der -in my_cert.pem -out my_cert.der
    11 openssl x509 -outform der -in my_cert.pem -out my_cert.der
     8 
    12 
     9 PROJECT_FILES_DIR=$BEREMIZPATH/tests/projects/opcua_browse_encrypted/project_files
    13 PROJECT_FILES_DIR=$BEREMIZPATH/tests/projects/opcua_browse_encrypted/project_files
    10 mkdir $PROJECT_FILES_DIR
    14 mkdir $PROJECT_FILES_DIR -p
    11 cp my_cert.der my_private_key.pem $PROJECT_FILES_DIR
    15 cp my_cert.der my_private_key.pem $PROJECT_FILES_DIR
    12 
    16 
    13 echo "CERTS READY"
    17 echo "CERTS READY"
    14 
    18 
    15 # Run server
    19 # Run server
    16 exec $BEREMIZPYTHONPATH - << EOF
    20 exec $BEREMIZPYTHONPATH - << EOF
    17 
    21 
    18 import sys
    22 import sys
    19 import os
    23 import os
    20 import time
    24 import time
       
    25 import asyncio
    21 
    26 
    22 from opcua import ua, Server
    27 from asyncua import ua, Server
       
    28 from asyncua.server.users import User, UserRole
    23 
    29 
    24 server = Server()
    30 # Asyncua can't work without (over)simple shared cerificates/privkey.
    25 host = os.environ.get("OPCUA_DEFAULT_HOST", "127.0.0.1")
    31 # No user is involved in that case, but asyncua needs it.
    26 endpoint = "opc.tcp://"+host+":4840/freeopcua/server/"
    32 # Over permessive User Manager hereafter helps cuting that corner.
    27 server.set_endpoint(endpoint)
    33 class AllAdminUserManager:
       
    34     def get_user(self, iserver, username=None, password=None, certificate=None):
       
    35         return User(role=UserRole.Admin)
    28 
    36 
    29 server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
    37 async def main():
    30 server.load_certificate("my_cert.der")
    38     server = Server(user_manager=AllAdminUserManager())
    31 server.load_private_key("my_private_key.pem")
    39     host = os.environ.get("OPCUA_DEFAULT_HOST", "127.0.0.1")
       
    40     endpoint = "opc.tcp://"+host+":4840/freeopcua/server/"
       
    41     await server.init()
       
    42     server.set_endpoint(endpoint)
    32 
    43 
    33 uri = "http://beremiz.github.io"
    44     server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
    34 idx = server.register_namespace(uri)
    45     await server.load_certificate("my_cert.der")
       
    46     await server.load_private_key("my_private_key.pem")
    35 
    47 
    36 objects = server.get_objects_node()
    48     uri = "http://beremiz.github.io"
       
    49     idx = await server.register_namespace(uri)
    37 
    50 
    38 testobj = objects.add_object(idx, "TestObject")
    51     objects = server.get_objects_node()
    39 testvarout = testobj.add_variable(idx, "TestOut", 1.2)
       
    40 testvar = testobj.add_variable(idx, "TestIn", 5.6)
       
    41 testvar.set_writable()
       
    42 
    52 
    43 server.start()
    53     testobj = await objects.add_object(idx, "TestObject")
       
    54     testvarout = await testobj.add_variable(idx, "TestOut", 1.2)
       
    55     testvar = await testobj.add_variable(idx, "TestIn", 5.6)
       
    56     await testvar.set_writable()
    44 
    57 
    45 try:
    58     await server.start()
    46     while True:
    59 
    47         time.sleep(1)
    60     try:
    48         inval=testvar.get_value()
    61         while True:
    49         print inval
    62             await asyncio.sleep(1)
    50         testvarout.set_value(inval*2)
    63             inval = await testvar.get_value()
    51         sys.stdout.flush()
    64             print(inval)
    52 finally:
    65             await testvarout.set_value(inval*2)
    53     server.stop()
    66             sys.stdout.flush()
       
    67     finally:
       
    68         await server.stop()
       
    69 
       
    70 asyncio.run(main())
       
    71 
    54 EOF
    72 EOF