[openrtm-commit:00761] r524 - in branches/work/OpenRTM-aist-Python/OpenRTM_aist: . test
openrtm @ openrtm.org
openrtm @ openrtm.org
2012年 3月 12日 (月) 22:00:54 JST
Author: kurihara
Date: 2012-03-12 22:00:54 +0900 (Mon, 12 Mar 2012)
New Revision: 524
Added:
branches/work/OpenRTM-aist-Python/OpenRTM_aist/RTObjectStateMachine.py
branches/work/OpenRTM-aist-Python/OpenRTM_aist/test/test_RTObjectStateMachine.py
Log:
[incompat,impl,func] RTObjectStateMachine class was implemented. refs #2343
Added: branches/work/OpenRTM-aist-Python/OpenRTM_aist/RTObjectStateMachine.py
===================================================================
--- branches/work/OpenRTM-aist-Python/OpenRTM_aist/RTObjectStateMachine.py (rev 0)
+++ branches/work/OpenRTM-aist-Python/OpenRTM_aist/RTObjectStateMachine.py 2012-03-12 13:00:54 UTC (rev 524)
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+##
+# @file RTObjectStateMachine.py
+# @brief ExecutionContext's state machine worker class
+# @date $Date$
+# @author Noriaki Ando <n-ando at aist.go.jp> and Shinji Kurihara
+#
+# Copyright (C) 2011
+# Noriaki Ando
+# Intelligent Systems Research Institute,
+# National Institute of
+# Advanced Industrial Science and Technology (AIST), Japan
+# All rights reserved.
+#
+# $Id$
+#
+
+from omniORB import CORBA, PortableServer
+
+import OpenRTM_aist
+import RTC
+
+NUM_OF_LIFECYCLESTATE = 4
+
+class RTObjectStateMachine:
+ """
+ """
+
+ # RTObjectStateMachine(RTC::ExecutionContextHandle_t id,
+ # RTC::LightweightRTObject_ptr comp);
+ def __init__(self, id, comp):
+ global NUM_OF_LIFECYCLESTATE
+ self._id = id
+ self._rtobj = comp
+ self._sm = OpenRTM_aist.StateMachine(NUM_OF_LIFECYCLESTATE)
+ self._ca = False
+ self._dfc = False
+ self._fsm = False
+ self._mode = False
+ self._caVar = None
+ self._dfcVar = None
+ self._fsmVar = None
+ self._modeVar = None
+
+ # Setting Action callback objects
+ self.setComponentAction(comp)
+ self.setDataFlowComponentAction(comp)
+ self.setFsmParticipantAction(comp)
+ self.setMultiModeComponentAction(comp)
+ # Setting callback functions to StateMachine
+ self._sm.setListener(self)
+ self._sm.setEntryAction (RTC.ACTIVE_STATE,
+ self.onActivated)
+ self._sm.setDoAction (RTC.ACTIVE_STATE,
+ self.onExecute)
+ self._sm.setPostDoAction(RTC.ACTIVE_STATE,
+ self.onStateUpdate)
+ self._sm.setExitAction (RTC.ACTIVE_STATE,
+ self.onDeactivated)
+ self._sm.setEntryAction (RTC.ERROR_STATE,
+ self.onAborting)
+ self._sm.setDoAction (RTC.ERROR_STATE,
+ self.onError)
+ self._sm.setExitAction (RTC.ERROR_STATE,
+ self.onReset)
+ # Setting inital state
+ st = OpenRTM_aist.StateHolder()
+ st.prev = RTC.INACTIVE_STATE
+ st.curr = RTC.INACTIVE_STATE
+ st.next = RTC.INACTIVE_STATE
+ self._sm.setStartState(st)
+ self._sm.goTo(RTC.INACTIVE_STATE)
+ return
+
+
+ def __del__(self):
+ if self._ca:
+ self._ca = False
+ self._caVar = None
+
+ if self._dfc:
+ self._dfc = False
+ self._dfcVar = None
+
+ if self._fsm:
+ self._fsm = False
+ self._fsmVar = None
+
+ if self._mode:
+ self._mode = False
+ self._modeVar = None
+
+ return
+
+
+ # functions for stored RTObject reference
+ # RTC::LightweightRTObject_ptr getRTObject();
+ def getRTObject(self):
+ return self._rtobj
+
+ # bool isEquivalent(RTC::LightweightRTObject_ptr comp);
+ def isEquivalent(self, comp):
+ return self._rtobj._is_equivalent(comp)
+
+ # RTC::ExecutionContextHandle_t getExecutionContextHandle();
+ def getExecutionContextHandle(self):
+ return self._id
+
+ # RTC::ComponentAction operations
+ # void onStartup(void);
+ def onStartup(self):
+ if not self._ca:
+ return
+ self._caVar.on_startup(self._id)
+ return
+
+ # void onShutdown(void);
+ def onShutdown(self):
+ if not self._ca:
+ return
+ self._caVar.on_shutdown(self._id)
+ return
+
+ # void onActivated(const ExecContextStates& st);
+ def onActivated(self, st):
+ if not self._ca:
+ return
+ if self._caVar.on_activated(self._id) != RTC.RTC_OK:
+ self._sm.goTo(RTC.ERROR_STATE)
+ return
+
+ # void onDeactivated(const ExecContextStates& st);
+ def onDeactivated(self, st):
+ if not self._ca:
+ return
+ self._caVar.on_deactivated(self._id)
+ return
+
+ # void onAborting(const ExecContextStates& st);
+ def onAborting(self, st):
+ if not self._ca:
+ return
+ self._caVar.on_error(self._id)
+ return
+
+ # void onError(const ExecContextStates& st);
+ def onError(self, st):
+ if not self._ca:
+ return
+ self._caVar.on_error(self._id)
+ return
+
+ # void onReset(const ExecContextStates& st);
+ def onReset(self, st):
+ if not self._ca:
+ return
+ if self._caVar.on_reset(self._id) != RTC.RTC_OK:
+ self._sm.goTo(RTC.ERROR_STATE)
+ return
+
+ # RTC::DataflowComponentAction
+ # void onExecute(const ExecContextStates& st);
+ def onExecute(self, st):
+ if not self._dfc:
+ return
+
+ if self._dfcVar.on_execute(self._id) != RTC.RTC_OK:
+ self._sm.goTo(RTC.ERROR_STATE)
+ return
+
+ # void onStateUpdate(const ExecContextStates& st);
+ def onStateUpdate(self, st):
+ if not self._dfc:
+ return
+
+ if self._dfcVar.on_state_update(self._id) != RTC.RTC_OK:
+ self._sm.goTo(RTC.ERROR_STATE)
+ return
+
+ # void onRateChanged(void);
+ def onRateChanged(self):
+ if not self._dfc:
+ return
+
+ if self._dfcVar.on_rate_changed(self._id) != RTC.RTC_OK:
+ self._sm.goTo(RTC.ERROR_STATE)
+ return
+
+ # FsmParticipantAction
+ # void onAction(const ExecContextStates& st);
+ def onAction(self, st):
+ if not self._fsm:
+ return
+
+ if self._fsmVar.on_action(self._id) != RTC.RTC_OK:
+ self._sm.goTo(RTC.ERROR_STATE)
+ return
+
+ # MultiModeComponentAction
+ # void onModeChanged(const ExecContextStates& st);
+ def onModeChanged(self, st):
+ if not self._mode:
+ return
+
+ if self._modeVar.on_mode_changed(self._id) != RTC.RTC_OK:
+ self._sm.goTo(RTC.ERROR_STATE)
+ return
+
+ # Getting state of the context
+ # ExecContextState getState(void);
+ def getState(self):
+ return self._sm.getState()
+
+ # ExecContextStates getStates(void);
+ def getStates(self):
+ return self._sm.getStates()
+
+ # bool isCurrentState(ExecContextState state);
+ def isCurrentState(self, state):
+ return self.getState() == state
+
+ # bool isNextState(ExecContextState state);
+ def isNextState(self, state):
+ return self._sm.getStates().next == state
+
+ # void goTo(ExecContextState state);
+ def goTo(self, state):
+ self._sm.goTo(state)
+ return
+
+ # Workers
+ # void workerPreDo(void);
+ def workerPreDo(self):
+ return self._sm.worker_pre()
+
+ # void workerDo(void);
+ def workerDo(self):
+ return self._sm.worker_do()
+
+ # void workerPostDo(void);
+ def workerPostDo(self):
+ return self._sm.worker_post()
+
+ # void setComponentAction(const RTC::LightweightRTObject_ptr comp);
+ def setComponentAction(self, comp):
+ self._caVar = comp._narrow(RTC.ComponentAction)
+ if not CORBA.is_nil(self._caVar):
+ self._ca = True
+ return
+
+ # void setDataFlowComponentAction(const RTC::LightweightRTObject_ptr comp);
+ def setDataFlowComponentAction(self, comp):
+ self._dfcVar = comp._narrow(RTC.DataFlowComponentAction)
+ if not CORBA.is_nil(self._dfcVar):
+ self._dfc = True
+ return
+
+ # void setFsmParticipantAction(const RTC::LightweightRTObject_ptr comp);
+ def setFsmParticipantAction(self, comp):
+ self._fsmVar = comp._narrow(RTC.FsmParticipantAction)
+ if not CORBA.is_nil(self._fsmVar):
+ self._fsm = True
+ return
+
+ # void setMultiModeComponentAction(const RTC::LightweightRTObject_ptr comp);
+ def setMultiModeComponentAction(self, comp):
+ self._modeVar = comp._narrow(RTC.MultiModeComponentAction)
+ if not CORBA.is_nil(self._modeVar):
+ self._mode = True
+ return
Added: branches/work/OpenRTM-aist-Python/OpenRTM_aist/test/test_RTObjectStateMachine.py
===================================================================
--- branches/work/OpenRTM-aist-Python/OpenRTM_aist/test/test_RTObjectStateMachine.py (rev 0)
+++ branches/work/OpenRTM-aist-Python/OpenRTM_aist/test/test_RTObjectStateMachine.py 2012-03-12 13:00:54 UTC (rev 524)
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+##
+# @file test_RTObjectStateMachine.py
+# @brief test for ExecutionContext's state machine worker class
+# @date $Date$
+# @author Shinji Kurihara
+#
+# Copyright (C) 2011
+# Noriaki Ando
+# Intelligent Systems Research Institute,
+# National Institute of
+# Advanced Industrial Science and Technology (AIST), Japan
+# All rights reserved.
+#
+# $Id$
+#
+
+import sys
+sys.path.insert(1,"../")
+sys.path.insert(1,"../RTM_IDL")
+
+import unittest
+
+from RTObjectStateMachine import *
+import OpenRTM__POA, RTC
+import OpenRTM_aist
+
+
+testcomp_spec = ["implementation_id", "TestComp",
+ "type_name", "TestComp",
+ "description", "Test example component",
+ "version", "1.0",
+ "vendor", "Shinji Kurihara, AIST",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "Python",
+ "lang_type", "compile",
+ ""]
+
+class TestComp(OpenRTM_aist.DataFlowComponentBase):
+ def __init_(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+
+
+def TestCompInit(manager):
+ global com
+ profile = OpenRTM_aist.Properties(defaults_str=configsample_spec)
+ manager.registerFactory(profile,
+ TestComp,
+ OpenRTM_aist.Delete)
+
+class TestRTObjectStateMachine(unittest.TestCase):
+ """
+ """
+
+ def setUp(self):
+ self._mgr = OpenRTM_aist.Manager.instance()
+ self._mgr.activateManager()
+ profile = OpenRTM_aist.Properties(defaults_str=testcomp_spec)
+ self._mgr.registerFactory(profile,
+ TestComp,
+ OpenRTM_aist.Delete)
+ self._comp = self._mgr.createComponent("TestComp")
+ self._rtsm = RTObjectStateMachine(0, self._comp.getObjRef())
+
+ def tearDown(self):
+ OpenRTM_aist.Manager.instance().shutdownManager()
+ return
+
+ def test_getRTObject(self):
+ self.assertEqual(self._comp.getObjRef(), self._rtsm.getRTObject())
+ return
+
+ def test_isEquivalent(self):
+ self.assertEqual(True, self._rtsm.isEquivalent(self._comp.getObjRef()))
+ return
+
+ def test_getExecutionContextHandle(self):
+ self.assertEqual(0, self._rtsm.getExecutionContextHandle())
+ return
+
+ def test_ComponentActionOperations(self):
+ self._rtsm.onStartup()
+ self._rtsm.onShutdown()
+ self._rtsm.onActivated(RTC.INACTIVE_STATE)
+ self._rtsm.onDeactivated(RTC.ACTIVE_STATE)
+ self._rtsm.onError(RTC.ACTIVE_STATE)
+ self._rtsm.onReset(RTC.ERROR_STATE)
+ self._rtsm.onExecute(RTC.ACTIVE_STATE)
+ self._rtsm.onStateUpdate(RTC.ACTIVE_STATE)
+ self._rtsm.onRateChanged()
+ self._rtsm.onAction(RTC.ACTIVE_STATE)
+ self._rtsm.onModeChanged(RTC.ACTIVE_STATE)
+ return
+
+ def test_getState(self):
+ self.assertEqual(RTC.INACTIVE_STATE, self._rtsm.getState())
+ self.assertEqual(RTC.INACTIVE_STATE, self._rtsm.getStates().curr)
+ self.assertEqual(True, self._rtsm.isCurrentState(RTC.INACTIVE_STATE))
+ self.assertEqual(True, self._rtsm.isNextState(RTC.INACTIVE_STATE))
+ self._rtsm.goTo(RTC.ACTIVE_STATE)
+ self.assertEqual(True, self._rtsm.isNextState(RTC.ACTIVE_STATE))
+ return
+
+
+ def test_worker(self):
+ self._rtsm.workerPreDo()
+ self._rtsm.workerDo()
+ self._rtsm.workerPostDo()
+ return
+
+
+ def test_ComponentAction(self):
+ self._rtsm.setComponentAction(self._comp.getObjRef())
+ self._rtsm.setDataFlowComponentAction(self._comp.getObjRef())
+ self._rtsm.setFsmParticipantAction(self._comp.getObjRef())
+ self._rtsm.setMultiModeComponentAction(self._comp.getObjRef())
+ return
+
+
+
+############### test #################
+if __name__ == '__main__':
+ unittest.main()
openrtm-commit メーリングリストの案内