[openrtm-commit:01858] r697 - trunk/OpenRTM-aist-Python/OpenRTM_aist/test
openrtm @ openrtm.org
openrtm @ openrtm.org
2016年 3月 16日 (水) 14:29:41 JST
Author: miyamoto
Date: 2016-03-16 14:29:41 +0900 (Wed, 16 Mar 2016)
New Revision: 697
Added:
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_InPortSHMProvider.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_OutPortSHMProvider.py
Modified:
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_ComponentActionListener.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py
Log:
[compat,test,->RELENG_1_2] Added test files.
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_ComponentActionListener.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_ComponentActionListener.py 2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_ComponentActionListener.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -1,229 +1,248 @@
#!/usr/bin/env python
# -*- coding: euc-jp -*-
-##
-# @file test_ComponentActionListener.py
-# @brief test for ComponentActionListener class
-# @date $Date$
-# @author Shinji Kurihara
#
-# Copyright (C) 2011
-# Intelligent Systems Research Institute,
-# National Institute of
-# Advanced Industrial Science and Technology (AIST), Japan
-# All rights reserved.
+# \file test_ComponentActionListener.py
+# \brief
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
import sys
sys.path.insert(1,"../")
-import unittest
+try:
+ import unittest2 as unittest
+except (ImportError):
+ import unittest
-from ComponentActionListener import *
+import time
+
+#from Manager import *
import OpenRTM_aist
+import RTC, RTC__POA
+import OpenRTM, OpenRTM__POA
-class MockPreComponentActionListener(PreComponentActionListener):
- def __init__(self):
- PreComponentActionListener.__init__(self)
- return
+testcomp1_spec = ["implementation_id", "TestComp1",
+ "type_name", "TestComp1",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ "conf.default.test1", "0",
+ ""]
- def __call__(self,id):
- return id
+testcomp2_spec = ["implementation_id", "TestComp2",
+ "type_name", "TestComp2",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Nobuhiko Myiyamoto",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "C++",
+ "lang_type", "compile",
+ ""]
-
-class MockPostComponentActionListener(PostComponentActionListener):
+class Test_i(OpenRTM__POA.InPortCdr):
def __init__(self):
- PostComponentActionListener.__init__(self)
- return
+ pass
+ def put(self, data):
+ return OpenRTM.PORT_OK
- def __call__(self,id,ret):
- return id,ret
-class MockPortActionListener(PortActionListener):
- def __init__(self):
- PortActionListener.__init__(self)
- return
+class TestPostListener(OpenRTM_aist.PostComponentActionListener):
+ def __init__(self, mes):
+ self.mes = mes
+ def __del__(self):
+ pass
+ def __call__(self, ec_id, ret):
+ print self.mes
+
- def __call__(self, pprof):
- return pprof
+class TestPreListener(OpenRTM_aist.PreComponentActionListener):
+ def __init__(self, mes):
+ self.mes = mes
+ def __del__(self):
+ pass
+ def __call__(self, ec_id):
+ print self.mes
-class MockExecutionContextActionListener(ExecutionContextActionListener):
- def __init__(self):
- ExecutionContextActionListener.__init__(self)
- return
+class TestComp1(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
- def __call__(self, ec_id):
- return ec_id
+ self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
+ self._testService_provided = Test_i()
+ self._test1 = [0]
-class TestListener(unittest.TestCase):
- def setUp(self):
- return
-
- def tearDown(self):
- OpenRTM_aist.Manager.instance().shutdownManager()
- return
-
- def test_PreComponentActionListener_toString(self):
- self.assertEqual("PRE_ON_INITIALIZE",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_INITIALIZE))
+ def onInitialize(self):
+ self.addOutPort("out",self._outOut)
+ self.addInPort("in",self._inIn)
- self.assertEqual("PRE_ON_FINALIZE",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_FINALIZE))
+ self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
+ self.addPort(self._servicePort_provided)
+
+ #self._servicePort_provided.activateInterfaces()
- self.assertEqual("PRE_ON_STARTUP",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_STARTUP))
+ self.bindParameter("test1", self._test1, "0")
- self.assertEqual("PRE_ON_SHUTDOWN",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_SHUTDOWN))
- self.assertEqual("PRE_ON_ACTIVATED",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_ACTIVATED))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE, TestPreListener("RTC2:PRE_ON_INITIALIZE"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE, TestPreListener("RTC1:PRE_ON_FINALIZE"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_STARTUP, TestPreListener("RTC1:PRE_ON_STARTUP"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_SHUTDOWN, TestPreListener("RTC1:PRE_ON_SHUTDOWN"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ACTIVATED, TestPreListener("RTC1:PRE_ON_ACTIVATED"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_DEACTIVATED, TestPreListener("RTC1:PRE_ON_DEACTIVATED"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ABORTING, TestPreListener("RTC1:PRE_ON_ABORTING"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ERROR, TestPreListener("RTC1:PRE_ON_ERROR"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_RESET, TestPreListener("RTC1:PRE_ON_RESET"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_EXECUTE, TestPreListener("RTC1:PRE_ON_EXECUTE"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_STATE_UPDATE, TestPreListener("RTC1:PRE_ON_STATE_UPDATE"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_RATE_CHANGED, TestPreListener("RTC1:PRE_ON_RATE_CHANGED"))
+
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_INITIALIZE, TestPostListener("RTC1:POST_ON_INITIALIZE"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE, TestPostListener("RTC1:POST_ON_FINALIZE"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_STARTUP, TestPostListener("RTC1:POST_ON_STARTUP"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_SHUTDOWN, TestPostListener("RTC1:POST_ON_SHUTDOWN"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ACTIVATED, TestPostListener("RTC1:POST_ON_ACTIVATED"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_DEACTIVATED, TestPostListener("RTC1:POST_ON_DEACTIVATED"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ABORTING, TestPostListener("RTC1:POST_ON_ABORTING"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ERROR, TestPostListener("RTC1:POST_ON_ERROR"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_RESET, TestPostListener("RTC1:POST_ON_RESET"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_EXECUTE, TestPostListener("RTC1:POST_ON_EXECUTE"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_STATE_UPDATE, TestPostListener("RTC1:POST_ON_STATE_UPDATE"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_RATE_CHANGED, TestPostListener("RTC1:POST_ON_RATE_CHANGED"))
- self.assertEqual("PRE_ON_DEACTIVATED",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_DEACTIVATED))
+
+
- self.assertEqual("PRE_ON_ABORTING",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_ABORTING))
+ return RTC.RTC_OK
- self.assertEqual("PRE_ON_ERROR",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_ERROR))
+
+
+class TestComp2(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+
+
+
+
+ self._servicePort_required = OpenRTM_aist.CorbaPort("service")
+ self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
- self.assertEqual("PRE_ON_RESET",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_RESET))
- self.assertEqual("PRE_ON_EXECUTE",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_EXECUTE))
-
- self.assertEqual("PRE_ON_STATE_UPDATE",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_STATE_UPDATE))
-
- self.assertEqual("PRE_ON_RATE_CHANGED",
- PreComponentActionListener.toString(
- PreComponentActionListenerType.PRE_ON_RATE_CHANGED))
-
+
return
-
- def test_PostComponentActionListener_toString(self):
- self.assertEqual("POST_ON_INITIALIZE",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_INITIALIZE))
+
+ def onInitialize(self):
+ self.addInPort("in",self._inIn)
+ self.addOutPort("out",self._outOut)
- self.assertEqual("POST_ON_FINALIZE",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_FINALIZE))
+
+
+ self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
+ self.addPort(self._servicePort_required)
- self.assertEqual("POST_ON_STARTUP",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_STARTUP))
- self.assertEqual("POST_ON_SHUTDOWN",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_SHUTDOWN))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE, TestPreListener("RTC2:PRE_ON_INITIALIZE"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE, TestPreListener("RTC2:PRE_ON_FINALIZE"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_STARTUP, TestPreListener("RTC2:PRE_ON_STARTUP"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_SHUTDOWN, TestPreListener("RTC2:PRE_ON_SHUTDOWN"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ACTIVATED, TestPreListener("RTC2:PRE_ON_ACTIVATED"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_DEACTIVATED, TestPreListener("RTC2:PRE_ON_DEACTIVATED"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ABORTING, TestPreListener("RTC2:PRE_ON_ABORTING"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ERROR, TestPreListener("RTC2:PRE_ON_ERROR"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_RESET, TestPreListener("RTC2:PRE_ON_RESET"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_EXECUTE, TestPreListener("RTC2:PRE_ON_EXECUTE"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_STATE_UPDATE, TestPreListener("RTC2:PRE_ON_STATE_UPDATE"))
+ self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_RATE_CHANGED, TestPreListener("RTC2:PRE_ON_RATE_CHANGED"))
+
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_INITIALIZE, TestPostListener("RTC2:POST_ON_INITIALIZE"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE, TestPostListener("RTC2:POST_ON_FINALIZE"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_STARTUP, TestPostListener("RTC2:POST_ON_STARTUP"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_SHUTDOWN, TestPostListener("RTC2:POST_ON_SHUTDOWN"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ACTIVATED, TestPostListener("RTC2:POST_ON_ACTIVATED"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_DEACTIVATED, TestPostListener("RTC2:POST_ON_DEACTIVATED"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ABORTING, TestPostListener("RTC2:POST_ON_ABORTING"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ERROR, TestPostListener("RTC2:POST_ON_ERROR"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_RESET, TestPostListener("RTC2:POST_ON_RESET"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_EXECUTE, TestPostListener("RTC2:POST_ON_EXECUTE"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_STATE_UPDATE, TestPostListener("RTC2:POST_ON_STATE_UPDATE"))
+ self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_RATE_CHANGED, TestPostListener("RTC2:POST_ON_RATE_CHANGED"))
- self.assertEqual("POST_ON_ACTIVATED",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_ACTIVATED))
+
+ return RTC.RTC_OK
+
- self.assertEqual("POST_ON_DEACTIVATED",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_DEACTIVATED))
+
- self.assertEqual("POST_ON_ABORTING",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_ABORTING))
+
+def TestComp1Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+ manager.registerFactory(profile,
+ TestComp1,
+ OpenRTM_aist.Delete)
- self.assertEqual("POST_ON_ERROR",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_ERROR))
- self.assertEqual("POST_ON_RESET",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_RESET))
+
+def TestComp2Init(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
+ manager.registerFactory(profile,
+ TestComp2,
+ OpenRTM_aist.Delete)
+
- self.assertEqual("POST_ON_EXECUTE",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_EXECUTE))
+def MyModuleInit(manager):
+ TestComp1Init(manager)
+ TestComp2Init(manager)
+ com = manager.createComponent("TestComp1")
+ com = manager.createComponent("TestComp2")
+
- self.assertEqual("POST_ON_STATE_UPDATE",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_STATE_UPDATE))
- self.assertEqual("POST_ON_RATE_CHANGED",
- PostComponentActionListener.toString(
- PostComponentActionListenerType.POST_ON_RATE_CHANGED))
- return
- def test_PortActionListener_toString(self):
- self.assertEqual("ADD_PORT",
- PortActionListener.toString(
- PortActionListenerType.ADD_PORT))
+class test_ComponentActionListener(unittest.TestCase):
+
+ def setUp(self):
+ self.manager = OpenRTM_aist.Manager.init(sys.argv)
+ self.manager.setModuleInitProc(MyModuleInit)
+ self.manager.activateManager()
+
+ self.comp1 = self.manager.getComponent("TestComp10").getObjRef()
+ self.comp2 = self.manager.getComponent("TestComp20").getObjRef()
- self.assertEqual("REMOVE_PORT",
- PortActionListener.toString(
- PortActionListenerType.REMOVE_PORT))
+ def tearDown(self):
+ comps = self.manager.getComponents()[:]
+ for comp in comps:
+ self.manager.unregisterComponent(comp)
+ comp_id = comp.getProperties()
+ factory = self.manager._factory.find(comp_id)
+ factory.destroy(comp)
+ self.manager.shutdownNaming()
+ time.sleep(0.1)
- return
+ def test_Component(self):
+ while(True):
+ time.sleep(10)
- def test_ExecutionContextActionListener_toString(self):
- self.assertEqual("ATTACH_EC",
- ExecutionContextActionListener.toString(
- ExecutionContextActionListenerType.EC_ATTACHED))
-
- self.assertEqual("DETACH_EC",
- ExecutionContextActionListener.toString(
- ExecutionContextActionListenerType.EC_DETACHED))
-
- return
-
-
- def test_PreComponentActionListenerHolder(self):
- preactions = ComponentActionListeners()
- listener = MockPreComponentActionListener()
- preactions.preaction_[0].addListener(listener,True)
- preactions.preaction_[0].notify("test precomp ec_id")
- preactions.preaction_[0].removeListener(listener)
- return
-
- def test_PostComponentActionListenerHolder(self):
- postactions = ComponentActionListeners()
- listener = MockPostComponentActionListener()
- postactions.postaction_[0].addListener(listener,True)
- postactions.postaction_[0].notify("test postcomp ec_id",True)
- postactions.postaction_[0].removeListener(listener)
- return
-
- def test_PortActionListenerHolder(self):
- portactions = ComponentActionListeners()
- listener = MockPortActionListener()
- portactions.portaction_[0].addListener(listener,True)
- portactions.portaction_[0].notify("test port pprof")
- portactions.portaction_[0].removeListener(listener)
- return
-
- def test_ExecutionContextActionListenerHolder(self):
- ecactions = ComponentActionListeners()
- listener = MockExecutionContextActionListener()
- ecactions.ecaction_[0].addListener(listener,True)
- ecactions.ecaction_[0].notify("test ec ec_id")
- ecactions.ecaction_[0].removeListener(listener)
- return
-
-
############### test #################
if __name__ == '__main__':
- unittest.main()
+ unittest.main()
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py 2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -168,19 +168,29 @@
time.sleep(0.1)
+
+
+
+
def test_createComponent_slave(self):
self.queue = multiprocessing.Queue()
self.outport_process = multiprocessing.Process(target=runTestComp2, args=(self.queue,))
self.outport_process.start()
self.queue.put("")
time.sleep(1)
+ test_module_name = ["comp&test=0"]
+ test_value = self.manager.getManagerServant().get_parameter_by_modulename("test",test_module_name)
+ self.assertEqual(test_value,"0")
+ self.assertEqual(test_module_name[0],"comp")
#rtc = self.manager.getManagerServant().create_component("TestComp1&manager_name=manager2")
#rtc = self.manager.getManagerServant().create_component("TestComp2&manager_address=localhost:2810")
rtc = self.manager.getManagerServant().create_component("TestComp2&manager_name=manager2")
slave = self.manager.getManagerServant().findManager_by_name("manager2")
+ self.assertTrue(slave is not None)
rtcs = slave.get_components_by_name("TestComp20")
name = rtcs[0].get_component_profile().instance_name
self.assertEqual(name,"TestComp20")
+
self.queue.put("")
#rtc = self.manager.getManagerServant().create_component("TestComp2")
#print rtc
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py 2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -66,6 +66,10 @@
def test_Service(self):
+
+
+
+
prop = OpenRTM_aist.Properties()
ret = OpenRTM_aist.connect("con1",prop,self._servicePort_provided.getPortRef(),self._servicePort_required.getPortRef())
obj = self._testService_required._ptr()
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_InPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_InPortSHMProvider.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_InPortSHMProvider.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+# -*- Python -*-
+
+
+# \file test_InPortSHMProvider.py
+# \brief test for InPortSHMProvider class
+# \date $Date: 2007/09/20 $
+# \author Nobuhiko Miyamoto
+#
+
+
+
+from omniORB import *
+from omniORB import any
+
+import sys
+sys.path.insert(1,"../")
+
+import unittest
+
+import RTC, RTC__POA
+import OpenRTM
+import OpenRTM_aist
+
+
+class BufferMock:
+ def __init__(self):
+ self._data = None
+ return
+
+ def write(self, data):
+ self._data = data
+ return OpenRTM_aist.BufferStatus.BUFFER_OK
+
+ def read(self, value):
+ if len(value) > 0:
+ value[0] = self._data
+ else:
+ value.append(self._data)
+ return OpenRTM_aist.BufferStatus.BUFFER_OK
+
+ def full(self):
+ return False
+
+
+class ConnectorMock:
+ def __init__(self, buffer):
+ self._buffer = buffer
+ return
+
+ def write(self, data):
+ self._buffer.write(data)
+ return OpenRTM_aist.BufferStatus.BUFFER_OK
+
+
+
+class TestInPortSHMProvider(unittest.TestCase):
+ def setUp(self):
+ OpenRTM_aist.InPortSHMProviderInit()
+ OpenRTM_aist.CdrRingBufferInit()
+ self._prov = OpenRTM_aist.InPortProviderFactory.instance().createObject("shared_memory")
+ self._inp = OpenRTM_aist.InPort("in",RTC.TimedLong(RTC.Time(0,0),0))
+ self._orb = OpenRTM_aist.Manager.instance().getORB()
+ self._buffer = BufferMock()
+ #self._shm = OpenRTM_aist.SharedMemory()
+
+ #self._shm.setInterface(self._prov._this())
+ #self._shm.create_memory(1000,"test")
+ self._prov.create_memory(1000,"test")
+ return
+
+
+
+
+
+ def test_put(self):
+
+ self._con = ConnectorMock(self._buffer)
+ self._prov._connector = self._con
+ self._prov.setBuffer(self._buffer)
+ data = RTC.TimedLong(RTC.Time(0,0),123)
+ cdr = cdrMarshal(any.to_any(data).typecode(), data, 1)
+
+ self._prov.write(cdr)
+
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+ val = []
+ self.assertEqual(self._buffer.read(val), OpenRTM_aist.BufferStatus.BUFFER_OK)
+ get_data = cdrUnmarshal(any.to_any(data).typecode(), val[0], 1)
+ self.assertEqual(get_data.data, 123)
+ return
+
+
+
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
+
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py 2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -121,7 +121,14 @@
#print prop
comp = self.manager.getComponent("TestComp11")
self.assertTrue(comp is not None)
+
+ self.__dnp = OpenRTM_aist.NodeNumberingPolicy()
+ num = self.__dnp.onCreate(comp)
+ self.assertEqual(num,"2")
+
+
+
############### test #################
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py 2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -118,7 +118,11 @@
def test_getComponent(self):
comp = self.manager.getComponent("TestComp11")
- self.assertTrue(comp is not None)
+ self.assertTrue(comp is not None)
+
+ self.__dnp = OpenRTM_aist.NamingServiceNumberingPolicy()
+ num = self.__dnp.onCreate(comp)
+ self.assertEqual(num,"2")
############### test #################
if __name__ == '__main__':
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_OutPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_OutPortSHMProvider.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_OutPortSHMProvider.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+# -*- Python -*-
+
+#
+# \file test_OutPortSHMProvider.py
+# \brief test for OutPortSHMProvider class
+# \date $Date: 2016/03/26 $
+# \author Nobuhiko Miyamoto
+#
+
+
+from omniORB import any
+from omniORB import CORBA
+
+import OpenRTM_aist
+import RTC, RTC__POA
+import SDOPackage, SDOPackage__POA
+import OpenRTM
+
+import sys
+sys.path.insert(1,"../")
+
+import unittest
+
+
+
+class DummyBuffer:
+ def __init__(self):
+ self._cdr = None
+ self._empty = True
+
+ def empty(self):
+ return self._empty
+
+ def write(self,d):
+ self._cdr = d
+ self._empty = False
+ return 0
+
+ def read(self,cdr):
+ cdr[0] = self._cdr
+ self._empty = True
+ return 0
+
+class TestOutPortSHMProvider(unittest.TestCase):
+
+ def setUp(self):
+ OpenRTM_aist.Manager.instance()
+ OpenRTM_aist.OutPortSHMProviderInit()
+ self._opp = OpenRTM_aist.OutPortSHMProvider()
+
+
+ #self._shm = OpenRTM_aist.SharedMemory()
+ #self._shm_var = self._sh._this()
+ return
+
+
+
+ def test_get(self):
+ prop = OpenRTM_aist.Properties()
+ self._opp.init(prop)
+
+ ret=self._opp.get()
+ self.assertEqual(ret,OpenRTM.UNKNOWN_ERROR)
+
+ prop = OpenRTM_aist.Properties()
+ cinfo = OpenRTM_aist.ConnectorInfo("",
+ "",
+ [],
+ prop)
+ self._opp.setListener(cinfo,OpenRTM_aist.ConnectorListeners())
+ buff = DummyBuffer()
+ self._opp.setBuffer(buff)
+ ret=self._opp.get()
+ self.assertEqual(ret,OpenRTM.BUFFER_EMPTY)
+
+
+
+ #self._opp.setInterface(self._shm_var)
+
+
+ buff.write("abcde")
+ ret=self._opp.get()
+ self.assertEqual(ret,OpenRTM.PORT_OK)
+ data = self._opp.read()
+ self.assertEqual(data,"abcde")
+ return
+
+
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py 2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -22,9 +22,13 @@
#from Manager import *
import OpenRTM_aist
import RTC, OpenRTM
+from omniORB import cdrMarshal
+from omniORB import cdrUnmarshal
+import CORBA
+import platform
+import os
-
@@ -57,9 +61,42 @@
def tearDown(self):
self.manager.shutdownManager()
+ def test_SharedMemory(self):
+ sh_read = OpenRTM_aist.SharedMemory()
+ sh_read_var = sh_read._this()
+ sh_write = OpenRTM_aist.SharedMemory()
+ sh_write_var = sh_write._this()
+
+ sh_write.setInterface(sh_read_var)
+
+ memsize = sh_write.string_to_MemorySize("1")
+ self.assertEqual(memsize, 1)
+ memsize = sh_write.string_to_MemorySize("1k")
+ self.assertEqual(memsize, 1024)
+ memsize = sh_write.string_to_MemorySize("1M")
+ self.assertEqual(memsize, 1024*1024)
+
+ sh_write.create_memory(1000,"test")
+ data_cdr = cdrMarshal(CORBA.TC_ulong, 100)
+ sh_write.write(data_cdr)
+ data_cdr = sh_read.read()
+ data = cdrUnmarshal(CORBA.TC_ulong, data_cdr)
+ self.assertEqual(data, 100)
+ if platform.system() == "Windows":
+ pass
+ else:
+ self.assertTrue(os.path.exists("/dev/shm/test"))
+ sh_write.close_memory(True)
+ if platform.system() == "Windows":
+ pass
+ else:
+ self.assertFalse(os.path.exists("/dev/shm/test"))
+
+
def test_Push(self):
prop = OpenRTM_aist.Properties()
+ #prop.setProperty("dataport.shem_default_size","10k")
prop.setProperty("dataport.interface_type","shared_memory")
prop.setProperty("dataport.dataflow_type","push")
ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
@@ -87,6 +124,7 @@
def test_Pull(self):
prop = OpenRTM_aist.Properties()
+ #prop.setProperty("dataport.shem_default_size","10k")
prop.setProperty("dataport.interface_type","shared_memory")
prop.setProperty("dataport.dataflow_type","pull")
ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py 2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -58,8 +58,13 @@
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
def __init__(self, manager):
OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
-
+ self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+ self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
+ self._Service_provided = Test_i()
+
+
self._d_topic_out = RTC.TimedLong(RTC.Time(0,0),0)
self._topic_outOut = OpenRTM_aist.OutPort("topic_out", self._d_topic_out)
@@ -67,6 +72,9 @@
self._topic_Service_provided = Test_i()
def onInitialize(self):
+ self.addOutPort("out",self._outOut)
+ self._servicePort_provided.registerProvider("service", "TestService", self._Service_provided)
+ self.addPort(self._servicePort_provided)
self.addOutPort("topic_out",self._topic_outOut)
self._topic_outOut.appendProperty("publish_topic","test")
@@ -83,8 +91,13 @@
class TestComp2(OpenRTM_aist.DataFlowComponentBase):
def __init__(self, manager):
OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
-
+ self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+ self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+ self._servicePort_required = OpenRTM_aist.CorbaPort("service")
+ self._Service_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
+
+
self._d_topic_in = RTC.TimedLong(RTC.Time(0,0),0)
self._topic_inIn = OpenRTM_aist.InPort("topic_in", self._d_topic_in)
@@ -96,8 +109,12 @@
return
def onInitialize(self):
-
+ self.addInPort("in",self._inIn)
+ self._servicePort_required.registerConsumer("service", "TestService", self._Service_required)
+ self.addPort(self._servicePort_required)
+
+
self.addInPort("topic_in",self._topic_inIn)
self._topic_inIn.appendProperty("publish_topic","test")
@@ -139,7 +156,11 @@
class test_Topic(unittest.TestCase):
def setUp(self):
+ #sys.argv.extend(['-o','port.outport.topic_out.publish_topic:test2'])
+ #sys.argv.extend(['-o','port.inport.topic_in.publish_topic:test2'])
+ #sys.argv.extend(['-o','port.corbaport.topic_service.publish_topic:test2'])
self.manager = OpenRTM_aist.Manager.init(sys.argv)
+
self.manager.setModuleInitProc(MyModuleInit)
self.manager.activateManager()
@@ -160,7 +181,49 @@
self.manager.shutdownNaming()
time.sleep(0.1)
+ def test_port(self):
+ inport = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.in")
+ outport = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.out")
+
+ self.manager.connectDataPorts(inport, [outport])
+ ans = OpenRTM_aist.already_connected(inport, outport)
+ self.assertTrue(ans)
+
+
+ service_required = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.service")
+ service_provided = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.service")
+ self.manager.connectServicePorts(service_required, [service_provided])
+ ans = OpenRTM_aist.already_connected(service_required, service_provided)
+ self.assertTrue(ans)
+
+ ports = self.manager.getPortsOnNameServers("dataports.port_cxt/test.topic_cxt","inport")
+ name = ports[0].get_port_profile().name
+ self.assertEqual(name, "TestComp20.topic_in")
+
+ orb = self.manager.getORB()
+ names = "localhost"
+ cns = OpenRTM_aist.CorbaNaming(orb,names)
+ bl = cns.listByKind("dataports.port_cxt/test.topic_cxt","inport")
+ name = bl[0].binding_name[0].id
+ self.assertEqual(name, "TestComp20.topic_in")
+
+
+ self._d_out = RTC.TimedOctetSeq(RTC.Time(0,0),[])
+ self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+ prop = OpenRTM_aist.Properties()
+ self._outOut.init(prop)
+
+ naming = self.manager.getNaming()
+ naming.bindPortObject("test.port",self._outOut)
+
+ port = cns.resolveStr("test.port")
+ self.assertTrue(port is not None)
+ naming = OpenRTM_aist.NamingOnCorba(orb, "localhost")
+ naming.bindPortObject("test2.port",self._outOut)
+
+ port = cns.resolveStr("test2.port")
+ self.assertTrue(port is not None)
def test_Topic(self):
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py 2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py 2016-03-16 05:29:41 UTC (rev 697)
@@ -135,23 +135,23 @@
def test_getComponent(self):
#mgr_sev = self.manager._mgrservant.getObjRef()
#print mgr_sev.get_components_by_name("example/TestComp10")
- rtcs = self.manager._namingManager.string_to_component("rtcloc://localhost:2810/example/TestComp20")
+ rtcs = self.manager.getNaming().string_to_component("rtcloc://localhost:2810/example/TestComp20")
name = rtcs[0].get_component_profile().instance_name
self.assertEqual(name,"TestComp20")
- rtcs = self.manager._namingManager.string_to_component("rtcloc://*/example/TestComp20")
+ rtcs = self.manager.getNaming().string_to_component("rtcloc://*/example/TestComp20")
name = rtcs[0].get_component_profile().instance_name
self.assertEqual(name,"TestComp20")
- rtcs = self.manager._namingManager.string_to_component("rtcloc://*/*/TestComp20")
+ rtcs = self.manager.getNaming().string_to_component("rtcloc://*/*/TestComp20")
name = rtcs[0].get_component_profile().instance_name
self.assertEqual(name,"TestComp20")
#print rtcs
- rtcs = self.manager._namingManager.string_to_component("rtcname://localhost/test.host_cxt/TestComp20")
+ rtcs = self.manager.getNaming().string_to_component("rtcname://localhost/test.host_cxt/TestComp20")
name = rtcs[0].get_component_profile().instance_name
self.assertEqual(name,"TestComp20")
- rtcs = self.manager._namingManager.string_to_component("rtcname://*/test.host_cxt/TestComp20")
+ rtcs = self.manager.getNaming().string_to_component("rtcname://*/test.host_cxt/TestComp20")
name = rtcs[0].get_component_profile().instance_name
self.assertEqual(name,"TestComp20")
- rtcs = self.manager._namingManager.string_to_component("rtcname://*/*/TestComp20")
+ rtcs = self.manager.getNaming().string_to_component("rtcname://*/*/TestComp20")
name = rtcs[0].get_component_profile().instance_name
self.assertEqual(name,"TestComp20")
More information about the openrtm-commit
mailing list