[openrtm-commit:00347] r455 - branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test
openrtm @ openrtm.org
openrtm @ openrtm.org
2011年 8月 12日 (金) 11:16:19 JST
Author: kurihara
Date: 2011-08-12 11:16:18 +0900 (Fri, 12 Aug 2011)
New Revision: 455
Added:
branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/COCTestRTC.py
branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/ComponentObserverProvider.py
branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/rtc.conf
Log:
Some test programs for ComponentObserverConsumer have been added.
-ComponentObserverProvider.py: Provider side (test for ComponentObserverConsumer).
-COCTestRTC.py: Sample component using the ComponentObserverConsumer.
-rtc.conf: Configuration file for COCTestRTC.py
Added: branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/COCTestRTC.py
===================================================================
--- branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/COCTestRTC.py (rev 0)
+++ branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/COCTestRTC.py 2011-08-12 02:16:18 UTC (rev 455)
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# -*- Python -*-
+
+import sys
+
+import RTC
+import OpenRTM_aist
+
+coctestrtc_spec = ["implementation_id", "COCTestRTC",
+ "type_name", "COCTestRTC",
+ "description", "Console input component",
+ "version", "1.0",
+ "vendor", "Shinji Kurihara",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "Python",
+ "lang_type", "script",
+ ""]
+
+
+class DataListener(OpenRTM_aist.ConnectorDataListenerT):
+ def __init__(self, name):
+ self._name = name
+
+ def __del__(self):
+ print "dtor of ", self._name
+
+ def __call__(self, info, cdrdata):
+ data = OpenRTM_aist.ConnectorDataListenerT.__call__(self, info, cdrdata, RTC.TimedLong(RTC.Time(0,0),0))
+ print "------------------------------"
+ print "Listener: ", self._name
+ print "Profile::name: ", info.name
+ print "Profile::id: ", info.id
+ print "Data: ", data.data
+ print "------------------------------"
+
+class ConnListener(OpenRTM_aist.ConnectorListener):
+ def __init__(self, name):
+ self._name = name
+
+ def __del__(self):
+ print "dtor of ", self._name
+
+ def __call__(self, info):
+ print "------------------------------"
+ print "Listener: ", self._name
+ print "Profile::name: ", info.name
+ print "Profile::id: ", info.id
+ print "------------------------------"
+
+
+class COCTestRTC(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ return
+
+ def onInitialize(self):
+ self._data = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outport = OpenRTM_aist.OutPort("out", self._data)
+ # Set OutPort buffer
+ self.addOutPort("out", self._outport)
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE,
+ DataListener("ON_BUFFER_WRITE"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL,
+ DataListener("ON_BUFFER_FULL"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT,
+ DataListener("ON_BUFFER_WRITE_TIMEOUT"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE,
+ DataListener("ON_BUFFER_OVERWRITE"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ,
+ DataListener("ON_BUFFER_READ"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_SEND,
+ DataListener("ON_SEND"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED,
+ DataListener("ON_RECEIVED"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL,
+ DataListener("ON_RECEIVER_FULL"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT,
+ DataListener("ON_RECEIVER_TIMEOUT"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR,
+ DataListener("ON_RECEIVER_ERROR"))
+
+ self._outport.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_CONNECT,
+ ConnListener("ON_CONNECT"))
+ self._outport.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_DISCONNECT,
+ ConnListener("ON_DISCONNECT"))
+
+ return RTC.RTC_OK
+
+
+ def onExecute(self, ec_id):
+ print "Please input number: ",
+ self._data.data = long(sys.stdin.readline())
+ OpenRTM_aist.setTimestamp(self._data)
+ print "Sending to subscriber: ", self._data.data
+ self._outport.write()
+ return RTC.RTC_OK
+
+
+def COCTestRTCInit(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=coctestrtc_spec)
+ manager.registerFactory(profile,
+ COCTestRTC,
+ OpenRTM_aist.Delete)
+
+
+def MyModuleInit(manager):
+ COCTestRTCInit(manager)
+
+ # Create a component
+ comp = manager.createComponent("COCTestRTC")
+
+def main():
+ # Initialize manager
+ mgr = OpenRTM_aist.Manager.init(sys.argv)
+
+ # Set module initialization proceduer
+ # This procedure will be invoked in activateManager() function.
+ mgr.setModuleInitProc(MyModuleInit)
+
+ # Activate manager and register to naming service
+ mgr.activateManager()
+
+ # run the manager in blocking mode
+ # runManager(False) is the default
+ mgr.runManager()
+
+ # If you want to run the manager in non-blocking mode, do like this
+ # mgr.runManager(True)
+
+if __name__ == "__main__":
+ main()
Added: branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/ComponentObserverProvider.py
===================================================================
--- branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/ComponentObserverProvider.py (rev 0)
+++ branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/ComponentObserverProvider.py 2011-08-12 02:16:18 UTC (rev 455)
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+##
+# @file ComponentObserverProvider.py
+# @brief test for ComponentObserverConsumer
+# @date $Date$
+# @author Shinji Kurihara
+#
+# Copyright (C) 2011
+# Noriaki Ando
+# Intelligent Systems Research Institute,
+# National Institute of
+# Advanced Industrial Science and Technology (AIST), Japan
+# All rights reserved.
+#
+
+import sys
+
+from omniORB import CORBA, PortableServer
+import RTC
+import OpenRTM, OpenRTM__POA
+import SDOPackage
+import OpenRTM_aist
+
+class ComponentObserver_i(OpenRTM__POA.ComponentObserver):
+ def __init__(self):
+ pass
+
+ def update_status(self, status_kind, hint):
+ print "update_status: ", status_kind, ", ", hint
+ return
+
+def main():
+ orb = CORBA.ORB_init(sys.argv)
+ poa = orb.resolve_initial_references("RootPOA")
+ poa._get_the_POAManager().activate()
+ naming = OpenRTM_aist.CorbaNaming(orb, "localhost")
+ servant = ComponentObserver_i()
+ oid = poa.servant_to_id(servant)
+ provider = poa.id_to_reference(oid)
+
+ rtc = naming.resolve("ConsoleIn0.rtc")._narrow(RTC.RTObject)
+ config = rtc.get_configuration()
+ properties = [OpenRTM_aist.NVUtil.newNV("heartbeat.enable","YES"),
+ OpenRTM_aist.NVUtil.newNV("heartbeat.interval","10"),
+ OpenRTM_aist.NVUtil.newNV("observed_status","ALL")]
+
+ id = OpenRTM_aist.toTypename(servant)
+ sprof = SDOPackage.ServiceProfile("test_id", id,
+ properties, provider)
+
+ ret = config.add_service_profile(sprof)
+ flag = True
+ print "If you exit program, please input 'q'."
+ sys.stdin.readline()
+ ret = config.remove_service_profile("test_id")
+ print "test program end. ret : ", ret
+ return
+
+############### test #################
+if __name__ == '__main__':
+ main()
Added: branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/rtc.conf
===================================================================
--- branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/rtc.conf (rev 0)
+++ branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/rtc.conf 2011-08-12 02:16:18 UTC (rev 455)
@@ -0,0 +1,8 @@
+corba.nameservers: localhost
+naming.formats: %n.rtc
+logger.enable: NO
+logger.log_level: VERBOSE
+#logger.file_name: rtc%p.log, STDOUT
+#manager.naming_formats: %n.mgr
+manager.modules.load_path: ../
+manager.modules.preload: ComponentObserverConsumer
openrtm-commit メーリングリストの案内