[openrtm-commit:01858] r697 - trunk/OpenRTM-aist-Python/OpenRTM_aist/test

openrtm @ openrtm.org openrtm @ openrtm.org
2016年 3月 16日 (水) 14:29:41 JST


Author: miyamoto
Date: 2016-03-16 14:29:41 +0900 (Wed, 16 Mar 2016)
New Revision: 697

Added:
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_InPortSHMProvider.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_OutPortSHMProvider.py
Modified:
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_ComponentActionListener.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py
Log:
[compat,test,->RELENG_1_2] Added test files.

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_ComponentActionListener.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_ComponentActionListener.py	2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_ComponentActionListener.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -1,229 +1,248 @@
 #!/usr/bin/env python
 # -*- coding: euc-jp -*-
 
-##
-# @file test_ComponentActionListener.py
-# @brief test for ComponentActionListener class
-# @date $Date$
-# @author Shinji Kurihara
 #
-# Copyright (C) 2011
-#     Intelligent Systems Research Institute,
-#     National Institute of
-#         Advanced Industrial Science and Technology (AIST), Japan
-#     All rights reserved.
+# \file test_ComponentActionListener.py
+# \brief test for ComponentActionListener class
+# \date $Date: $
+# \author Nobuhiko Miyamoto
+#
 
 
 import sys
 sys.path.insert(1,"../")
 
-import unittest
+try:
+    import unittest2 as unittest
+except (ImportError):
+    import unittest
 
-from ComponentActionListener import *
+import time
+
+#from Manager import *
 import OpenRTM_aist
+import RTC, RTC__POA
+import OpenRTM, OpenRTM__POA
 
-class MockPreComponentActionListener(PreComponentActionListener):
-  def __init__(self):
-    PreComponentActionListener.__init__(self)
-    return
+testcomp1_spec = ["implementation_id", "TestComp1",
+                 "type_name",         "TestComp1",
+                 "description",       "Test example component",
+                 "version",           "1.0",
+                 "vendor",            "Nobuhiko Myiyamoto",
+                 "category",          "example",
+                 "activity_type",     "DataFlowComponent",
+                 "max_instance",      "10",
+                 "language",          "C++",
+                 "lang_type",         "compile",
+                 "conf.default.test1", "0",
+                 ""]
 
-  def __call__(self,id):
-    return id
+testcomp2_spec = ["implementation_id", "TestComp2",
+                 "type_name",         "TestComp2",
+                 "description",       "Test example component",
+                 "version",           "1.0",
+                 "vendor",            "Nobuhiko Myiyamoto",
+                 "category",          "example",
+                 "activity_type",     "DataFlowComponent",
+                 "max_instance",      "10",
+                 "language",          "C++",
+                 "lang_type",         "compile",
+                 ""]
 
-
-class MockPostComponentActionListener(PostComponentActionListener):
+class Test_i(OpenRTM__POA.InPortCdr):
   def __init__(self):
-    PostComponentActionListener.__init__(self)
-    return
+    pass
+  def put(self, data):
+    return OpenRTM.PORT_OK
 
-  def __call__(self,id,ret):
-    return id,ret
 
 
-class MockPortActionListener(PortActionListener):
-  def __init__(self):
-    PortActionListener.__init__(self)
-    return
+class TestPostListener(OpenRTM_aist.PostComponentActionListener):
+    def __init__(self, mes):
+        self.mes = mes
+    def __del__(self):
+        pass
+    def __call__(self, ec_id, ret):
+        print self.mes
+        
 
-  def __call__(self, pprof):
-    return pprof
+class TestPreListener(OpenRTM_aist.PreComponentActionListener):
+    def __init__(self, mes):
+        self.mes = mes
+    def __del__(self):
+        pass
+    def __call__(self, ec_id):
+        print self.mes
 
 
-class MockExecutionContextActionListener(ExecutionContextActionListener):
-  def __init__(self):
-    ExecutionContextActionListener.__init__(self)
-    return
+class TestComp1(OpenRTM_aist.DataFlowComponentBase):
+  def __init__(self, manager):
+    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+    self._inIn = OpenRTM_aist.InPort("in", self._d_in)
 
-  def __call__(self, ec_id):
-    return ec_id
+    self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
+    self._testService_provided = Test_i()
 
+    self._test1 = [0]
 
-class TestListener(unittest.TestCase):
-  def setUp(self):
-    return
-
-  def tearDown(self):
-    OpenRTM_aist.Manager.instance().shutdownManager()
-    return
-
-  def test_PreComponentActionListener_toString(self):
-    self.assertEqual("PRE_ON_INITIALIZE",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_INITIALIZE))
+  def onInitialize(self):
+    self.addOutPort("out",self._outOut)
+    self.addInPort("in",self._inIn)
     
-    self.assertEqual("PRE_ON_FINALIZE",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_FINALIZE))
+    self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
+    self.addPort(self._servicePort_provided)
+    
+    #self._servicePort_provided.activateInterfaces()
 
-    self.assertEqual("PRE_ON_STARTUP",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_STARTUP))
+    self.bindParameter("test1", self._test1, "0")
 
-    self.assertEqual("PRE_ON_SHUTDOWN",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_SHUTDOWN))
 
-    self.assertEqual("PRE_ON_ACTIVATED",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_ACTIVATED))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE, TestPreListener("RTC2:PRE_ON_INITIALIZE"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE, TestPreListener("RTC1:PRE_ON_FINALIZE"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_STARTUP, TestPreListener("RTC1:PRE_ON_STARTUP"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_SHUTDOWN, TestPreListener("RTC1:PRE_ON_SHUTDOWN"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ACTIVATED, TestPreListener("RTC1:PRE_ON_ACTIVATED"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_DEACTIVATED, TestPreListener("RTC1:PRE_ON_DEACTIVATED"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ABORTING, TestPreListener("RTC1:PRE_ON_ABORTING"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ERROR, TestPreListener("RTC1:PRE_ON_ERROR"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_RESET, TestPreListener("RTC1:PRE_ON_RESET"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_EXECUTE, TestPreListener("RTC1:PRE_ON_EXECUTE"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_STATE_UPDATE, TestPreListener("RTC1:PRE_ON_STATE_UPDATE"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_RATE_CHANGED, TestPreListener("RTC1:PRE_ON_RATE_CHANGED"))
+    
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_INITIALIZE, TestPostListener("RTC1:POST_ON_INITIALIZE"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE, TestPostListener("RTC1:POST_ON_FINALIZE"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_STARTUP, TestPostListener("RTC1:POST_ON_STARTUP"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_SHUTDOWN, TestPostListener("RTC1:POST_ON_SHUTDOWN"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ACTIVATED, TestPostListener("RTC1:POST_ON_ACTIVATED"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_DEACTIVATED, TestPostListener("RTC1:POST_ON_DEACTIVATED"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ABORTING, TestPostListener("RTC1:POST_ON_ABORTING"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ERROR, TestPostListener("RTC1:POST_ON_ERROR"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_RESET, TestPostListener("RTC1:POST_ON_RESET"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_EXECUTE, TestPostListener("RTC1:POST_ON_EXECUTE"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_STATE_UPDATE, TestPostListener("RTC1:POST_ON_STATE_UPDATE"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_RATE_CHANGED, TestPostListener("RTC1:POST_ON_RATE_CHANGED"))
 
-    self.assertEqual("PRE_ON_DEACTIVATED",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_DEACTIVATED))
+    
+        
 
-    self.assertEqual("PRE_ON_ABORTING",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_ABORTING))
+    return RTC.RTC_OK
 
-    self.assertEqual("PRE_ON_ERROR",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_ERROR))
+  
+  
+class TestComp2(OpenRTM_aist.DataFlowComponentBase):
+  def __init__(self, manager):
+    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+    self._inIn = OpenRTM_aist.InPort("in", self._d_in)
+    
+        
+    
+    
+    self._servicePort_required = OpenRTM_aist.CorbaPort("service")
+    self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
 
-    self.assertEqual("PRE_ON_RESET",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_RESET))
 
-    self.assertEqual("PRE_ON_EXECUTE",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_EXECUTE))
-
-    self.assertEqual("PRE_ON_STATE_UPDATE",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_STATE_UPDATE))
-
-    self.assertEqual("PRE_ON_RATE_CHANGED",
-                    PreComponentActionListener.toString(
-        PreComponentActionListenerType.PRE_ON_RATE_CHANGED))
-
+    
     return
-
-  def test_PostComponentActionListener_toString(self):
-    self.assertEqual("POST_ON_INITIALIZE",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_INITIALIZE))
+  
+  def onInitialize(self):
+    self.addInPort("in",self._inIn)
+    self.addOutPort("out",self._outOut)
     
-    self.assertEqual("POST_ON_FINALIZE",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_FINALIZE))
+    
+    
+    self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
+    self.addPort(self._servicePort_required)
 
-    self.assertEqual("POST_ON_STARTUP",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_STARTUP))
 
-    self.assertEqual("POST_ON_SHUTDOWN",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_SHUTDOWN))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE, TestPreListener("RTC2:PRE_ON_INITIALIZE"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE, TestPreListener("RTC2:PRE_ON_FINALIZE"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_STARTUP, TestPreListener("RTC2:PRE_ON_STARTUP"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_SHUTDOWN, TestPreListener("RTC2:PRE_ON_SHUTDOWN"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ACTIVATED, TestPreListener("RTC2:PRE_ON_ACTIVATED"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_DEACTIVATED, TestPreListener("RTC2:PRE_ON_DEACTIVATED"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ABORTING, TestPreListener("RTC2:PRE_ON_ABORTING"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_ERROR, TestPreListener("RTC2:PRE_ON_ERROR"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_RESET, TestPreListener("RTC2:PRE_ON_RESET"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_EXECUTE, TestPreListener("RTC2:PRE_ON_EXECUTE"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_STATE_UPDATE, TestPreListener("RTC2:PRE_ON_STATE_UPDATE"))
+    self.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_RATE_CHANGED, TestPreListener("RTC2:PRE_ON_RATE_CHANGED"))
+    
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_INITIALIZE, TestPostListener("RTC2:POST_ON_INITIALIZE"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE, TestPostListener("RTC2:POST_ON_FINALIZE"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_STARTUP, TestPostListener("RTC2:POST_ON_STARTUP"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_SHUTDOWN, TestPostListener("RTC2:POST_ON_SHUTDOWN"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ACTIVATED, TestPostListener("RTC2:POST_ON_ACTIVATED"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_DEACTIVATED, TestPostListener("RTC2:POST_ON_DEACTIVATED"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ABORTING, TestPostListener("RTC2:POST_ON_ABORTING"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_ERROR, TestPostListener("RTC2:POST_ON_ERROR"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_RESET, TestPostListener("RTC2:POST_ON_RESET"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_EXECUTE, TestPostListener("RTC2:POST_ON_EXECUTE"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_STATE_UPDATE, TestPostListener("RTC2:POST_ON_STATE_UPDATE"))
+    self.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_RATE_CHANGED, TestPostListener("RTC2:POST_ON_RATE_CHANGED"))
 
-    self.assertEqual("POST_ON_ACTIVATED",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_ACTIVATED))
+    
+    return RTC.RTC_OK
+    
 
-    self.assertEqual("POST_ON_DEACTIVATED",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_DEACTIVATED))
+ 
 
-    self.assertEqual("POST_ON_ABORTING",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_ABORTING))
+    
+def TestComp1Init(manager):
+  profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
+  manager.registerFactory(profile,
+                          TestComp1,
+                          OpenRTM_aist.Delete)
 
-    self.assertEqual("POST_ON_ERROR",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_ERROR))
 
-    self.assertEqual("POST_ON_RESET",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_RESET))
+    
+def TestComp2Init(manager):
+  profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
+  manager.registerFactory(profile,
+                          TestComp2,
+                          OpenRTM_aist.Delete)
+  
 
-    self.assertEqual("POST_ON_EXECUTE",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_EXECUTE))
+def MyModuleInit(manager):
+  TestComp1Init(manager)
+  TestComp2Init(manager)
+  com = manager.createComponent("TestComp1")
+  com = manager.createComponent("TestComp2")
+  
 
-    self.assertEqual("POST_ON_STATE_UPDATE",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_STATE_UPDATE))
 
-    self.assertEqual("POST_ON_RATE_CHANGED",
-                    PostComponentActionListener.toString(
-        PostComponentActionListenerType.POST_ON_RATE_CHANGED))
 
-    return
 
-  def test_PortActionListener_toString(self):
-    self.assertEqual("ADD_PORT",
-                    PortActionListener.toString(
-        PortActionListenerType.ADD_PORT))
+class test_ComponentActionListener(unittest.TestCase):
+  
+  def setUp(self):
+    self.manager = OpenRTM_aist.Manager.init(sys.argv)
+    self.manager.setModuleInitProc(MyModuleInit)
+    self.manager.activateManager()
+    
+    self.comp1 = self.manager.getComponent("TestComp10").getObjRef()
+    self.comp2 = self.manager.getComponent("TestComp20").getObjRef()   
 
-    self.assertEqual("REMOVE_PORT",
-                    PortActionListener.toString(
-        PortActionListenerType.REMOVE_PORT))
+  def tearDown(self):
+    comps = self.manager.getComponents()[:]
+    for comp in comps:
+        self.manager.unregisterComponent(comp)
+        comp_id = comp.getProperties()
+        factory = self.manager._factory.find(comp_id)
+        factory.destroy(comp)
+    self.manager.shutdownNaming()
+    time.sleep(0.1)
 
-    return
+  def test_Component(self):
+    while(True):
+        time.sleep(10)
 
-  def test_ExecutionContextActionListener_toString(self):
-    self.assertEqual("ATTACH_EC",
-                    ExecutionContextActionListener.toString(
-        ExecutionContextActionListenerType.EC_ATTACHED))
-
-    self.assertEqual("DETACH_EC",
-                    ExecutionContextActionListener.toString(
-        ExecutionContextActionListenerType.EC_DETACHED))
-
-    return
-
-
-  def test_PreComponentActionListenerHolder(self):
-    preactions = ComponentActionListeners()
-    listener = MockPreComponentActionListener()
-    preactions.preaction_[0].addListener(listener,True)
-    preactions.preaction_[0].notify("test precomp ec_id")
-    preactions.preaction_[0].removeListener(listener)
-    return
-
-  def test_PostComponentActionListenerHolder(self):
-    postactions = ComponentActionListeners()
-    listener = MockPostComponentActionListener()
-    postactions.postaction_[0].addListener(listener,True)
-    postactions.postaction_[0].notify("test postcomp ec_id",True)
-    postactions.postaction_[0].removeListener(listener)
-    return
-
-  def test_PortActionListenerHolder(self):
-    portactions = ComponentActionListeners()
-    listener = MockPortActionListener()
-    portactions.portaction_[0].addListener(listener,True)
-    portactions.portaction_[0].notify("test port pprof")
-    portactions.portaction_[0].removeListener(listener)
-    return
-
-  def test_ExecutionContextActionListenerHolder(self):
-    ecactions = ComponentActionListeners()
-    listener = MockExecutionContextActionListener()
-    ecactions.ecaction_[0].addListener(listener,True)
-    ecactions.ecaction_[0].notify("test ec ec_id")
-    ecactions.ecaction_[0].removeListener(listener)
-    return
-
-
 ############### test #################
 if __name__ == '__main__':
-  unittest.main()
+        unittest.main()

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py	2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_CreateComponent_Slave.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -168,19 +168,29 @@
     
     time.sleep(0.1)
 
+
+
+
+
   def test_createComponent_slave(self):
     self.queue = multiprocessing.Queue()
     self.outport_process = multiprocessing.Process(target=runTestComp2, args=(self.queue,))
     self.outport_process.start()
     self.queue.put("")
     time.sleep(1)
+    test_module_name = ["comp&test=0"]
+    test_value = self.manager.getManagerServant().get_parameter_by_modulename("test",test_module_name)
+    self.assertEqual(test_value,"0")
+    self.assertEqual(test_module_name[0],"comp")
     #rtc = self.manager.getManagerServant().create_component("TestComp1&manager_name=manager2")
     #rtc = self.manager.getManagerServant().create_component("TestComp2&manager_address=localhost:2810")
     rtc = self.manager.getManagerServant().create_component("TestComp2&manager_name=manager2")
     slave = self.manager.getManagerServant().findManager_by_name("manager2")
+    self.assertTrue(slave is not None)
     rtcs = slave.get_components_by_name("TestComp20")
     name = rtcs[0].get_component_profile().instance_name
     self.assertEqual(name,"TestComp20")
+    
     self.queue.put("")
     #rtc = self.manager.getManagerServant().create_component("TestComp2")
     #print rtc

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py	2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_DirectServicePort.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -66,6 +66,10 @@
     
                                       
   def test_Service(self):
+    
+    
+
+    
     prop = OpenRTM_aist.Properties()
     ret = OpenRTM_aist.connect("con1",prop,self._servicePort_provided.getPortRef(),self._servicePort_required.getPortRef())
     obj = self._testService_required._ptr()

Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_InPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_InPortSHMProvider.py	                        (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_InPortSHMProvider.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+# -*- Python -*-
+
+
+#  \file test_InPortSHMProvider.py
+#  \brief test for InPortSHMProvider class
+#  \date $Date: 2016/03/16 $
+#  \author Nobuhiko Miyamoto
+# 
+
+ 
+
+from omniORB import *
+from omniORB import any
+
+import sys
+sys.path.insert(1,"../")
+
+import unittest
+
+import RTC, RTC__POA
+import OpenRTM
+import OpenRTM_aist
+
+
+class BufferMock:
+	def __init__(self):
+		self._data = None
+		return
+
+	def write(self, data):
+		self._data = data
+		return OpenRTM_aist.BufferStatus.BUFFER_OK
+
+	def read(self, value):
+		if len(value) > 0:
+			value[0] = self._data
+		else:
+			value.append(self._data)
+		return OpenRTM_aist.BufferStatus.BUFFER_OK
+
+	def full(self):
+		return False
+
+
+class ConnectorMock:
+	def __init__(self, buffer):
+		self._buffer = buffer
+		return
+
+	def write(self, data):
+	    self._buffer.write(data)
+	    return OpenRTM_aist.BufferStatus.BUFFER_OK
+
+
+
+class TestInPortSHMProvider(unittest.TestCase):
+	def setUp(self):
+		OpenRTM_aist.InPortSHMProviderInit()
+		OpenRTM_aist.CdrRingBufferInit()
+		self._prov = OpenRTM_aist.InPortProviderFactory.instance().createObject("shared_memory")
+		self._inp  = OpenRTM_aist.InPort("in",RTC.TimedLong(RTC.Time(0,0),0))
+		self._orb  = OpenRTM_aist.Manager.instance().getORB()
+		self._buffer = BufferMock()
+		#self._shm = OpenRTM_aist.SharedMemory()
+		
+		#self._shm.setInterface(self._prov._this())
+		#self._shm.create_memory(1000,"test")
+		self._prov.create_memory(1000,"test")
+		return
+	
+	
+
+	
+
+	def test_put(self):
+		
+		self._con = ConnectorMock(self._buffer)
+		self._prov._connector = self._con
+		self._prov.setBuffer(self._buffer)
+		data = RTC.TimedLong(RTC.Time(0,0),123)
+		cdr = cdrMarshal(any.to_any(data).typecode(), data, 1)
+
+		self._prov.write(cdr)
+
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		self.assertEqual(self._prov.put(),OpenRTM.PORT_OK)
+		val = []
+		self.assertEqual(self._buffer.read(val), OpenRTM_aist.BufferStatus.BUFFER_OK)
+		get_data = cdrUnmarshal(any.to_any(data).typecode(), val[0], 1)
+		self.assertEqual(get_data.data, 123)
+		return
+
+
+
+############### test #################
+if __name__ == '__main__':
+        unittest.main()
+

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py	2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_node.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -121,7 +121,14 @@
       #print prop
       comp = self.manager.getComponent("TestComp11")
       self.assertTrue(comp is not None)
+
+      self.__dnp = OpenRTM_aist.NodeNumberingPolicy()
+      num = self.__dnp.onCreate(comp)
+      self.assertEqual(num,"2")
+
+
       
+      
     
 
 ############### test #################

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py	2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_NumberingPolicy_ns.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -118,7 +118,11 @@
 
   def test_getComponent(self):
       comp = self.manager.getComponent("TestComp11")
-      self.assertTrue(comp is not None)    
+      self.assertTrue(comp is not None)
+      
+      self.__dnp = OpenRTM_aist.NamingServiceNumberingPolicy()
+      num = self.__dnp.onCreate(comp)
+      self.assertEqual(num,"2")
 
 ############### test #################
 if __name__ == '__main__':

Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_OutPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_OutPortSHMProvider.py	                        (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_OutPortSHMProvider.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+# -*- Python -*-
+
+#
+#  \file  test_OutPortSHMProvider.py
+#  \brief test for OutPortSHMProvider class
+#  \date  $Date: 2016/03/16 $
+#  \author Nobuhiko Miyamoto
+# 
+
+ 
+from omniORB import any
+from omniORB import CORBA
+
+import OpenRTM_aist
+import RTC, RTC__POA 
+import SDOPackage, SDOPackage__POA
+import OpenRTM
+
+import sys
+sys.path.insert(1,"../")
+
+import unittest
+
+
+
+class DummyBuffer:
+  def __init__(self):
+    self._cdr = None
+    self._empty = True
+
+  def empty(self):
+    return self._empty
+
+  def write(self,d):
+    self._cdr = d
+    self._empty = False
+    return 0
+
+  def read(self,cdr):
+    cdr[0] = self._cdr
+    self._empty = True
+    return 0
+
+class TestOutPortSHMProvider(unittest.TestCase):
+
+  def setUp(self):
+    OpenRTM_aist.Manager.instance()
+    OpenRTM_aist.OutPortSHMProviderInit()
+    self._opp = OpenRTM_aist.OutPortSHMProvider()
+
+
+    #self._shm = OpenRTM_aist.SharedMemory()
+    #self._shm_var = self._sh._this()
+    return
+    
+
+
+  def test_get(self):
+    prop = OpenRTM_aist.Properties()
+    self._opp.init(prop)
+    
+    ret=self._opp.get()
+    self.assertEqual(ret,OpenRTM.UNKNOWN_ERROR)
+
+    prop = OpenRTM_aist.Properties()
+    cinfo = OpenRTM_aist.ConnectorInfo("",
+                                       "",
+                                       [],
+                                       prop)
+    self._opp.setListener(cinfo,OpenRTM_aist.ConnectorListeners())
+    buff = DummyBuffer()
+    self._opp.setBuffer(buff)
+    ret=self._opp.get()
+    self.assertEqual(ret,OpenRTM.BUFFER_EMPTY)
+
+
+    
+    #self._opp.setInterface(self._shm_var)
+    
+
+    buff.write("abcde")
+    ret=self._opp.get()
+    self.assertEqual(ret,OpenRTM.PORT_OK)
+    data = self._opp.read()
+    self.assertEqual(data,"abcde")
+    return
+
+
+############### test #################
+if __name__ == '__main__':
+        unittest.main()

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py	2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_SharedMemory.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -22,9 +22,13 @@
 #from Manager import *
 import OpenRTM_aist
 import RTC, OpenRTM
+from omniORB import cdrMarshal
+from omniORB import cdrUnmarshal
+import CORBA
+import platform
+import os
 
 
-
   
 
 
@@ -57,9 +61,42 @@
   def tearDown(self):
     self.manager.shutdownManager()
 
+  def test_SharedMemory(self):
+      sh_read = OpenRTM_aist.SharedMemory()
+      sh_read_var = sh_read._this()
+      sh_write = OpenRTM_aist.SharedMemory()
+      sh_write_var = sh_write._this()
+      
+      sh_write.setInterface(sh_read_var)
+      
+      memsize = sh_write.string_to_MemorySize("1")
+      self.assertEqual(memsize, 1)
+      memsize = sh_write.string_to_MemorySize("1k")
+      self.assertEqual(memsize, 1024)
+      memsize = sh_write.string_to_MemorySize("1M")
+      self.assertEqual(memsize, 1024*1024)
+
+      sh_write.create_memory(1000,"test")
+      data_cdr = cdrMarshal(CORBA.TC_ulong, 100)
+      sh_write.write(data_cdr)
+      data_cdr = sh_read.read()
+      data = cdrUnmarshal(CORBA.TC_ulong, data_cdr)
+      self.assertEqual(data, 100)
+      if platform.system() == "Windows":
+          pass
+      else:
+          self.assertTrue(os.path.exists("/dev/shm/test"))
+      sh_write.close_memory(True)
+      if platform.system() == "Windows":
+          pass
+      else:
+          self.assertFalse(os.path.exists("/dev/shm/test"))
+          
+
   def test_Push(self):
     
     prop = OpenRTM_aist.Properties()
+    #prop.setProperty("dataport.shem_default_size","10k")
     prop.setProperty("dataport.interface_type","shared_memory")
     prop.setProperty("dataport.dataflow_type","push")
     ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
@@ -87,6 +124,7 @@
 
   def test_Pull(self):
     prop = OpenRTM_aist.Properties()
+    #prop.setProperty("dataport.shem_default_size","10k")
     prop.setProperty("dataport.interface_type","shared_memory")
     prop.setProperty("dataport.dataflow_type","pull")
     ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py	2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_Topic.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -58,8 +58,13 @@
 class TestComp1(OpenRTM_aist.DataFlowComponentBase):
   def __init__(self, manager):
     OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
-    
+    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
+    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
 
+    self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
+    self._Service_provided = Test_i()
+
+
     self._d_topic_out = RTC.TimedLong(RTC.Time(0,0),0)
     self._topic_outOut = OpenRTM_aist.OutPort("topic_out", self._d_topic_out)
     
@@ -67,6 +72,9 @@
     self._topic_Service_provided = Test_i()
 
   def onInitialize(self):
+    self.addOutPort("out",self._outOut)
+    self._servicePort_provided.registerProvider("service", "TestService", self._Service_provided)
+    self.addPort(self._servicePort_provided)
     
     self.addOutPort("topic_out",self._topic_outOut)
     self._topic_outOut.appendProperty("publish_topic","test")
@@ -83,8 +91,13 @@
 class TestComp2(OpenRTM_aist.DataFlowComponentBase):
   def __init__(self, manager):
     OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
-    
+    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
+    self._inIn = OpenRTM_aist.InPort("in", self._d_in)
 
+    self._servicePort_required = OpenRTM_aist.CorbaPort("service")
+    self._Service_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
+
+
     self._d_topic_in = RTC.TimedLong(RTC.Time(0,0),0)
     self._topic_inIn = OpenRTM_aist.InPort("topic_in", self._d_topic_in)
     
@@ -96,8 +109,12 @@
     return
   
   def onInitialize(self):
-    
+    self.addInPort("in",self._inIn)
 
+    self._servicePort_required.registerConsumer("service", "TestService", self._Service_required)
+    self.addPort(self._servicePort_required)
+
+
     self.addInPort("topic_in",self._topic_inIn)
     self._topic_inIn.appendProperty("publish_topic","test")
     
@@ -139,7 +156,11 @@
 class test_Topic(unittest.TestCase):
   
   def setUp(self):
+    #sys.argv.extend(['-o','port.outport.topic_out.publish_topic:test2'])
+    #sys.argv.extend(['-o','port.inport.topic_in.publish_topic:test2'])
+    #sys.argv.extend(['-o','port.corbaport.topic_service.publish_topic:test2'])
     self.manager = OpenRTM_aist.Manager.init(sys.argv)
+    
     self.manager.setModuleInitProc(MyModuleInit)
     self.manager.activateManager()
     
@@ -160,7 +181,49 @@
     self.manager.shutdownNaming()
     time.sleep(0.1)
     
+  def test_port(self):
+    inport = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.in")
+    outport = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.out")
+
+    self.manager.connectDataPorts(inport, [outport])
+    ans = OpenRTM_aist.already_connected(inport, outport)
+    self.assertTrue(ans)
+
+
+    service_required = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.service")
+    service_provided = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.service")
+    self.manager.connectServicePorts(service_required, [service_provided])
+    ans = OpenRTM_aist.already_connected(service_required, service_provided)
+    self.assertTrue(ans)
+
+    ports = self.manager.getPortsOnNameServers("dataports.port_cxt/test.topic_cxt","inport")
+    name = ports[0].get_port_profile().name
+    self.assertEqual(name, "TestComp20.topic_in")
+
+    orb = self.manager.getORB()
+    names = "localhost"
+    cns = OpenRTM_aist.CorbaNaming(orb,names)
+    bl = cns.listByKind("dataports.port_cxt/test.topic_cxt","inport")
+    name = bl[0].binding_name[0].id
+    self.assertEqual(name, "TestComp20.topic_in")
+
+
+    self._d_out = RTC.TimedOctetSeq(RTC.Time(0,0),[])
+    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
+    prop = OpenRTM_aist.Properties()
+    self._outOut.init(prop)
+
+    naming = self.manager.getNaming()
+    naming.bindPortObject("test.port",self._outOut)
+
+    port = cns.resolveStr("test.port")
+    self.assertTrue(port is not None)
     
+    naming = OpenRTM_aist.NamingOnCorba(orb, "localhost")
+    naming.bindPortObject("test2.port",self._outOut)
+    
+    port = cns.resolveStr("test2.port")
+    self.assertTrue(port is not None)
 
   def test_Topic(self):
     

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py	2016-03-16 05:28:55 UTC (rev 696)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/test/test_string_to_component.py	2016-03-16 05:29:41 UTC (rev 697)
@@ -135,23 +135,23 @@
   def test_getComponent(self):
       #mgr_sev = self.manager._mgrservant.getObjRef()
       #print mgr_sev.get_components_by_name("example/TestComp10")
-      rtcs = self.manager._namingManager.string_to_component("rtcloc://localhost:2810/example/TestComp20")
+      rtcs = self.manager.getNaming().string_to_component("rtcloc://localhost:2810/example/TestComp20")
       name = rtcs[0].get_component_profile().instance_name
       self.assertEqual(name,"TestComp20")
-      rtcs = self.manager._namingManager.string_to_component("rtcloc://*/example/TestComp20")
+      rtcs = self.manager.getNaming().string_to_component("rtcloc://*/example/TestComp20")
       name = rtcs[0].get_component_profile().instance_name
       self.assertEqual(name,"TestComp20")
-      rtcs = self.manager._namingManager.string_to_component("rtcloc://*/*/TestComp20")
+      rtcs = self.manager.getNaming().string_to_component("rtcloc://*/*/TestComp20")
       name = rtcs[0].get_component_profile().instance_name
       self.assertEqual(name,"TestComp20")
       #print rtcs
-      rtcs = self.manager._namingManager.string_to_component("rtcname://localhost/test.host_cxt/TestComp20")
+      rtcs = self.manager.getNaming().string_to_component("rtcname://localhost/test.host_cxt/TestComp20")
       name = rtcs[0].get_component_profile().instance_name
       self.assertEqual(name,"TestComp20")
-      rtcs = self.manager._namingManager.string_to_component("rtcname://*/test.host_cxt/TestComp20")
+      rtcs = self.manager.getNaming().string_to_component("rtcname://*/test.host_cxt/TestComp20")
       name = rtcs[0].get_component_profile().instance_name
       self.assertEqual(name,"TestComp20")
-      rtcs = self.manager._namingManager.string_to_component("rtcname://*/*/TestComp20")
+      rtcs = self.manager.getNaming().string_to_component("rtcname://*/*/TestComp20")
       name = rtcs[0].get_component_profile().instance_name
       self.assertEqual(name,"TestComp20")
       



More information about the openrtm-commit mailing list