[openrtm-commit:00761] r524 - in branches/work/OpenRTM-aist-Python/OpenRTM_aist: . test

openrtm @ openrtm.org openrtm @ openrtm.org
2012年 3月 12日 (月) 22:00:54 JST


Author: kurihara
Date: 2012-03-12 22:00:54 +0900 (Mon, 12 Mar 2012)
New Revision: 524

Added:
   branches/work/OpenRTM-aist-Python/OpenRTM_aist/RTObjectStateMachine.py
   branches/work/OpenRTM-aist-Python/OpenRTM_aist/test/test_RTObjectStateMachine.py
Log:
[incompat,impl,func] RTObjectStateMachine class was implemented. refs #2343

Added: branches/work/OpenRTM-aist-Python/OpenRTM_aist/RTObjectStateMachine.py
===================================================================
--- branches/work/OpenRTM-aist-Python/OpenRTM_aist/RTObjectStateMachine.py	                        (rev 0)
+++ branches/work/OpenRTM-aist-Python/OpenRTM_aist/RTObjectStateMachine.py	2012-03-12 13:00:54 UTC (rev 524)
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+##
+# @file RTObjectStateMachine.py
+# @brief ExecutionContext's state machine worker class
+# @date $Date$
+# @author Noriaki Ando <n-ando at aist.go.jp> and Shinji Kurihara
+#
+# Copyright (C) 2011
+#     Noriaki Ando
+#     Intelligent Systems Research Institute,
+#     National Institute of
+#         Advanced Industrial Science and Technology (AIST), Japan
+#     All rights reserved.
+#
+# $Id$
+#
+
+from omniORB import CORBA, PortableServer
+
+import OpenRTM_aist
+import RTC
+
+# Value handed to the OpenRTM_aist.StateMachine constructor by
+# RTObjectStateMachine.__init__.
+# NOTE(review): presumably the number of RTC life-cycle states -- confirm.
+NUM_OF_LIFECYCLESTATE = 4
+
+class RTObjectStateMachine:
+  """
+  """
+
+  # RTObjectStateMachine(RTC::ExecutionContextHandle_t id,
+  #                      RTC::LightweightRTObject_ptr comp);
+  def __init__(self, id, comp):
+    global NUM_OF_LIFECYCLESTATE
+    self._id = id
+    self._rtobj = comp
+    self._sm = OpenRTM_aist.StateMachine(NUM_OF_LIFECYCLESTATE)
+    self._ca   = False
+    self._dfc  = False
+    self._fsm  = False
+    self._mode = False
+    self._caVar   = None
+    self._dfcVar  = None
+    self._fsmVar  = None
+    self._modeVar = None
+
+    # Setting Action callback objects
+    self.setComponentAction(comp)
+    self.setDataFlowComponentAction(comp)
+    self.setFsmParticipantAction(comp)
+    self.setMultiModeComponentAction(comp)
+    # Setting callback functions to StateMachine
+    self._sm.setListener(self)
+    self._sm.setEntryAction (RTC.ACTIVE_STATE,
+                             self.onActivated)
+    self._sm.setDoAction    (RTC.ACTIVE_STATE,
+                             self.onExecute)
+    self._sm.setPostDoAction(RTC.ACTIVE_STATE,
+                             self.onStateUpdate)
+    self._sm.setExitAction  (RTC.ACTIVE_STATE,
+                             self.onDeactivated)
+    self._sm.setEntryAction (RTC.ERROR_STATE,
+                             self.onAborting)
+    self._sm.setDoAction    (RTC.ERROR_STATE,
+                             self.onError)
+    self._sm.setExitAction  (RTC.ERROR_STATE,
+                             self.onReset)
+    # Setting inital state
+    st = OpenRTM_aist.StateHolder()
+    st.prev = RTC.INACTIVE_STATE
+    st.curr = RTC.INACTIVE_STATE
+    st.next = RTC.INACTIVE_STATE
+    self._sm.setStartState(st)
+    self._sm.goTo(RTC.INACTIVE_STATE)
+    return
+
+
+  def __del__(self):
+    if self._ca:
+      self._ca = False
+      self._caVar = None
+
+    if self._dfc:
+      self._dfc = False
+      self._dfcVar = None
+
+    if self._fsm:
+      self._fsm = False
+      self._fsmVar = None
+
+    if self._mode:
+      self._mode = False
+      self._modeVar = None
+
+    return
+
+
+  # functions for stored RTObject reference
+  # RTC::LightweightRTObject_ptr getRTObject();
+  def getRTObject(self):
+    return self._rtobj
+
+  # bool isEquivalent(RTC::LightweightRTObject_ptr comp);
+  def isEquivalent(self, comp):
+    return self._rtobj._is_equivalent(comp)
+
+  # RTC::ExecutionContextHandle_t getExecutionContextHandle();
+  def getExecutionContextHandle(self):
+    return self._id
+
+  # RTC::ComponentAction operations
+  # void onStartup(void);
+  def onStartup(self):
+    if not self._ca:
+      return
+    self._caVar.on_startup(self._id)
+    return
+
+  # void onShutdown(void);
+  def onShutdown(self):
+    if not self._ca:
+      return
+    self._caVar.on_shutdown(self._id)
+    return
+
+  # void onActivated(const ExecContextStates& st);
+  def onActivated(self, st):
+    if not self._ca:
+      return
+    if self._caVar.on_activated(self._id) != RTC.RTC_OK:
+      self._sm.goTo(RTC.ERROR_STATE)
+    return
+
+  # void onDeactivated(const ExecContextStates& st);
+  def onDeactivated(self, st):
+    if not self._ca:
+      return
+    self._caVar.on_deactivated(self._id)
+    return
+
+  # void onAborting(const ExecContextStates& st);
+  def onAborting(self, st):
+    if not self._ca:
+      return
+    self._caVar.on_error(self._id)
+    return
+
+  # void onError(const ExecContextStates& st);
+  def onError(self, st):
+    if not self._ca:
+      return
+    self._caVar.on_error(self._id)
+    return
+
+  # void onReset(const ExecContextStates& st);
+  def onReset(self, st):
+    if not self._ca:
+      return
+    if self._caVar.on_reset(self._id) != RTC.RTC_OK:
+      self._sm.goTo(RTC.ERROR_STATE)
+    return
+
+  # RTC::DataflowComponentAction
+  # void onExecute(const ExecContextStates& st);
+  def onExecute(self, st):
+    if not self._dfc:
+      return
+    
+    if self._dfcVar.on_execute(self._id) != RTC.RTC_OK:
+      self._sm.goTo(RTC.ERROR_STATE)
+    return
+
+  # void onStateUpdate(const ExecContextStates& st);
+  def onStateUpdate(self, st):
+    if not self._dfc:
+      return
+    
+    if self._dfcVar.on_state_update(self._id) != RTC.RTC_OK:
+      self._sm.goTo(RTC.ERROR_STATE)
+    return
+
+  # void onRateChanged(void);
+  def onRateChanged(self):
+    if not self._dfc:
+      return
+    
+    if self._dfcVar.on_rate_changed(self._id) != RTC.RTC_OK:
+      self._sm.goTo(RTC.ERROR_STATE)
+    return
+
+  # FsmParticipantAction
+  # void onAction(const ExecContextStates& st);
+  def onAction(self, st):
+    if not self._fsm:
+      return
+
+    if self._fsmVar.on_action(self._id) != RTC.RTC_OK:
+      self._sm.goTo(RTC.ERROR_STATE)
+    return
+  
+  # MultiModeComponentAction
+  # void onModeChanged(const ExecContextStates& st);
+  def onModeChanged(self, st):
+    if not self._mode:
+      return
+
+    if self._modeVar.on_mode_changed(self._id) != RTC.RTC_OK:
+      self._sm.goTo(RTC.ERROR_STATE)
+    return
+  
+  # Getting state of the context
+  # ExecContextState getState(void);
+  def getState(self):
+    return self._sm.getState()
+
+  # ExecContextStates getStates(void);
+  def getStates(self):
+    return self._sm.getStates()
+
+  # bool isCurrentState(ExecContextState state);
+  def isCurrentState(self, state):
+    return self.getState() == state
+
+  # bool isNextState(ExecContextState state);
+  def isNextState(self, state):
+    return self._sm.getStates().next == state
+
+  # void goTo(ExecContextState state);
+  def goTo(self, state):
+    self._sm.goTo(state)
+    return
+    
+  # Workers
+  # void workerPreDo(void);
+  def workerPreDo(self):
+    return self._sm.worker_pre()
+
+  # void workerDo(void);
+  def workerDo(self):
+    return self._sm.worker_do()
+
+  # void workerPostDo(void);
+  def workerPostDo(self):
+    return self._sm.worker_post()
+
+  # void setComponentAction(const RTC::LightweightRTObject_ptr comp);
+  def setComponentAction(self, comp):
+    self._caVar = comp._narrow(RTC.ComponentAction)
+    if not CORBA.is_nil(self._caVar):
+      self._ca = True
+    return
+
+  # void setDataFlowComponentAction(const RTC::LightweightRTObject_ptr comp);
+  def setDataFlowComponentAction(self, comp):
+    self._dfcVar = comp._narrow(RTC.DataFlowComponentAction)
+    if not CORBA.is_nil(self._dfcVar):
+      self._dfc = True
+    return
+
+  # void setFsmParticipantAction(const RTC::LightweightRTObject_ptr comp);
+  def setFsmParticipantAction(self, comp):
+    self._fsmVar = comp._narrow(RTC.FsmParticipantAction)
+    if not CORBA.is_nil(self._fsmVar):
+      self._fsm = True
+    return
+
+  # void setMultiModeComponentAction(const RTC::LightweightRTObject_ptr comp);
+  def setMultiModeComponentAction(self, comp):
+    self._modeVar = comp._narrow(RTC.MultiModeComponentAction)
+    if not CORBA.is_nil(self._modeVar):
+      self._mode = True
+    return

Added: branches/work/OpenRTM-aist-Python/OpenRTM_aist/test/test_RTObjectStateMachine.py
===================================================================
--- branches/work/OpenRTM-aist-Python/OpenRTM_aist/test/test_RTObjectStateMachine.py	                        (rev 0)
+++ branches/work/OpenRTM-aist-Python/OpenRTM_aist/test/test_RTObjectStateMachine.py	2012-03-12 13:00:54 UTC (rev 524)
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+##
+# @file test_RTObjectStateMachine.py
+# @brief test for ExecutionContext's state machine worker class
+# @date $Date$
+# @author Shinji Kurihara
+#
+# Copyright (C) 2011
+#     Noriaki Ando
+#     Intelligent Systems Research Institute,
+#     National Institute of
+#         Advanced Industrial Science and Technology (AIST), Japan
+#     All rights reserved.
+#
+# $Id$
+#
+
+import sys
+sys.path.insert(1,"../")
+sys.path.insert(1,"../RTM_IDL")
+
+import unittest
+
+from RTObjectStateMachine import *
+import OpenRTM__POA, RTC
+import OpenRTM_aist
+
+
+# Profile of the test component, written as a flat list of alternating
+# keys and values with a trailing empty string.  It is passed to
+# OpenRTM_aist.Properties(defaults_str=...) in the test's setUp().
+testcomp_spec = ["implementation_id", "TestComp",
+                 "type_name",         "TestComp",
+                 "description",       "Test example component",
+                 "version",           "1.0",
+                 "vendor",            "Shinji Kurihara, AIST",
+                 "category",          "example",
+                 "activity_type",     "DataFlowComponent",
+                 "max_instance",      "10",
+                 "language",          "Python",
+                 "lang_type",         "compile",
+                 ""]
+
+class TestComp(OpenRTM_aist.DataFlowComponentBase):
+  def __init_(self, manager):
+    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+
+    
+def TestCompInit(manager):
+  global com
+  profile = OpenRTM_aist.Properties(defaults_str=configsample_spec)
+  manager.registerFactory(profile,
+        TestComp,
+        OpenRTM_aist.Delete)
+
+class TestRTObjectStateMachine(unittest.TestCase):
+  """
+  """
+
+  def setUp(self):
+    self._mgr = OpenRTM_aist.Manager.instance()
+    self._mgr.activateManager()
+    profile = OpenRTM_aist.Properties(defaults_str=testcomp_spec)
+    self._mgr.registerFactory(profile,
+                              TestComp,
+                              OpenRTM_aist.Delete)
+    self._comp = self._mgr.createComponent("TestComp")
+    self._rtsm = RTObjectStateMachine(0, self._comp.getObjRef())
+
+  def tearDown(self):
+    OpenRTM_aist.Manager.instance().shutdownManager()
+    return
+
+  def test_getRTObject(self):
+    self.assertEqual(self._comp.getObjRef(), self._rtsm.getRTObject())
+    return
+
+  def test_isEquivalent(self):
+    self.assertEqual(True, self._rtsm.isEquivalent(self._comp.getObjRef()))
+    return
+
+  def test_getExecutionContextHandle(self):
+    self.assertEqual(0, self._rtsm.getExecutionContextHandle())
+    return
+
+  def test_ComponentActionOperations(self):
+    self._rtsm.onStartup()
+    self._rtsm.onShutdown()
+    self._rtsm.onActivated(RTC.INACTIVE_STATE)
+    self._rtsm.onDeactivated(RTC.ACTIVE_STATE)
+    self._rtsm.onError(RTC.ACTIVE_STATE)
+    self._rtsm.onReset(RTC.ERROR_STATE)
+    self._rtsm.onExecute(RTC.ACTIVE_STATE)
+    self._rtsm.onStateUpdate(RTC.ACTIVE_STATE)
+    self._rtsm.onRateChanged()
+    self._rtsm.onAction(RTC.ACTIVE_STATE)
+    self._rtsm.onModeChanged(RTC.ACTIVE_STATE)
+    return
+
+  def test_getState(self):
+    self.assertEqual(RTC.INACTIVE_STATE, self._rtsm.getState())
+    self.assertEqual(RTC.INACTIVE_STATE, self._rtsm.getStates().curr)
+    self.assertEqual(True, self._rtsm.isCurrentState(RTC.INACTIVE_STATE))
+    self.assertEqual(True, self._rtsm.isNextState(RTC.INACTIVE_STATE))
+    self._rtsm.goTo(RTC.ACTIVE_STATE)
+    self.assertEqual(True, self._rtsm.isNextState(RTC.ACTIVE_STATE))
+    return
+
+
+  def test_worker(self):
+    self._rtsm.workerPreDo()
+    self._rtsm.workerDo()
+    self._rtsm.workerPostDo()
+    return
+
+
+  def test_ComponentAction(self):
+    self._rtsm.setComponentAction(self._comp.getObjRef())
+    self._rtsm.setDataFlowComponentAction(self._comp.getObjRef())
+    self._rtsm.setFsmParticipantAction(self._comp.getObjRef())
+    self._rtsm.setMultiModeComponentAction(self._comp.getObjRef())
+    return
+
+
+
+############### test #################
+# Run every test case in this module when the file is executed directly.
+if __name__ == '__main__':
+  unittest.main()



openrtm-commit メーリングリストの案内