[openrtm-commit:01826] r689 - in trunk/OpenRTM-aist-Python/OpenRTM_aist: ext/ssl ext/ssl/test test
openrtm @ openrtm.org
openrtm @ openrtm.org
2016年 3月 7日 (月) 19:23:17 JST
Author: miyamoto
Date: 2016-03-07 19:23:17 +0900 (Mon, 07 Mar 2016)
New Revision: 689
Added:
trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/
trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/root.crt
trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/server.pem
trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/test_SSLTrasport.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CORBA_RTCUtil.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectPort.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Preconfigured.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Preconfigured_MultiProcess.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py
Log:
[compat,test,->RELENG_1_2] Added test files.
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/root.crt
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/root.crt (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/root.crt 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDfjCCAmYCCQDdoe79VxqYFjANBgkqhkiG9w0BAQUFADCBgDELMAkGA1UEBhMC
+SlAxDzANBgNVBAgTBlNhbXBsZTEPMA0GA1UEBxMGU2FtcGxlMQ8wDQYDVQQKEwZT
+YW1wbGUxDzANBgNVBAsTBlNhbXBsZTEPMA0GA1UEAxMGU2FtcGxlMRwwGgYJKoZI
+hvcNAQkBFg1TYW1wbGVAU2FtcGxlMB4XDTE2MDMwNzEwMTIwMFoXDTI2MDMwNTEw
+MTIwMFowgYAxCzAJBgNVBAYTAkpQMQ8wDQYDVQQIEwZTYW1wbGUxDzANBgNVBAcT
+BlNhbXBsZTEPMA0GA1UEChMGU2FtcGxlMQ8wDQYDVQQLEwZTYW1wbGUxDzANBgNV
+BAMTBlNhbXBsZTEcMBoGCSqGSIb3DQEJARYNU2FtcGxlQFNhbXBsZTCCASIwDQYJ
+KoZIhvcNAQEBBQADggEPADCCAQoCggEBAMR9Ekn9fgrhqDuwLRQd4UFDily8LE4F
+5hpPMrVwmvXaBnWxyl/XeBopXE9H8xz4w2u9/d6V3h3CJYMeIjBy9MY16ttMQ2Ye
+RBJK8xZlDN0zDN2IQSIn2uVuSQm6Fq1AgtpL8z5XhTMe++hI7vEHfPp3Z1XOshKj
+iOKk9dQkz0H8rhWBRf2cW63CF1jZLhqtW4rjUR1A630IM4GdgaIA/i2ejcQFM1XL
+jEKkHQnM2qel+LbHhuX2um5VYZEwOFWIJNOizOzmTV1heoN7D5xf8/IWojq+q3ls
+a6b80a7fx4Cuskswxd67mVggnkJzMQvBvRQR6UYDvZgyKl7ocdS8k+8CAwEAATAN
+BgkqhkiG9w0BAQUFAAOCAQEAPJjCQzXdqpLLTU3q/LSg9SDp11oHOQDsdnCUKiXc
+ZEOU2UqhW87x3SRLI6s4L1IMrdm4CUurpDbtVpdp32LNLC06+eEB6W6mYfY6npUX
++kgKtRHnt2RNoo9zY99qn8xSLSqYtz0/LHfpsDHobEdqmAOwgzU2BpyQtNPzzYyp
+CciU/1hzhq+iHdD9u2PNUWWZdkqBfko22J4FlyFF+i6nf36hDDeHC5ZXXMnKZZdC
+UljZuvuE7pGRuyXJ3ss8ML1lxGFfyzXG1EyaOs+IjingW4aPTbIgabbjQfOPW1Dx
++j5fP1F445Y7f/bi3+dXKeIi8fziKaPe7JUm0fl/E5JIAg==
+-----END CERTIFICATE-----
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/server.pem
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/server.pem (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/server.pem 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,48 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAxH0SSf1+CuGoO7AtFB3hQUOKXLwsTgXmGk8ytXCa9doGdbHK
+X9d4GilcT0fzHPjDa7393pXeHcIlgx4iMHL0xjXq20xDZh5EEkrzFmUM3TMM3YhB
+Iifa5W5JCboWrUCC2kvzPleFMx776Eju8Qd8+ndnVc6yEqOI4qT11CTPQfyuFYFF
+/ZxbrcIXWNkuGq1biuNRHUDrfQgzgZ2BogD+LZ6NxAUzVcuMQqQdCczap6X4tseG
+5fa6blVhkTA4VYgk06LM7OZNXWF6g3sPnF/z8haiOr6reWxrpvzRrt/HgK6ySzDF
+3ruZWCCeQnMxC8G9FBHpRgO9mDIqXuhx1LyT7wIDAQABAoIBADrlMmIzNWCVmxOp
+DxoEisxBLmv1i2Inqn0gWBbClfAzVdRZYkuwDjPpSd4JtvlsJ0dYP4xEZ7uLwiq3
+EZBsJKp05tys01b8o3LIPGzuBRkYgDa27K8MOzSiBgqAWOO9fntoYJTDLw3Pr7pE
+gjqFABUjTToPJpkno5Qr1fOsxOMGD4EjbFnuH0yWs7H1I8EWDu3ehPIjWNfg2Kcg
+mB8FJTFw5P2V/jmDXo+m27sambLfg1aXJVCozLbpp7qTh3GmkTnbFLmrXPpkcSAH
+SOUwWfR5yJ7yfiNrJpl1HmOpHseFX8X1mOzXfa43NL8rdWLVzp0Ru3T+fAa46Uiy
+5GWUgsECgYEA7r1Rpj9vPinVP+NDT9xvj1oRvZpUecQ4qKYjAca5P+PL5enAbC2S
+kx/lm6phQQDSM5B+ciUTxqYSaTX9B34YOHrngMjI3QF42xgsnoYGsnD+7cEcL4Ch
+YvppbM4pmyidI1x/nKQuX/flAqDFPiSEvkIl/yhogfUPuqhROqkleDkCgYEA0rHB
+dVhj8BOdmoNjcCwTyEg0h8YqLUnBngpt8khDXJ6A4zGl8mCoAS0ZX07C2hABZ8LY
+Yb+2UBq/yC/kMNOAW6+tSyQPzIzj8ACzFrD+7e64jqxDLiwR9CC4aCbUetcaKysX
+I0EdRc/ua4noEG4NLN0SiM87nfygOIp8+xzg3WcCgYBf3jxEYyK7trbAgfVMw3s0
++Hk5Rxj3ELmj15slInTPWB1PxO8VQbOjSuK8AM4u+TJvrG1qTsNDOPsZrqld8d+u
+BoSiLX6jaOzsJif8vFtCGqf0N2CnHqMwCNh+7ca9XLDFO2avMyrANN6MkJzEGIwJ
+jqyNat+UmCDp1p9PzA6U2QKBgQChbET/BbiTwEOYlR+DWpHRedidSasYxZCM6Aqy
+fMD3Xgz7hBdTKtCtDgOJbMuGzIp3F+Y2he5FUykqU97tta17EY6gqkFcg3lDcRXE
+xdibLLHDxaE259bdhpAiY9WZQ+ItvrbccM/fo95UD/V3WQnJHJtNrBBOC7Ype5kj
+v01c8wKBgBc2GfncAkaVGggTZHr+zkpASUTBHX9B6jju8s9X49v88MkO933Pidc6
+eysyFBnIvarhH9nTyDIGvTrrjpLC9/bUT6gPT11Z8cO3W6vrENfru/n8M98gdyXp
+fjgVr9jeCjkZo/ccNi3fklx6dpZIEQKo3vfSeKQPOrbD/wkLcXzd
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDfjCCAmYCCQDdoe79VxqYFjANBgkqhkiG9w0BAQUFADCBgDELMAkGA1UEBhMC
+SlAxDzANBgNVBAgTBlNhbXBsZTEPMA0GA1UEBxMGU2FtcGxlMQ8wDQYDVQQKEwZT
+YW1wbGUxDzANBgNVBAsTBlNhbXBsZTEPMA0GA1UEAxMGU2FtcGxlMRwwGgYJKoZI
+hvcNAQkBFg1TYW1wbGVAU2FtcGxlMB4XDTE2MDMwNzEwMTIwMFoXDTI2MDMwNTEw
+MTIwMFowgYAxCzAJBgNVBAYTAkpQMQ8wDQYDVQQIEwZTYW1wbGUxDzANBgNVBAcT
+BlNhbXBsZTEPMA0GA1UEChMGU2FtcGxlMQ8wDQYDVQQLEwZTYW1wbGUxDzANBgNV
+BAMTBlNhbXBsZTEcMBoGCSqGSIb3DQEJARYNU2FtcGxlQFNhbXBsZTCCASIwDQYJ
+KoZIhvcNAQEBBQADggEPADCCAQoCggEBAMR9Ekn9fgrhqDuwLRQd4UFDily8LE4F
+5hpPMrVwmvXaBnWxyl/XeBopXE9H8xz4w2u9/d6V3h3CJYMeIjBy9MY16ttMQ2Ye
+RBJK8xZlDN0zDN2IQSIn2uVuSQm6Fq1AgtpL8z5XhTMe++hI7vEHfPp3Z1XOshKj
+iOKk9dQkz0H8rhWBRf2cW63CF1jZLhqtW4rjUR1A630IM4GdgaIA/i2ejcQFM1XL
+jEKkHQnM2qel+LbHhuX2um5VYZEwOFWIJNOizOzmTV1heoN7D5xf8/IWojq+q3ls
+a6b80a7fx4Cuskswxd67mVggnkJzMQvBvRQR6UYDvZgyKl7ocdS8k+8CAwEAATAN
+BgkqhkiG9w0BAQUFAAOCAQEAPJjCQzXdqpLLTU3q/LSg9SDp11oHOQDsdnCUKiXc
+ZEOU2UqhW87x3SRLI6s4L1IMrdm4CUurpDbtVpdp32LNLC06+eEB6W6mYfY6npUX
++kgKtRHnt2RNoo9zY99qn8xSLSqYtz0/LHfpsDHobEdqmAOwgzU2BpyQtNPzzYyp
+CciU/1hzhq+iHdD9u2PNUWWZdkqBfko22J4FlyFF+i6nf36hDDeHC5ZXXMnKZZdC
+UljZuvuE7pGRuyXJ3ss8ML1lxGFfyzXG1EyaOs+IjingW4aPTbIgabbjQfOPW1Dx
++j5fP1F445Y7f/bi3+dXKeIi8fziKaPe7JUm0fl/E5JIAg==
+-----END CERTIFICATE-----
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/test_SSLTrasport.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/test_SSLTrasport.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/ext/ssl/test/test_SSLTrasport.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_SSLTrasport.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC
+import multiprocessing
+import os
+import threading
+
+
+
+def RunOutPort(q):
+
+ argv = sys.argv[:]
+ #argv.extend(['-o', 'corba.endpoint::2810'])
+
+ manager = OpenRTM_aist.Manager.init(argv)
+ manager.activateManager()
+ _d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ _outOut = OpenRTM_aist.OutPort("out", _d_out)
+ prop = OpenRTM_aist.Properties()
+ _outOut.init(prop)
+
+
+
+ """orb = manager.getORB()
+ poa = orb.resolve_initial_references("omniINSPOA")
+ poa._get_the_POAManager().activate()
+ id = "test"
+ poa.activate_object_with_id(id, _outOut)"""
+
+ manager._namingManager.bindPortObject("test", _outOut)
+
+
+ q.get()
+
+ _d_out.data = 100
+ _outOut.write()
+
+ q.get()
+
+
+
+ manager.shutdownManager()
+
+
+class TestSSLTransport(unittest.TestCase):
+
+ def setUp(self):
+ self.queue = multiprocessing.Queue()
+
+ sys.argv.extend(['-o', 'manager.preload.modules:SSLTransport.py'])
+ sys.argv.extend(['-o', 'manager.modules.load_path:./,../'])
+ sys.argv.extend(['-o', 'corba.ssl.certificate_authority_file:root.crt'])
+ sys.argv.extend(['-o', 'corba.ssl.key_file:server.pem'])
+ sys.argv.extend(['-o', 'corba.ssl.key_file_password:password'])
+ os.environ['ORBsslVerifyMode'] = "none"
+ self.outport_process = multiprocessing.Process(target=RunOutPort, args=(self.queue,))
+ self.outport_process.start()
+
+
+ time.sleep(1)
+ os.environ['ORBtraceLevel'] = '25'
+
+
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.activateManager()
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+ prop = OpenRTM_aist.Properties()
+ self._inIn.init(prop)
+ self.inport_obj = self._inIn.getPortRef()
+
+
+
+ orb = self.manager.getORB()
+ #outport_name = "corbaloc:iiop:localhost:2810/test"
+ outport_name = "corbaname::localhost:2809/NameService#test"
+ oobj = orb.string_to_object(outport_name)
+ self.outport_obj = oobj._narrow(RTC.PortService)
+
+
+
+ def tearDown(self):
+ self.manager.shutdownManager()
+ self.queue.put("")
+
+ def test_Connect(self):
+
+ prop = OpenRTM_aist.Properties()
+ ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
+
+ self.queue.put("")
+
+ #ret = self._inIn.isNew()
+ #data = self._inIn.read()
+
+
+
+ #self.outport_obj.disconnect_all()
+
+
+
+
+
+
+
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CORBA_RTCUtil.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CORBA_RTCUtil.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CORBA_RTCUtil.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,371 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_CORBA_RTCUtil.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import OpenRTM, OpenRTM__POA
+
+testcomp1_spec = ["implementation_id", "TestComp1",
+ "type_name", "TestComp1",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ "conf.default.test1", "0",
+ ""]
+
+testcomp2_spec = ["implementation_id", "TestComp2",
+ "type_name", "TestComp2",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ ""]
+
+class Test_i(OpenRTM__POA.InPortCdr):
+ def __init__(self):
+ pass
+ def put(self, data):
+ return OpenRTM.PORT_OK
+
+
+class TestComp1(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+
+ self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
+ self._testService_provided = Test_i()
+
+
+ self._test1 = [0]
+
+ def onInitialize(self):
+ self.addOutPort("out",self._outOut)
+ self.addInPort("in",self._inIn)
+
+ self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
+ self.addPort(self._servicePort_provided)
+
+
+
+
+ self.bindParameter("test1", self._test1, "0")
+
+
+ return RTC.RTC_OK
+
+
+
+class TestComp2(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+
+
+
+
+ self._servicePort_required = OpenRTM_aist.CorbaPort("service")
+ self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
+
+
+
+ return
+
+ def onInitialize(self):
+ self.addInPort("in",self._inIn)
+ self.addOutPort("out",self._outOut)
+
+
+
+ self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
+ self.addPort(self._servicePort_required)
+
+
+ return RTC.RTC_OK
+
+
+
+
+def TestComp1Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+ manager.registerFactory(profile,
+ TestComp1,
+ OpenRTM_aist.Delete)
+
+
+
+def TestComp2Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
+ manager.registerFactory(profile,
+ TestComp2,
+ OpenRTM_aist.Delete)
+
+
+def MyModuleInit(manager):
+ TestComp1Init(manager)
+ TestComp2Init(manager)
+ com = manager.createComponent("TestComp1")
+ com = manager.createComponent("TestComp2")
+
+
+
+
+
+class Test_CORBA_RTCUtil(unittest.TestCase):
+
+ def setUp(self):
+ sys.argv.extend(['-d'])
+ sys.argv.extend(['-o', 'exec_cxt.periodic.rate:1000'])
+ sys.argv.extend(['-o','naming.type:corba,manager'])
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.setModuleInitProc(MyModuleInit)
+ self.manager.activateManager()
+
+
+
+ self.comp1 = self.manager.getComponent("TestComp10").getObjRef()
+ self.comp2 = self.manager.getComponent("TestComp20").getObjRef()
+
+
+ def tearDown(self):
+ comps = self.manager.getComponents()[:]
+ for comp in comps:
+ self.manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = self.manager._factory.find(comp_id)
+ factory.destroy(comp)
+ self.manager.shutdownNaming()
+ time.sleep(0.1)
+
+ def test_Component(self):
+ compProf = OpenRTM_aist.get_component_profile(self.comp1)
+ self.assertEqual(compProf.getProperty("implementation_id"), "TestComp1")
+ ret = OpenRTM_aist.is_existing(self.comp1)
+ self.assertTrue(ret)
+
+ def test_EC(self):
+ ec = OpenRTM_aist.get_actual_ec(self.comp1,0)
+ ec_id = OpenRTM_aist.get_ec_id(self.comp1, ec)
+ self.assertEqual(ec_id, 0)
+
+ ret = OpenRTM_aist.add_rtc_to_default_ec(self.comp1, self.comp2)
+ self.assertEqual(ret, RTC.RTC_OK)
+
+
+ ret = OpenRTM_aist.is_alive_in_default_ec(self.comp1)
+ self.assertEqual(ret, True)
+
+ ret = OpenRTM_aist.set_default_rate(self.comp1, 500)
+ self.assertEqual(ret, RTC.RTC_OK)
+
+ rate = OpenRTM_aist.get_default_rate(self.comp1)
+ self.assertEqual(int(rate), 500)
+
+ ret = OpenRTM_aist.set_current_rate(self.comp2,1000, 100)
+ self.assertEqual(ret, RTC.RTC_OK)
+
+ rate = OpenRTM_aist.get_current_rate(self.comp2, 1000)
+ self.assertEqual(int(rate), 100)
+
+ ret = OpenRTM_aist.activate(self.comp1,0)
+ self.assertEqual(ret, RTC.RTC_OK)
+ state = OpenRTM_aist.is_in_active(self.comp1, 0)
+ self.assertTrue(state)
+
+ ret = OpenRTM_aist.deactivate(self.comp1,0)
+ self.assertEqual(ret, RTC.RTC_OK)
+ state = OpenRTM_aist.is_in_inactive(self.comp1, 0)
+ self.assertTrue(state)
+
+ #state = OpenRTM_aist.is_in_error(self.comp1, 0)
+ #ret = OpenRTM_aist.reset(self.comp1,0)
+ #self.assertEqual(ret, RTC.RTC_OK)
+
+ ret = [None]
+ ans = OpenRTM_aist.get_state(ret, self.comp1, 0)
+ self.assertEqual(ret[0], RTC.INACTIVE_STATE)
+
+ ret = OpenRTM_aist.remove_rtc_to_default_ec(self.comp1, self.comp2)
+ self.assertEqual(ret, RTC.RTC_OK)
+
+
+ def test_Port(self):
+ port_names = OpenRTM_aist.get_port_names(self.comp1)
+ self.assertTrue("TestComp10.out" in port_names)
+
+ inport_names = OpenRTM_aist.get_inport_names(self.comp2)
+ self.assertTrue("TestComp20.in" in inport_names)
+
+ outport_names = OpenRTM_aist.get_outport_names(self.comp1)
+ self.assertTrue("TestComp10.out" in outport_names)
+
+ svcport_names = OpenRTM_aist.get_svcport_names(self.comp1)
+ self.assertTrue("TestComp10.service" in svcport_names)
+
+ rtc1_port_out = OpenRTM_aist.get_port_by_name(self.comp1,"TestComp10.out")
+ rtc2_port_out = OpenRTM_aist.get_port_by_name(self.comp2,"TestComp20.out")
+ rtc2_port_in = OpenRTM_aist.get_port_by_name(self.comp2,"TestComp20.in")
+ rtc1_port_in = OpenRTM_aist.get_port_by_name(self.comp1,"TestComp10.in")
+
+ pp = rtc1_port_out.get_port_profile()
+ self.assertEqual(pp.name, "TestComp10.out")
+
+
+ prop = OpenRTM_aist.Properties()
+ connect_profile = OpenRTM_aist.create_connector("con1",prop,rtc1_port_out,rtc2_port_in)
+ ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
+ self.assertEqual(ret, RTC.RTC_OK)
+
+ con_names = OpenRTM_aist.get_connector_names_by_portref(rtc1_port_out)
+ self.assertTrue("con1" in con_names)
+ con_names = OpenRTM_aist.get_connector_names(self.comp1,"TestComp10.out")
+ self.assertTrue("con1" in con_names)
+ con_ids = OpenRTM_aist.get_connector_ids_by_portref(rtc1_port_out)
+ con_ids = OpenRTM_aist.get_connector_ids(self.comp1,"TestComp10.out")
+
+
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertTrue(ret)
+
+ ret = OpenRTM_aist.disconnect(rtc1_port_out.get_connector_profiles()[0])
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertFalse(ret)
+
+
+
+ ret = OpenRTM_aist.connect_multi("con2",prop,rtc1_port_out,[rtc1_port_in,rtc2_port_in])
+ self.assertEqual(ret, RTC.RTC_OK)
+
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc1_port_in)
+ self.assertTrue(ret)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertTrue(ret)
+
+ ret = OpenRTM_aist.disconnect_all_by_ref(rtc1_port_out)
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertFalse(ret)
+
+
+
+ ret = OpenRTM_aist.connect_by_name("con3",prop,self.comp1,"TestComp10.out",self.comp2,"TestComp20.in")
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertTrue(ret)
+
+ ret = OpenRTM_aist.disconnect_by_portref_connector_name(rtc1_port_out, "con3")
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertFalse(ret)
+
+ ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
+ ret = OpenRTM_aist.disconnect_by_portref_connector_id(rtc1_port_out,rtc1_port_out.get_connector_profiles()[0].connector_id)
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertFalse(ret)
+
+ ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
+ ret = OpenRTM_aist.disconnect_by_port_name(rtc1_port_out,"TestComp20.in")
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertFalse(ret)
+
+
+ ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
+ ret = OpenRTM_aist.disconnect_all_by_name("rtcloc://localhost:2810/example/TestComp20.in")
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertFalse(ret)
+
+ ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
+ ret = OpenRTM_aist.disconnect_by_portname_connector_name("rtcloc://localhost:2810/example/TestComp20.in","con1")
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertFalse(ret)
+
+ ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
+ ret = OpenRTM_aist.disconnect_by_portname_connector_id("rtcloc://localhost:2810/example/TestComp10.out",rtc1_port_out.get_connector_profiles()[0].connector_id)
+ self.assertEqual(ret, RTC.RTC_OK)
+ ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
+ self.assertFalse(ret)
+
+
+
+
+ def test_configuration(self):
+ conf = OpenRTM_aist.get_configuration(self.comp1, "default")
+ self.assertEqual(conf.getProperty("test1"),"0")
+
+ ret = OpenRTM_aist.get_parameter_by_key(self.comp1,"default","test1")
+ self.assertEqual(ret,"0")
+
+ conf = OpenRTM_aist.get_active_configuration(self.comp1)
+ self.assertEqual(conf.getProperty("test1"),"0")
+
+ ret = OpenRTM_aist.set_configuration(self.comp1,"default","test1","1")
+ self.assertTrue(ret)
+ ret = OpenRTM_aist.get_parameter_by_key(self.comp1,"default","test1")
+ self.assertEqual(ret,"1")
+
+ ret = OpenRTM_aist.set_active_configuration(self.comp1,"test1","2")
+ self.assertTrue(ret)
+ ret = OpenRTM_aist.get_parameter_by_key(self.comp1,"default","test1")
+ self.assertEqual(ret,"2")
+
+ confsetname = OpenRTM_aist.get_active_configuration_name(self.comp1)
+ self.assertEqual(confsetname,"default")
+
+
+ #ec = self.comp1.get_owned_contexts()[0]
+
+ #print OpenRTM_aist.get_participants_rtc(ec)
+
+ #print ec.add_component(self.comp2)
+ #print ec.get_profile()
+ #prop = OpenRTM_aist.Properties()
+ #conf = self.comp1.get_configuration()
+ #print conf.get_configuration_sets()
+ #confsets = conf.get_configuration_sets()
+ #print confsets
+ #OpenRTM_aist.NVUtil.copyToProperties(prop,confsets)
+ #print prop
+ # self.comp1.get_status_list()
+
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,201 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_CreateComponent_Slave.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import multiprocessing
+
+testcomp1_spec = ["implementation_id", "TestComp1",
+ "type_name", "TestComp1",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ "conf.default.test1", "0",
+ ""]
+
+testcomp2_spec = ["implementation_id", "TestComp2",
+ "type_name", "TestComp2",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ ""]
+
+
+
+
+class TestComp1(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+
+
+ def onInitialize(self):
+
+ return RTC.RTC_OK
+
+class TestComp2(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+
+
+ return
+
+ def onInitialize(self):
+
+
+ return RTC.RTC_OK
+
+
+
+
+def TestComp1Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+ manager.registerFactory(profile,
+ TestComp1,
+ OpenRTM_aist.Delete)
+
+
+
+def TestComp2Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
+ manager.registerFactory(profile,
+ TestComp2,
+ OpenRTM_aist.Delete)
+
+
+def TestComp1ModuleInit(manager):
+ TestComp1Init(manager)
+ #com = manager.createComponent("TestComp1")
+
+def TestComp2ModuleInit(manager):
+ TestComp2Init(manager)
+ #com = manager.createComponent("TestComp2")
+
+
+def runTestComp2(q):
+ q.get()
+ argv = ["dummy"]
+ argv.extend(['-o','naming.type:corba,manager'])
+ argv.extend(['-o','naming.formats:test.host_cxt/%n.rtc'])
+ argv.extend(['-o','manager.instance_name:manager2'])
+ argv.extend(['-o','manager.shutdown_on_nortcs:NO'])
+ argv.extend(['-o','manager.shutdown_auto:NO'])
+
+ manager = OpenRTM_aist.Manager.init(argv)
+ manager.setModuleInitProc(TestComp2ModuleInit)
+ manager.activateManager()
+
+
+
+
+ #print manager.getManagerServant().findManager_by_name("manager")
+ #rtc = manager.getManagerServant().create_component("testSHM_in&manager_name=manager3&language=Python")
+ #rtc = manager.getManagerServant().create_component("AbsFunction&manager_name=manager3&language=C++")
+ #$print rtc
+ #rtc = manager.getManagerServant().create_component("testSHM_in")
+ #print rtc
+ q.get()
+
+ comps = manager.getComponents()[:]
+ for comp in comps:
+ manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = manager._factory.find(comp_id)
+ factory.destroy(comp)
+ manager.shutdownNaming()
+ time.sleep(0.1)
+
+
+class Test_CreateComponent_Slave(unittest.TestCase):
+
+ def setUp(self):
+
+
+ sys.argv.extend(['-d'])
+ sys.argv.extend(['-o','naming.type:corba,manager'])
+ sys.argv.extend(['-o','naming.formats:test.host_cxt/%n.rtc'])
+ sys.argv.extend(['-o','manager.instance_name:manager'])
+ #sys.argv.extend(['-o','logger.log_level:DEBUG'])
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.setModuleInitProc(TestComp1ModuleInit)
+ self.manager.activateManager()
+
+
+
+
+
+
+
+
+ def tearDown(self):
+
+
+ comps = self.manager.getComponents()[:]
+ for comp in comps:
+ self.manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = self.manager._factory.find(comp_id)
+ factory.destroy(comp)
+ self.manager.shutdownNaming()
+
+ time.sleep(0.1)
+
+ def test_createComponent_slave(self):
+ self.queue = multiprocessing.Queue()
+ self.outport_process = multiprocessing.Process(target=runTestComp2, args=(self.queue,))
+ self.outport_process.start()
+ self.queue.put("")
+ time.sleep(1)
+ #rtc = self.manager.getManagerServant().create_component("TestComp1&manager_name=manager2")
+ #rtc = self.manager.getManagerServant().create_component("TestComp2&manager_address=localhost:2810")
+ rtc = self.manager.getManagerServant().create_component("TestComp2&manager_name=manager2")
+ slave = self.manager.getManagerServant().findManager_by_name("manager2")
+ rtcs = slave.get_components_by_name("TestComp20")
+ name = rtcs[0].get_component_profile().instance_name
+ self.assertEqual(name,"TestComp20")
+ self.queue.put("")
+ #rtc = self.manager.getManagerServant().create_component("TestComp2")
+ #print rtc
+ #comps = self.manager.getComponents()
+ #print comps
+
+ def test_createComponent_newProcess(self):
+ rtc = self.manager.getManagerServant().create_component("TestComp3&manager_name=manager3&language=Python")
+ slave = self.manager.getManagerServant().findManager_by_name("manager3")
+ rtcs = slave.get_components_by_name("TestComp30")
+ name = rtcs[0].get_component_profile().instance_name
+ self.assertEqual(name,"TestComp30")
+
+
+
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectPort.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectPort.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectPort.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,146 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_DirectPort.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC
+
+
+
+
+class DataListener(OpenRTM_aist.ConnectorDataListenerT):
+ def __init__(self, mes):
+ self.mes = mes
+ def __del__(self):
+ pass
+ def __call__(self, info, cdrdata):
+ print self.mes
+
+class ConnectorListener(OpenRTM_aist.ConnectorListener):
+ def __init__(self, mes):
+ self.mes = mes
+ def __del__(self):
+ pass
+ def __call__(self, info):
+ print self.mes
+
+
+
+class TestDirectPort(unittest.TestCase):
+
+ def setUp(self):
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.activateManager()
+
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+ prop = OpenRTM_aist.Properties()
+ self._inIn.init(prop)
+ self.inport_obj = self._inIn.getPortRef()
+
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE,DataListener("InPort:ON_BUFFER_WRITE"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL,DataListener("InPort:ON_BUFFER_FULL"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT,DataListener("InPort:ON_BUFFER_WRITE_TIMEOUT"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE,DataListener("InPort:ON_BUFFER_OVERWRITE"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ,DataListener("InPort:ON_BUFFER_READ"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_SEND,DataListener("InPort:ON_SEND"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED,DataListener("InPort:ON_RECEIVED"))
+ self._inIn.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY,ConnectorListener("InPort:ON_BUFFER_EMPTY"))
+ self._inIn.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT,ConnectorListener("InPort:ON_BUFFER_READ_TIMEOUT"))
+ self._inIn.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY,ConnectorListener("InPort:ON_SENDER_EMPTY"))
+ self._inIn.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_SENDER_TIMEOUT,ConnectorListener("InPort:ON_SENDER_TIMEOUT"))
+ self._inIn.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR,ConnectorListener("InPort:ON_SENDER_ERROR"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL,DataListener("InPort:ON_RECEIVER_FULL"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT,DataListener("InPort:ON_RECEIVER_TIMEOUT"))
+ self._inIn.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR,DataListener("InPort:ON_RECEIVER_ERROR"))
+
+
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+ prop = OpenRTM_aist.Properties()
+ self._outOut.init(prop)
+ self.outport_obj = self._outOut.getPortRef()
+
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE,DataListener("OutPort:ON_BUFFER_WRITE"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL,DataListener("OutPort:ON_BUFFER_FULL"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT,DataListener("OutPort:ON_BUFFER_WRITE_TIMEOUT"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE,DataListener("OutPort:ON_BUFFER_OVERWRITE"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ,DataListener("OutPort:ON_BUFFER_READ"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_SEND,DataListener("OutPort:ON_SEND"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED,DataListener("OutPort:ON_RECEIVED"))
+ self._outOut.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY,ConnectorListener("OutPort:ON_BUFFER_EMPTY"))
+ self._outOut.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT,ConnectorListener("OutPort:ON_BUFFER_READ_TIMEOUT"))
+ self._outOut.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY,ConnectorListener("OutPort:ON_SENDER_EMPTY"))
+ self._outOut.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_SENDER_TIMEOUT,ConnectorListener("OutPort:ON_SENDER_TIMEOUT"))
+ self._outOut.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR,ConnectorListener("OutPort:ON_SENDER_ERROR"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL,DataListener("OutPort:ON_RECEIVER_FULL"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT,DataListener("OutPort:ON_RECEIVER_TIMEOUT"))
+ self._outOut.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR,DataListener("OutPort:ON_RECEIVER_ERROR"))
+
+
+ def tearDown(self):
+ self.manager.shutdownManager()
+
+ def test_Push(self):
+ print "Push"
+ prop = OpenRTM_aist.Properties()
+ prop.setProperty("dataport.interface_type","direct")
+ prop.setProperty("dataport.dataflow_type","push")
+ ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
+
+ self._d_out.data = 100
+ #for i in range(10):
+ #self._outOut.write()
+ self._outOut.write()
+
+ ret = self._inIn.isNew()
+ self.assertTrue(ret)
+
+ data = self._inIn.read()
+ self.assertEqual(data.data, 100)
+ self.assertTrue(data is self._d_out)
+
+ self.outport_obj.disconnect_all()
+
+
+
+ def test_Pull(self):
+ print "Pull"
+ prop = OpenRTM_aist.Properties()
+ prop.setProperty("dataport.interface_type","direct")
+ prop.setProperty("dataport.dataflow_type","pull")
+ ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
+
+ self._d_out.data = 100
+ self._outOut.write()
+
+ #ret = self._inIn.isNew()
+ #self.assertTrue(ret)
+
+ data = self._inIn.read()
+ self.assertEqual(data.data, 100)
+ self.assertTrue(data is self._d_out)
+
+ self.outport_obj.disconnect_all()
+
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,82 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_DirectServicePort.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import OpenRTM, OpenRTM__POA
+
+# Minimal class derived from OpenRTM__POA.InPortCdr, used as the provider object in the test below.
+# __init__ does nothing (the base initialiser is not called); put() ignores its argument and returns OpenRTM.PORT_OK.
+class Test_i(OpenRTM__POA.InPortCdr):
+ def __init__(self):
+ pass
+ def put(self, data):
+ return OpenRTM.PORT_OK
+
+
+
+
+
+# Test case: connect a provider CorbaPort and a consumer CorbaPort and check that the consumer
+# ends up holding the provider object (the Test_i instance) itself -- see test_Service.
+class TestDirectServicePort(unittest.TestCase):
+ # Initialise and activate the manager, then build the provider side (Test_i) and the consumer side.
+ def setUp(self):
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ #self.manager.setModuleInitProc(MyModuleInit)
+ self.manager.activateManager()
+ self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
+ self._testService_provided = Test_i()
+
+ self._servicePort_required = OpenRTM_aist.CorbaPort("service")
+ self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
+
+ # Provider and consumer are registered with the same first two arguments ("service", "TestService").
+ self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
+
+
+ self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
+
+ # activateInterfaces() is called on the provider port only.
+ self._servicePort_provided.activateInterfaces()
+ # Shut the manager down, then sleep for 0.1 s.
+ def tearDown(self):
+ self.manager.shutdownManager()
+ time.sleep(0.1)
+
+
+ # Connect the two ports with empty properties; the consumer's _ptr() must then equal the provider object.
+ def test_Service(self):
+ prop = OpenRTM_aist.Properties()
+ ret = OpenRTM_aist.connect("con1",prop,self._servicePort_provided.getPortRef(),self._servicePort_required.getPortRef())
+ obj = self._testService_required._ptr()
+ # NOTE(review): ret from connect() is not checked; only the equality below is asserted.
+ self.assertEqual(obj, self._testService_provided)
+
+
+
+
+
+# Script entry point: run this file's test cases via unittest.main().
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_NumberingPolicy_node.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import multiprocessing
+# Profile of TestComp1 as a flat key, value, ... list ending in ""; read by TestComp1Init via Properties(defaults_str=...).
+testcomp1_spec = ["implementation_id", "TestComp1",
+ "type_name", "TestComp1",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ "conf.default.test1", "0",
+ ""]
+
+
+
+
+
+
+# Register a factory with the given manager: the profile is built from testcomp1_spec, and
+# OpenRTM_aist.DataFlowComponentBase / OpenRTM_aist.Delete are passed as second and third arguments.
+def TestComp1Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+ manager.registerFactory(profile,
+ OpenRTM_aist.DataFlowComponentBase,
+ OpenRTM_aist.Delete)
+
+
+
+
+# Module-init callback (installed with setModuleInitProc in this file): registers the TestComp1
+# factory and asks the manager to create one "TestComp1"; the returned component is not used.
+def TestComp1ModuleInit(manager):
+ TestComp1Init(manager)
+ com = manager.createComponent("TestComp1")
+
+
+# Target of the child process started in Test_NumberingPolicy_node.setUp.
+# q is the multiprocessing.Queue shared with the parent; it is only used to wait for the stop signal.
+def runTestComp2(q):
+ # Command line for this process' manager: -d plus the node_unique naming policy and naming.type corba,manager.
+ argv = [""]
+ argv.extend(['-d'])
+ argv.extend(['-o','manager.components.naming_policy:node_unique'])
+ argv.extend(['-o','naming.type:corba,manager'])
+
+
+ manager = OpenRTM_aist.Manager.init(argv)
+ manager.setModuleInitProc(TestComp1ModuleInit)
+ manager.activateManager()
+
+ # Block until the parent puts an item on the queue (done in the test's tearDown).
+ q.get()
+ # Clean-up over a copy ([:]) of the component list. NOTE(review): indentation is lost in this archive, so the loop extent cannot be read here.
+ comps = manager.getComponents()[:]
+ for comp in comps:
+ manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = manager._factory.find(comp_id)
+ factory.destroy(comp)
+ manager.shutdownNaming()
+ time.sleep(0.1)
+# Test case for the "node_unique" naming policy: a child process and this process each run a manager
+# that creates a "TestComp1"; the local manager is then expected to hold a component named "TestComp11".
+class Test_NumberingPolicy_node(unittest.TestCase):
+ # Start the child process (runTestComp2), wait, then initialise the local manager with the same -o options.
+ def setUp(self):
+ self.queue = multiprocessing.Queue()
+ self.outport_process = multiprocessing.Process(target=runTestComp2, args=(self.queue,))
+ self.outport_process.start()
+ # Fixed one-second wait after starting the child (presumably to let it finish start-up -- confirm).
+ time.sleep(1)
+ sys.argv.extend(['-o','manager.components.naming_policy:node_unique'])
+ sys.argv.extend(['-o','naming.type:corba,manager'])
+ #sys.argv.extend(['-o','manager.instance_name: manager2'])
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.setModuleInitProc(TestComp1ModuleInit)
+ self.manager.activateManager()
+ # NOTE(review): sys.argv is extended in place, so the options accumulate on every setUp call.
+ # Signal the child through the queue, then clean up local components. NOTE(review): the child process is never join()ed.
+ def tearDown(self):
+ self.queue.put("")
+ comps = self.manager.getComponents()[:]
+ for comp in comps:
+ self.manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = self.manager._factory.find(comp_id)
+ factory.destroy(comp)
+ self.manager.shutdownNaming()
+ time.sleep(0.1)
+ # Looks up "TestComp11" on the local manager; presumably the child already took "TestComp10" -- confirm.
+ def test_getComponent(self):
+ #prop = OpenRTM_aist.Properties()
+ #pp = self.manager.getManagerServant().get_profile().properties
+ #OpenRTM_aist.NVUtil.copyToProperties(prop, pp)
+ #print prop
+ comp = self.manager.getComponent("TestComp11")
+ self.assertTrue(comp is not None)
+
+
+# Script entry point: run this file's test cases via unittest.main().
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_NumberingPolicy_ns.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import multiprocessing
+# Profile of TestComp1 as a flat key, value, ... list ending in ""; read by TestComp1Init via Properties(defaults_str=...).
+testcomp1_spec = ["implementation_id", "TestComp1",
+ "type_name", "TestComp1",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ "conf.default.test1", "0",
+ ""]
+
+
+
+
+
+
+
+
+
+
+
+# Register a factory with the given manager: the profile is built from testcomp1_spec, and
+# OpenRTM_aist.DataFlowComponentBase / OpenRTM_aist.Delete are passed as second and third arguments.
+def TestComp1Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+ manager.registerFactory(profile,
+ OpenRTM_aist.DataFlowComponentBase,
+ OpenRTM_aist.Delete)
+
+
+
+
+# Module-init callback (installed with setModuleInitProc in this file): registers the TestComp1
+# factory and asks the manager to create one "TestComp1"; the returned component is not used.
+def TestComp1ModuleInit(manager):
+ TestComp1Init(manager)
+ com = manager.createComponent("TestComp1")
+
+
+# Target of the child process started in Test_NumberingPolicy_ns.setUp.
+# q is the multiprocessing.Queue shared with the parent; it is only used to wait for the stop signal.
+def runTestComp2(q):
+ # Command line for this process' manager: -d plus the ns_unique naming policy.
+ argv = [""]
+ argv.extend(['-d'])
+ argv.extend(['-o','manager.components.naming_policy:ns_unique'])
+
+
+ manager = OpenRTM_aist.Manager.init(argv)
+ manager.setModuleInitProc(TestComp1ModuleInit)
+ manager.activateManager()
+
+ # Block until the parent puts an item on the queue (done in the test's tearDown).
+ q.get()
+ # Clean-up over a copy ([:]) of the component list. NOTE(review): indentation is lost in this archive, so the loop extent cannot be read here.
+ comps = manager.getComponents()[:]
+ for comp in comps:
+ manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = manager._factory.find(comp_id)
+ factory.destroy(comp)
+ manager.shutdownNaming()
+ time.sleep(0.1)
+# Test case for the "ns_unique" naming policy: a child process and this process each run a manager
+# that creates a "TestComp1"; the local manager is then expected to hold a component named "TestComp11".
+class Test_NumberingPolicy_ns(unittest.TestCase):
+ # Start the child process (runTestComp2), wait, then initialise the local manager with the same policy option.
+ def setUp(self):
+ self.queue = multiprocessing.Queue()
+ self.outport_process = multiprocessing.Process(target=runTestComp2, args=(self.queue,))
+ self.outport_process.start()
+ # Fixed one-second wait after starting the child (presumably to let it finish start-up -- confirm).
+ time.sleep(1)
+ sys.argv.extend(['-o','manager.components.naming_policy:ns_unique'])
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.setModuleInitProc(TestComp1ModuleInit)
+ self.manager.activateManager()
+
+ # Signal the child through the queue, then clean up local components. NOTE(review): the child process is never join()ed.
+ def tearDown(self):
+ self.queue.put("")
+ comps = self.manager.getComponents()[:]
+ for comp in comps:
+ self.manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = self.manager._factory.find(comp_id)
+ factory.destroy(comp)
+ self.manager.shutdownNaming()
+ time.sleep(0.1)
+ # Looks up "TestComp11" on the local manager; presumably the child already took "TestComp10" -- confirm.
+ def test_getComponent(self):
+ comp = self.manager.getComponent("TestComp11")
+ self.assertTrue(comp is not None)
+# Script entry point: run this file's test cases via unittest.main().
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Preconfigured.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Preconfigured.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Preconfigured.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,211 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_Preconfigured.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import OpenRTM, OpenRTM__POA
+# Profile of TestComp1 as a flat key, value, ... list ending in ""; read by TestComp1Init via Properties(defaults_str=...).
+testcomp1_spec = ["implementation_id", "TestComp1",
+ "type_name", "TestComp1",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ ""]
+# Profile of TestComp2 as a flat key, value, ... list ending in ""; read by TestComp2Init via Properties(defaults_str=...).
+testcomp2_spec = ["implementation_id", "TestComp2",
+ "type_name", "TestComp2",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ ""]
+# Minimal class derived from OpenRTM__POA.InPortCdr: __init__ does nothing and put() always returns OpenRTM.PORT_OK.
+class Test_i(OpenRTM__POA.InPortCdr):
+ def __init__(self):
+ pass
+ def put(self, data):
+ return OpenRTM.PORT_OK
+# Test component with a TimedLong OutPort "out", a TimedLong InPort "in" and a CorbaPort "service"
+# on which a Test_i instance is registered as provider.
+class TestComp1(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+
+ self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
+ self._testService_provided = Test_i()
+
+
+ # Add the three ports to the component and register the provider; always returns RTC.RTC_OK.
+ def onInitialize(self):
+ self.addOutPort("out",self._outOut)
+ self.addInPort("in",self._inIn)
+
+ self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
+ self.addPort(self._servicePort_provided)
+ # The explicit activateInterfaces() call is left disabled here.
+ #self._servicePort_provided.activateInterfaces()
+
+
+
+ return RTC.RTC_OK
+
+# Test component with a TimedLong OutPort "out", a TimedLong InPort "in" and a CorbaPort "service"
+# on which a CorbaConsumer for OpenRTM.InPortCdr is registered as consumer.
+class TestComp2(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+
+
+
+
+ self._servicePort_required = OpenRTM_aist.CorbaPort("service")
+ self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
+
+
+
+ return
+ # Add the three ports to the component and register the consumer; always returns RTC.RTC_OK.
+ def onInitialize(self):
+ self.addOutPort("out",self._outOut)
+ self.addInPort("in",self._inIn)
+
+
+
+
+ self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
+ self.addPort(self._servicePort_required)
+
+ return RTC.RTC_OK
+
+
+# Register a factory with the given manager: the profile is built from testcomp1_spec, and the
+# TestComp1 class / OpenRTM_aist.Delete are passed as second and third arguments.
+def TestComp1Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+ manager.registerFactory(profile,
+ TestComp1,
+ OpenRTM_aist.Delete)
+
+# Register a factory with the given manager: the profile is built from testcomp2_spec, and the
+# TestComp2 class / OpenRTM_aist.Delete are passed as second and third arguments.
+def TestComp2Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
+ manager.registerFactory(profile,
+ TestComp2,
+ OpenRTM_aist.Delete)
+# Module-init callback (installed with setModuleInitProc in Test_Preconfigured.setUp): registers both
+# factories and creates one "TestComp1" and one "TestComp2"; the returned components are not used.
+def MyModuleInit(manager):
+ TestComp1Init(manager)
+ TestComp2Init(manager)
+ com = manager.createComponent("TestComp1")
+ com = manager.createComponent("TestComp2")
+
+
+
+# Test case for the manager options manager.components.preconnect and manager.components.preactivate,
+# using the components TestComp10 and TestComp20 of a single manager in this process.
+class Test_Preconfigured(unittest.TestCase):
+ # Pass the preconnect/preactivate options on sys.argv, start the manager and fetch both components.
+ def setUp(self):
+ self.dataPortConnectorName = "TestComp20.in^TestComp10.out(interface_type=direct)"
+ self.servicePortConnectorName = "TestComp10.service^TestComp20.service()"
+ sys.argv.extend(['-o', 'manager.components.preconnect:'+self.dataPortConnectorName+","+self.servicePortConnectorName])
+ sys.argv.extend(['-o', 'manager.components.preactivate:TestComp10'])
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.setModuleInitProc(MyModuleInit)
+ self.manager.activateManager()
+ # NOTE(review): sys.argv is extended in place, so the options accumulate on every setUp call.
+ self.comps = []
+ self.comps.append(self.manager.getComponent("TestComp10"))
+ self.comps.append(self.manager.getComponent("TestComp20"))
+ self.comp1 = self.comps[0].getObjRef()
+ self.comp2 = self.comps[1].getObjRef()
+
+
+ # Unregister and destroy the two components. NOTE(review): indentation is lost in this archive, so the loop extent cannot be read here.
+ def tearDown(self):
+ for comp in self.comps:
+ self.manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = self.manager._factory.find(comp_id)
+ factory.destroy(comp)
+ self.manager.shutdownNaming()
+ time.sleep(0.1)
+
+
+
+
+
+ # TestComp10 is named in the preactivate option, so is_in_active() is expected to be true for it.
+ def test_PreActivation(self):
+
+ state = OpenRTM_aist.is_in_active(self.comp1)
+ self.assertTrue(state)
+
+ # Both connections named in the preconnect option must already exist, without any connect call in the test.
+ def test_PreConnection(self):
+
+ inport = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.in")
+ outport = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.out")
+ ans = OpenRTM_aist.already_connected(inport, outport)
+ self.assertTrue(ans)
+ # The first connector profile of the InPort must carry the interface_type=direct given in the option.
+ #con = OpenRTM_aist.get_connector_names(inport)[0]
+ profile = inport.get_connector_profiles()[0]
+ prop = OpenRTM_aist.Properties()
+ OpenRTM_aist.NVUtil.copyToProperties(prop, profile.properties)
+ self.assertEqual(prop.getProperty("dataport.interface_type"),"direct")
+
+ # Same check for the service-port pair.
+ provided = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.service")
+ required = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.service")
+ ans = OpenRTM_aist.already_connected(provided, required)
+ self.assertTrue(ans)
+
+
+
+
+
+# Script entry point: run this file's test cases via unittest.main().
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Preconfigured_MultiProcess.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Preconfigured_MultiProcess.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Preconfigured_MultiProcess.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,216 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_Preconfigured_MultiProcess.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import multiprocessing
+import OpenRTM, OpenRTM__POA
+# Profile of TestComp1 as a flat key, value, ... list ending in ""; read by TestComp1Init via Properties(defaults_str=...).
+testcomp1_spec = ["implementation_id", "TestComp1",
+ "type_name", "TestComp1",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ "conf.default.test1", "0",
+ ""]
+# Profile of TestComp2 as a flat key, value, ... list ending in ""; read by TestComp2Init via Properties(defaults_str=...).
+testcomp2_spec = ["implementation_id", "TestComp2",
+ "type_name", "TestComp2",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ ""]
+# Minimal class derived from OpenRTM__POA.InPortCdr: __init__ does nothing and put() always returns OpenRTM.PORT_OK.
+class Test_i(OpenRTM__POA.InPortCdr):
+ def __init__(self):
+ pass
+ def put(self, data):
+ return OpenRTM.PORT_OK
+# Test component with a TimedLong OutPort "out", a TimedLong InPort "in" and a CorbaPort "service"
+# on which a Test_i instance is registered as provider.
+class TestComp1(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+
+ self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
+ self._testService_provided = Test_i()
+ # Add the three ports to the component and register the provider; always returns RTC.RTC_OK.
+ def onInitialize(self):
+ self.addOutPort("out",self._outOut)
+ self.addInPort("in",self._inIn)
+
+ self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
+ self.addPort(self._servicePort_provided)
+
+ return RTC.RTC_OK
+# Test component with OutPort "out", InPort "in" (both TimedLong) and a CorbaPort "service" holding an InPortCdr consumer.
+class TestComp2(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+
+ self._servicePort_required = OpenRTM_aist.CorbaPort("service")
+ self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
+
+
+
+ return
+ # Add the three ports to the component and register the consumer; always returns RTC.RTC_OK.
+ def onInitialize(self):
+ self.addOutPort("out",self._outOut)
+ self.addInPort("in",self._inIn)
+
+
+
+
+ self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
+ self.addPort(self._servicePort_required)
+
+ return RTC.RTC_OK
+
+
+# Register a factory with the given manager: the profile is built from testcomp1_spec, and the
+# TestComp1 class / OpenRTM_aist.Delete are passed as second and third arguments.
+def TestComp1Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+ manager.registerFactory(profile,
+ TestComp1,
+ OpenRTM_aist.Delete)
+
+# Register a factory with the given manager: the profile is built from testcomp2_spec, and the
+# TestComp2 class / OpenRTM_aist.Delete are passed as second and third arguments.
+def TestComp2Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
+ manager.registerFactory(profile,
+ TestComp2,
+ OpenRTM_aist.Delete)
+# Module-init callback of the parent process (installed in Test_Preconfigured_MultiProcess.setUp):
+# registers the TestComp1 factory and creates one "TestComp1"; the returned component is not used.
+def TestComp1ModuleInit(manager):
+ TestComp1Init(manager)
+ com = manager.createComponent("TestComp1")
+# Module-init callback of the child process (installed in runTestComp2): registers the TestComp2 factory and creates one "TestComp2".
+def TestComp2ModuleInit(manager):
+ TestComp2Init(manager)
+ com = manager.createComponent("TestComp2")
+# Target of the child process started in Test_Preconfigured_MultiProcess.setUp; it hosts TestComp2.
+# q is the multiprocessing.Queue shared with the parent; it is only used to wait for the stop signal.
+def runTestComp2(q):
+ # Command line for this process' manager: -d, naming.type corba,manager and the same naming.formats as the parent.
+ argv = [""]
+ argv.extend(['-d'])
+ argv.extend(['-o','naming.type:corba,manager'])
+ argv.extend(['-o','naming.formats:test.host_cxt/%n.rtc'])
+
+
+ manager = OpenRTM_aist.Manager.init(argv)
+
+ manager.setModuleInitProc(TestComp2ModuleInit)
+ manager.activateManager()
+
+ # Block until the parent puts an item on the queue (done in the test's tearDown).
+ q.get()
+ # Clean-up over a copy ([:]) of the component list. NOTE(review): indentation is lost in this archive, so the loop extent cannot be read here.
+ comps = manager.getComponents()[:]
+ for comp in comps:
+ manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = manager._factory.find(comp_id)
+ factory.destroy(comp)
+ manager.shutdownNaming()
+ time.sleep(0.1)
+# Test case for preconnect/preactivate across two processes: TestComp1 lives in this process,
+# TestComp2 in a child process and is addressed through the rtcloc:// URL built in setUp.
+class Test_Preconfigured_MultiProcess(unittest.TestCase):
+ # Start the child process, wait, then start the local manager with the preconnect/preactivate options.
+ def setUp(self):
+ self.queue = multiprocessing.Queue()
+ self.outport_process = multiprocessing.Process(target=runTestComp2, args=(self.queue,))
+ self.outport_process.start()
+ # Fixed one-second wait after starting the child (presumably to let it finish start-up -- confirm).
+ time.sleep(1)
+ sys.argv.extend(['-o','naming.type:corba,manager'])
+ sys.argv.extend(['-o','naming.formats:test.host_cxt/%n.rtc'])
+ # The remote component is referred to by URL; host and port 2810 are hard-coded here.
+ RTC2_URL = "rtcloc://localhost:2810/example/TestComp20"
+ self.dataPortConnectorName = RTC2_URL+".in"+"^TestComp10.out()"
+ self.servicePortConnectorName = "TestComp10.service^"+RTC2_URL+".service()"
+ sys.argv.extend(['-o', 'manager.components.preconnect:'+self.dataPortConnectorName+","+self.servicePortConnectorName])
+ sys.argv.extend(['-o', 'manager.components.preactivate:'+RTC2_URL])
+ #print 'manager.components.preactivate:'+RTC2_URL
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.setModuleInitProc(TestComp1ModuleInit)
+ self.manager.activateManager()
+ # comp1 is the local component's object reference; comp2 is the first result of resolving RTC2_URL.
+ self.comp1 = self.manager.getComponent("TestComp10").getObjRef()
+ self.comp2 = self.manager._namingManager.string_to_component(RTC2_URL)[0]
+
+ # Signal the child through the queue, then clean up local components. NOTE(review): the child process is never join()ed.
+ def tearDown(self):
+ self.queue.put("")
+ comps = self.manager.getComponents()[:]
+ for comp in comps:
+ self.manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = self.manager._factory.find(comp_id)
+ factory.destroy(comp)
+ self.manager.shutdownNaming()
+ time.sleep(0.1)
+ # The remote component named in the preactivate option is expected to be active.
+ def test_PreActivation(self):
+ state = OpenRTM_aist.is_in_active(self.comp2)
+ self.assertTrue(state)
+ # Both connections named in the preconnect option must already exist, without any connect call in the test.
+ def test_PreConnection(self):
+ inport = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.in")
+ outport = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.out")
+ ans = OpenRTM_aist.already_connected(inport, outport)
+ self.assertTrue(ans)
+
+ provided = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.service")
+ required = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.service")
+ ans = OpenRTM_aist.already_connected(provided, required)
+ self.assertTrue(ans)
+
+# Script entry point: run this file's test cases via unittest.main().
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_SharedMemory.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, OpenRTM
+
+
+
+
+
+
+# Test case for the "shared_memory" data-port interface: a TimedOctetSeq OutPort/InPort pair is
+# connected in push and in pull mode, and payloads of 100 and 50000 characters are sent through it.
+class TestSharedMemory(unittest.TestCase):
+ # Start the manager with port.outport.out.shem_default_size set to 1k and initialise both ports from its config.
+ def setUp(self):
+ sys.argv.extend(['-o', 'port.outport.out.shem_default_size:1k'])
+ #sys.argv.extend(['-o', 'port.inport.in.shem_default_size:1k'])
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.activateManager()
+ # NOTE(review): sys.argv is extended in place, so the option accumulates on every setUp call.
+ self._d_in = RTC.TimedOctetSeq(RTC.Time(0,0),[])
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+
+ prop = self.manager.getConfig().getNode("port.inport.in")
+ self._inIn.init(prop)
+ self.inport_obj = self._inIn.getPortRef()
+
+
+ self._d_out = RTC.TimedOctetSeq(RTC.Time(0,0),[])
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+ prop = self.manager.getConfig().getNode("port.outport.out")
+ self._outOut.init(prop)
+ self.outport_obj = self._outOut.getPortRef()
+
+
+ # Per-test cleanup: shut down the manager.
+ def tearDown(self):
+ self.manager.shutdownManager()
+ # Push mode: each written payload must be readable unchanged from the InPort.
+ def test_Push(self):
+
+ prop = OpenRTM_aist.Properties()
+ prop.setProperty("dataport.interface_type","shared_memory")
+ prop.setProperty("dataport.dataflow_type","push")
+ ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
+ # First payload: 100 characters. NOTE(review): ret from connect() is overwritten below without being checked.
+ self._d_out.data = "a"*100
+ self._outOut.write()
+
+ ret = self._inIn.isNew()
+ self.assertTrue(ret)
+
+ data = self._inIn.read()
+ self.assertEqual(data.data, self._d_out.data)
+
+ # Second payload: 50000 characters, larger than the configured 1k (presumably to exercise resizing -- confirm).
+ self._d_out.data = "a"*50000
+ self._outOut.write()
+
+ data = self._inIn.read()
+ self.assertEqual(data.data, self._d_out.data)
+
+
+ self.outport_obj.disconnect_all()
+
+
+ # Pull mode: same two payloads as test_Push; the isNew() check is left commented out.
+ def test_Pull(self):
+ prop = OpenRTM_aist.Properties()
+ prop.setProperty("dataport.interface_type","shared_memory")
+ prop.setProperty("dataport.dataflow_type","pull")
+ ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
+ # First payload: 100 characters. NOTE(review): ret from connect() is never checked in this test.
+ self._d_out.data = "a"*100
+ self._outOut.write()
+
+ #ret = self._inIn.isNew()
+ #self.assertTrue(ret)
+
+ data = self._inIn.read()
+ self.assertEqual(data.data, self._d_out.data)
+
+ # Second payload: 50000 characters, larger than the configured 1k (presumably to exercise resizing -- confirm).
+ self._d_out.data = "a"*50000
+ self._outOut.write()
+
+
+ data = self._inIn.read()
+ self.assertEqual(data.data, self._d_out.data)
+
+
+ self.outport_obj.disconnect_all()
+# Script entry point: run this file's test cases via unittest.main().
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,190 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_Topic.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import OpenRTM, OpenRTM__POA
+# Profile of TestComp1 as a flat key, value, ... list ending in ""; read by TestComp1Init via Properties(defaults_str=...).
+testcomp1_spec = ["implementation_id", "TestComp1",
+ "type_name", "TestComp1",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ ""]
+# Profile of TestComp2 as a flat key, value, ... list ending in ""; read by TestComp2Init via Properties(defaults_str=...).
+testcomp2_spec = ["implementation_id", "TestComp2",
+ "type_name", "TestComp2",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ ""]
+# Minimal class derived from OpenRTM__POA.InPortCdr: __init__ does nothing and put() always returns OpenRTM.PORT_OK.
+class Test_i(OpenRTM__POA.InPortCdr):
+ def __init__(self):
+ pass
+ def put(self, data):
+ return OpenRTM.PORT_OK
+# Test component on the publishing side: a TimedLong OutPort "topic_out" and a CorbaPort
+# "topic_service" providing a Test_i instance, both given the property publish_topic = "test".
+class TestComp1(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+
+
+ self._d_topic_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._topic_outOut = OpenRTM_aist.OutPort("topic_out", self._d_topic_out)
+
+ self._topic_servicePort_provided = OpenRTM_aist.CorbaPort("topic_service")
+ self._topic_Service_provided = Test_i()
+ # Add both ports and append publish_topic = "test" to each after it is added; always returns RTC.RTC_OK.
+ def onInitialize(self):
+
+ self.addOutPort("topic_out",self._topic_outOut)
+ self._topic_outOut.appendProperty("publish_topic","test")
+
+ self._topic_servicePort_provided.registerProvider("topic_service", "TestService", self._topic_Service_provided)
+ self.addPort(self._topic_servicePort_provided)
+ self._topic_servicePort_provided.appendProperty("publish_topic","test")
+
+
+ return RTC.RTC_OK
+
+# Test component on the receiving side: a TimedLong InPort "topic_in" and a CorbaPort "topic_service"
+# with an InPortCdr consumer, both given the property publish_topic = "test".
+class TestComp2(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+
+
+ self._d_topic_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._topic_inIn = OpenRTM_aist.InPort("topic_in", self._d_topic_in)
+
+
+
+ self._topic_servicePort_required = OpenRTM_aist.CorbaPort("topic_service")
+ self._topic_Service_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
+
+ return
+ # Add both ports and append publish_topic = "test" to each after it is added; always returns RTC.RTC_OK.
+ def onInitialize(self):
+
+
+ self.addInPort("topic_in",self._topic_inIn)
+ self._topic_inIn.appendProperty("publish_topic","test")
+
+
+ self._topic_servicePort_required.registerConsumer("topic_service", "TestService", self._topic_Service_required)
+ self.addPort(self._topic_servicePort_required)
+ self._topic_servicePort_required.appendProperty("publish_topic","test")
+
+ return RTC.RTC_OK
+
+
+# Register a factory with the given manager: the profile is built from testcomp1_spec, and the
+# TestComp1 class / OpenRTM_aist.Delete are passed as second and third arguments.
+def TestComp1Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+ manager.registerFactory(profile,
+ TestComp1,
+ OpenRTM_aist.Delete)
+
+# Register a factory with the given manager: the profile is built from testcomp2_spec, and the
+# TestComp2 class / OpenRTM_aist.Delete are passed as second and third arguments.
+def TestComp2Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
+ manager.registerFactory(profile,
+ TestComp2,
+ OpenRTM_aist.Delete)
+# Module-init callback (installed with setModuleInitProc in test_Topic.setUp): registers both
+# factories and creates one "TestComp1" and one "TestComp2"; the returned components are not used.
+def MyModuleInit(manager):
+ TestComp1Init(manager)
+ TestComp2Init(manager)
+ com = manager.createComponent("TestComp1")
+ com = manager.createComponent("TestComp2")
+
+
+
+# Test case: ports of TestComp10 and TestComp20 that carry publish_topic = "test" are expected to be
+# connected to each other already, although this file never calls connect() itself.
+class test_Topic(unittest.TestCase):
+ # Start the manager with MyModuleInit and fetch both components and their object references.
+ def setUp(self):
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.setModuleInitProc(MyModuleInit)
+ self.manager.activateManager()
+
+ self.comps = []
+ self.comps.append(self.manager.getComponent("TestComp10"))
+ self.comps.append(self.manager.getComponent("TestComp20"))
+ self.comp1 = self.comps[0].getObjRef()
+ self.comp2 = self.comps[1].getObjRef()
+
+
+ # Unregister and destroy the two components. NOTE(review): indentation is lost in this archive, so the loop extent cannot be read here.
+ def tearDown(self):
+ for comp in self.comps:
+ self.manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = self.manager._factory.find(comp_id)
+ factory.destroy(comp)
+ self.manager.shutdownNaming()
+ time.sleep(0.1)
+
+
+ # Check the data-port pair first, then the service-port pair, with already_connected().
+ def test_Topic(self):
+
+ inport = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.topic_in")
+ outport = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.topic_out")
+ ans = OpenRTM_aist.already_connected(inport, outport)
+ self.assertTrue(ans)
+
+
+
+ # Same check for the service-port pair.
+ provided = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.topic_service")
+ required = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.topic_service")
+ ans = OpenRTM_aist.already_connected(provided, required)
+ self.assertTrue(ans)
+
+
+
+
+
+
+
+
+# Script entry point: run this file's test cases via unittest.main().
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py 2016-03-07 10:23:17 UTC (rev 689)
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+#
+# \file test_string_to_component.py
+# \brief Test for string_to_component() (rtcloc:// and rtcname:// lookups)
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
+
+
+import sys
+sys.path.insert(1,"../")
+
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
+
+import time
+
+#from Manager import *
+import OpenRTM_aist
+import RTC, RTC__POA
+import multiprocessing
+
+# Flat key/value list for TestComp1, passed to Properties(defaults_str=...).
+testcomp1_spec = [
+    "implementation_id", "TestComp1",
+    "type_name", "TestComp1",
+    "description", "Test example component",
+    "version", "1.0",
+    "vendor", "Nobuhiko Myiyamoto",
+    "category", "example",
+    "activity_type", "DataFlowComponent",
+    "max_instance", "10",
+    "language", "C++",
+    "lang_type", "compile",
+    "conf.default.test1", "0",
+    "",
+]
+
+# Flat key/value list for TestComp2, passed to Properties(defaults_str=...).
+testcomp2_spec = [
+    "implementation_id", "TestComp2",
+    "type_name", "TestComp2",
+    "description", "Test example component",
+    "version", "1.0",
+    "vendor", "Nobuhiko Myiyamoto",
+    "category", "example",
+    "activity_type", "DataFlowComponent",
+    "max_instance", "10",
+    "language", "C++",
+    "lang_type", "compile",
+    "",
+]
+
+
+
+
+
+
+
+
+def TestComp1Init(manager):
+    """Register a DataFlowComponentBase factory described by testcomp1_spec."""
+    spec = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+    component_class = OpenRTM_aist.DataFlowComponentBase
+    manager.registerFactory(spec, component_class, OpenRTM_aist.Delete)
+
+
+
+def TestComp2Init(manager):
+    """Register a DataFlowComponentBase factory described by testcomp2_spec."""
+    spec = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
+    component_class = OpenRTM_aist.DataFlowComponentBase
+    manager.registerFactory(spec, component_class, OpenRTM_aist.Delete)
+
+
+def TestComp1ModuleInit(manager):
+    """Module-init hook: register the TestComp1 factory and create one instance.
+
+    manager -- the manager the factory is registered with.
+    """
+    TestComp1Init(manager)
+    # The returned component is not used, so it is not bound to a name.
+    manager.createComponent("TestComp1")
+
+def TestComp2ModuleInit(manager):
+    """Module-init hook: register the TestComp2 factory and create one instance.
+
+    manager -- the manager the factory is registered with.
+    """
+    TestComp2Init(manager)
+    # The returned component is not used, so it is not bound to a name.
+    manager.createComponent("TestComp2")
+
+
+def runTestComp2(q):
+    """Child-process entry point: host TestComp2 until the parent signals.
+
+    q -- queue shared with the parent; receiving any item from it starts
+         the cleanup of this process's components.
+    """
+    argv = [
+        "",
+        "-d",
+        "-o", "naming.type:corba,manager",
+        "-o", "naming.formats:test.host_cxt/%n.rtc",
+    ]
+
+    manager = OpenRTM_aist.Manager.init(argv)
+    manager.setModuleInitProc(TestComp2ModuleInit)
+    manager.activateManager()
+
+    # Block until the parent puts something on the queue.
+    q.get()
+
+    # Iterate over a copy of the component list while unregistering.
+    for rtc in manager.getComponents()[:]:
+        manager.unregisterComponent(rtc)
+        rtc_factory = manager._factory.find(rtc.getProperties())
+        rtc_factory.destroy(rtc)
+    manager.shutdownNaming()
+    time.sleep(0.1)
+
+
+class Test_string_to_component(unittest.TestCase):
+    """Exercise string_to_component() against a component in another process.
+
+    TestComp2 is hosted in a child process (runTestComp2) and TestComp1 in
+    this process; the test then resolves TestComp20 through several
+    rtcloc:// and rtcname:// URIs.
+    """
+
+    def setUp(self):
+        """Start the child process, then activate the local manager."""
+        self.queue = multiprocessing.Queue()
+        self.outport_process = multiprocessing.Process(
+            target=runTestComp2, args=(self.queue,))
+        self.outport_process.start()
+        # Registered right away so the child is also stopped when the rest
+        # of setUp() fails and tearDown() is therefore never called.
+        self.addCleanup(self._stop_child)
+
+        # Presumably gives the child time to start its manager.
+        time.sleep(1)
+        # Work on a copy: extending sys.argv in place would make the extra
+        # options pile up on every setUp() and leak into other tests.
+        argv = list(sys.argv)
+        argv.extend(['-o', 'naming.type:corba,manager'])
+        argv.extend(['-o', 'naming.formats:test.host_cxt/%n.rtc'])
+        self.manager = OpenRTM_aist.Manager.init(argv)
+        self.manager.setModuleInitProc(TestComp1ModuleInit)
+        self.manager.activateManager()
+
+    def tearDown(self):
+        """Signal the child to clean up, then destroy the local components."""
+        self.queue.put("")
+        # Iterate over a copy of the component list while unregistering.
+        for comp in self.manager.getComponents()[:]:
+            self.manager.unregisterComponent(comp)
+            factory = self.manager._factory.find(comp.getProperties())
+            factory.destroy(comp)
+        self.manager.shutdownNaming()
+        time.sleep(0.1)
+
+    def _stop_child(self):
+        """Make sure the child process is gone once the test has finished."""
+        if self.outport_process.is_alive():
+            # Harmless extra item if tearDown() already signalled the child.
+            self.queue.put("")
+            self.outport_process.join(5)
+        if self.outport_process.is_alive():
+            # The child did not exit on its own; do not leave it behind.
+            self.outport_process.terminate()
+            self.outport_process.join()
+
+    def test_getComponent(self):
+        """Every supported URI form must resolve to TestComp20."""
+        resolve = self.manager._namingManager.string_to_component
+        uris = (
+            "rtcloc://localhost:2810/example/TestComp20",
+            "rtcloc://*/example/TestComp20",
+            "rtcloc://*/*/TestComp20",
+            "rtcname://localhost/test.host_cxt/TestComp20",
+            "rtcname://*/test.host_cxt/TestComp20",
+            "rtcname://*/*/TestComp20",
+        )
+        for uri in uris:
+            rtcs = resolve(uri)
+            # Report the offending URI instead of an opaque IndexError.
+            self.assertTrue(rtcs, "no component found for %r" % uri)
+            name = rtcs[0].get_component_profile().instance_name
+            self.assertEqual(name, "TestComp20", uri)
+
+
+############### test #################
+if __name__ == "__main__":
+    # Run every TestCase in this module when executed as a script.
+    unittest.main()
More information about the openrtm-commit
mailing list