[openrtm-commit:00349] r457 - branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer
openrtm @ openrtm.org
openrtm @ openrtm.org
2011年 8月 12日 (金) 11:38:46 JST
Author: kurihara
Date: 2011-08-12 11:38:46 +0900 (Fri, 12 Aug 2011)
New Revision: 457
Added:
branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/COCTestRTC.py
branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/rtc.conf
Log:
move from test to sdo.
Added: branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/COCTestRTC.py
===================================================================
--- branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/COCTestRTC.py (rev 0)
+++ branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/COCTestRTC.py 2011-08-12 02:38:46 UTC (rev 457)
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# -*- Python -*-
+
+import sys
+
+import RTC
+import OpenRTM_aist
+
+# Component profile for COCTestRTC, written as a flat list of alternating
+# keys and values and closed by an empty string.  COCTestRTCInit() hands
+# this list to OpenRTM_aist.Properties(defaults_str=...).
+coctestrtc_spec = ["implementation_id", "COCTestRTC",
+ "type_name", "COCTestRTC",
+ "description", "Console input component",
+ "version", "1.0",
+ "vendor", "Shinji Kurihara",
+ "category", "example",
+ "activity_type", "DataFlowComponent",
+ "max_instance", "10",
+ "language", "Python",
+ "lang_type", "script",
+ ""]
+
+
+class DataListener(OpenRTM_aist.ConnectorDataListenerT):
+ def __init__(self, name):
+ self._name = name
+
+ def __del__(self):
+ print "dtor of ", self._name
+
+ def __call__(self, info, cdrdata):
+ data = OpenRTM_aist.ConnectorDataListenerT.__call__(self, info, cdrdata, RTC.TimedLong(RTC.Time(0,0),0))
+ print "------------------------------"
+ print "Listener: ", self._name
+ print "Profile::name: ", info.name
+ print "Profile::id: ", info.id
+ print "Data: ", data.data
+ print "------------------------------"
+
+class ConnListener(OpenRTM_aist.ConnectorListener):
+ def __init__(self, name):
+ self._name = name
+
+ def __del__(self):
+ print "dtor of ", self._name
+
+ def __call__(self, info):
+ print "------------------------------"
+ print "Listener: ", self._name
+ print "Profile::name: ", info.name
+ print "Profile::id: ", info.id
+ print "------------------------------"
+
+
+class COCTestRTC(OpenRTM_aist.DataFlowComponentBase):
+ def __init__(self, manager):
+ OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+ return
+
+ def onInitialize(self):
+ self._data = RTC.TimedLong(RTC.Time(0,0),0)
+ self._outport = OpenRTM_aist.OutPort("out", self._data)
+ # Set OutPort buffer
+ self.addOutPort("out", self._outport)
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE,
+ DataListener("ON_BUFFER_WRITE"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL,
+ DataListener("ON_BUFFER_FULL"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT,
+ DataListener("ON_BUFFER_WRITE_TIMEOUT"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE,
+ DataListener("ON_BUFFER_OVERWRITE"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ,
+ DataListener("ON_BUFFER_READ"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_SEND,
+ DataListener("ON_SEND"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED,
+ DataListener("ON_RECEIVED"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL,
+ DataListener("ON_RECEIVER_FULL"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT,
+ DataListener("ON_RECEIVER_TIMEOUT"))
+ self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR,
+ DataListener("ON_RECEIVER_ERROR"))
+
+ self._outport.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_CONNECT,
+ ConnListener("ON_CONNECT"))
+ self._outport.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_DISCONNECT,
+ ConnListener("ON_DISCONNECT"))
+
+ return RTC.RTC_OK
+
+
+ def onExecute(self, ec_id):
+ print "Please input number: ",
+ self._data.data = long(sys.stdin.readline())
+ OpenRTM_aist.setTimestamp(self._data)
+ print "Sending to subscriber: ", self._data.data
+ self._outport.write()
+ return RTC.RTC_OK
+
+
+def COCTestRTCInit(manager):
+ profile = OpenRTM_aist.Properties(defaults_str=coctestrtc_spec)
+ manager.registerFactory(profile,
+ COCTestRTC,
+ OpenRTM_aist.Delete)
+
+
+def MyModuleInit(manager):
+ COCTestRTCInit(manager)
+
+ # Create a component
+ comp = manager.createComponent("COCTestRTC")
+
+def main():
+ # Initialize manager
+ mgr = OpenRTM_aist.Manager.init(sys.argv)
+
+ # Set module initialization proceduer
+ # This procedure will be invoked in activateManager() function.
+ mgr.setModuleInitProc(MyModuleInit)
+
+ # Activate manager and register to naming service
+ mgr.activateManager()
+
+ # run the manager in blocking mode
+ # runManager(False) is the default
+ mgr.runManager()
+
+ # If you want to run the manager in non-blocking mode, do like this
+ # mgr.runManager(True)
+
+# Run main() only when this file is executed as a script.
+if __name__ == "__main__":
+ main()
Added: branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/rtc.conf
===================================================================
--- branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/rtc.conf (rev 0)
+++ branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/rtc.conf 2011-08-12 02:38:46 UTC (rev 457)
@@ -0,0 +1,8 @@
+corba.nameservers: localhost
+naming.formats: %n.rtc
+logger.enable: NO
+logger.log_level: VERBOSE
+#logger.file_name: rtc%p.log, STDOUT
+#manager.naming_formats: %n.mgr
+manager.modules.load_path: .
+manager.modules.preload: ComponentObserverConsumer
openrtm-commit メーリングリストの案内