[openrtm-commit:03344] r1032 - trunk/OpenRTM-aist-Python/OpenRTM_aist
openrtm @ openrtm.org
openrtm @ openrtm.org
2018年 10月 10日 (水) 15:03:32 JST
Author: miyamoto
Date: 2018-10-10 15:03:32 +0900 (Wed, 10 Oct 2018)
New Revision: 1032
Modified:
trunk/OpenRTM-aist-Python/OpenRTM_aist/InPort.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortConnector.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortCorbaCdrProvider.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortPushConnector.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortConnector.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortCorbaCdrProvider.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortDSProvider.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortPullConnector.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py
Log:
[incompat] Implementation of synchronous data port
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPort.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPort.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPort.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -229,7 +229,7 @@
# @endif
#
# bool isEmpty()
- def isEmpty(self):
+ def isEmpty(self, names=None):
self._rtcout.RTC_TRACE("isEmpty()")
if self._directNewData == True:
return False
@@ -237,10 +237,33 @@
self._rtcout.RTC_DEBUG("no connectors")
return True
- r = self._connectors[0].getBuffer().readable()
- if r == 0:
- self._rtcout.RTC_DEBUG("isEmpty() = true, buffer is empty")
- return True
+ if names is None:
+ r = self._connectors[0].getBuffer().readable()
+ if r == 0:
+ self._rtcout.RTC_DEBUG("isEmpty() = true, buffer is empty")
+ return True
+ elif isinstance(names, str):
+ for con in self._connectors:
+ if con.name() == names:
+ r = con.getBuffer().readable()
+ if r == 0:
+ self._rtcout.RTC_DEBUG("isEmpty() = True, connector name: %s, buffer is empty",(names))
+ return True
+ else:
+ self._rtcout.RTC_DEBUG("isEmpty() = False, connector name: %s, readable data: %d",(names,r))
+ return False
+ elif isinstance(names, list):
+ del names[:]
+ for con in self._connectors:
+ r = con.getBuffer().readable()
+ if r == 0:
+ self._rtcout.RTC_DEBUG("isEmpty() = True, connector name: %s",(names))
+ names.append(con.name())
+ if len(names) > 0:
+ return True
+ else:
+ self._rtcout.RTC_DEBUG("isEmpty() = False, no readable data")
+ return False
self._rtcout.RTC_DEBUG("isEmpty() = false, data exists in the buffer")
return False
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortConnector.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortConnector.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortConnector.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -211,3 +211,9 @@
# void setDataTyep(DataType data);
def setDataType(self, data):
self._dataType = data
+
+
+ def write(self, data):
+ pass
+ def read(self, data):
+ pass
\ No newline at end of file
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortCorbaCdrProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortCorbaCdrProvider.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortCorbaCdrProvider.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -171,7 +171,7 @@
try:
self._rtcout.RTC_PARANOID("InPortCorbaCdrProvider.put()")
- if not self._buffer:
+ if not self._connector:
self.onReceiverError(data)
return OpenRTM.PORT_ERROR
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortPushConnector.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortPushConnector.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortPushConnector.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -22,6 +22,7 @@
from omniORB import any
import OpenRTM_aist
+import threading
##
@@ -141,8 +142,21 @@
self._provider.init(info.properties)
self._provider.setBuffer(self._buffer)
self._provider.setListener(info, self._listeners)
+ self.onConnect()
- self.onConnect()
+
+ self._sync_readwrite = False
+ if OpenRTM_aist.toBool(info.properties.getProperty("sync_readwrite"),"YES","NO",False):
+ self._sync_readwrite = True
+
+
+
+
+
+
+ self._writecompleted_worker = InPortPushConnector.WorkerThreadCtrl()
+ self._readcompleted_worker = InPortPushConnector.WorkerThreadCtrl()
+ self._readready_worker = InPortPushConnector.WorkerThreadCtrl()
return
@@ -212,8 +226,30 @@
return self.PRECONDITION_NOT_MET
cdr = [""]
+
+ if self._sync_readwrite:
+ self._readcompleted_worker._completed = False
+
+ self._readready_worker._completed = True
+ self._readready_worker._cond.acquire()
+ self._readready_worker._cond.notify()
+ self._readready_worker._cond.release()
+
+ self._writecompleted_worker._cond.acquire()
+ while not self._writecompleted_worker._completed:
+ self._writecompleted_worker._cond.wait()
+ self._writecompleted_worker._cond.release()
+
ret = self._buffer.read(cdr)
+ if self._sync_readwrite:
+ self._readcompleted_worker._completed = True
+ self._readcompleted_worker._cond.acquire()
+ self._readcompleted_worker._cond.notify()
+ self._readcompleted_worker._cond.release()
+
+ self._readready_worker._completed = False
+
if not self._dataType:
return self.PRECONDITION_NOT_MET
if self._endian is not None:
@@ -232,11 +268,11 @@
return self.PORT_OK
elif ret == OpenRTM_aist.BufferStatus.BUFFER_EMPTY:
- self.onBufferEmpty()
+ self.onBufferEmpty(cdr[0])
return self.BUFFER_EMPTY
elif ret == OpenRTM_aist.BufferStatus.TIMEOUT:
- self.onBufferReadTimeout()
+ self.onBufferReadTimeout(cdr[0])
return self.BUFFER_TIMEOUT
elif ret == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
@@ -381,7 +417,28 @@
#
# ReturnCode write(const OpenRTM::CdrData& data);
def write(self, data):
- return self._buffer.write(data)
+ if self._sync_readwrite:
+ self._readready_worker._cond.acquire()
+ while not self._readready_worker._completed:
+ self._readready_worker._cond.wait()
+ self._readready_worker._cond.release()
+
+ ret = self._buffer.write(data)
+
+ if self._sync_readwrite:
+ self._writecompleted_worker._completed = True
+ self._writecompleted_worker._cond.acquire()
+ self._writecompleted_worker._cond.notify()
+ self._writecompleted_worker._cond.release()
+
+ self._readcompleted_worker._cond.acquire()
+ while not self._readcompleted_worker._completed:
+ self._readcompleted_worker._cond.wait()
+ self._readcompleted_worker._cond.release()
+
+ self._writecompleted_worker._completed = False
+
+ return ret
##
@@ -414,11 +471,17 @@
if self._listeners and self._profile:
self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
return
- def onBufferEmpty(self):
+ def onBufferEmpty(self, data):
if self._listeners and self._profile:
self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY].notify(self._profile)
return
- def onBufferReadTimeout(self):
+ def onBufferReadTimeout(self, data):
if self._listeners and self._profile:
self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT].notify(self._profile)
return
+
+ class WorkerThreadCtrl:
+ def __init__(self):
+ self._mutex = threading.RLock()
+ self._cond = threading.Condition(self._mutex)
+ self._completed = False
\ No newline at end of file
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortConnector.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortConnector.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortConnector.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -184,3 +184,9 @@
# const char* name();
def directMode(self):
return self._directMode
+
+
+ def write(self, data):
+ pass
+ def read(self, data):
+ pass
\ No newline at end of file
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortCorbaCdrProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortCorbaCdrProvider.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortCorbaCdrProvider.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -283,17 +283,13 @@
# virtual ::OpenRTM::PortStatus get(::OpenRTM::CdrData_out data);
def get(self):
self._rtcout.RTC_PARANOID("OutPortCorbaCdrProvider.get()")
- if not self._buffer:
+ if not self._connector:
self.onSenderError()
return (OpenRTM.UNKNOWN_ERROR, "")
try:
- if self._buffer.empty():
- self._rtcout.RTC_ERROR("buffer is empty.")
- return (OpenRTM.BUFFER_EMPTY, "")
-
cdr = [None]
- ret = self._buffer.read(cdr)
+ ret = self._connector.read(cdr)
if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
if not cdr[0]:
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortDSProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortDSProvider.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortDSProvider.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -286,17 +286,14 @@
# virtual ::OpenRTM::PortStatus pull(::RTC::CdrData_out data);
def pull(self):
self._rtcout.RTC_PARANOID("OutPortDSProvider.pull()")
- if not self._buffer:
+ if not self._connector:
self.onSenderError()
return (RTC.UNKNOWN_ERROR, "")
try:
- if self._buffer.empty():
- self._rtcout.RTC_ERROR("buffer is empty.")
- return (RTC.BUFFER_EMPTY, "")
cdr = [None]
- ret = self._buffer.read(cdr)
+ ret = self._connector.read(cdr)
if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
if not cdr[0]:
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortPullConnector.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortPullConnector.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortPullConnector.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -145,6 +145,15 @@
self._provider.setConnector(self)
self._provider.setListener(info, self._listeners)
self.onConnect()
+
+ self._sync_readwrite = False
+ if OpenRTM_aist.toBool(info.properties.getProperty("sync_readwrite"),"YES","NO",False):
+ self._sync_readwrite = True
+
+ self._writecompleted_worker = OutPortPullConnector.WorkerThreadCtrl()
+ self._readcompleted_worker = OutPortPullConnector.WorkerThreadCtrl()
+ self._readready_worker = OutPortPullConnector.WorkerThreadCtrl()
+
return
@@ -195,12 +204,66 @@
self._rtcout.RTC_ERROR("write(): endian %s is not support.",self._endian)
return self.UNKNOWN_ERROR
if self._buffer:
+ if self._sync_readwrite:
+ self._readready_worker._cond.acquire()
+ while not self._readready_worker._completed:
+ self._readready_worker._cond.wait()
+ self._readready_worker._cond.release()
+
self._buffer.write(cdr_data)
+
+ if self._sync_readwrite:
+ self._writecompleted_worker._completed = True
+ self._writecompleted_worker._cond.acquire()
+ self._writecompleted_worker._cond.notify()
+ self._writecompleted_worker._cond.release()
+
+ self._readcompleted_worker._cond.acquire()
+ while not self._readcompleted_worker._completed:
+ self._readcompleted_worker._cond.wait()
+ self._readcompleted_worker._cond.release()
+
+ self._writecompleted_worker._completed = False
else:
return self.UNKNOWN_ERROR
return self.PORT_OK
+ def read(self, data):
+ if self._sync_readwrite:
+ self._readcompleted_worker._completed = False
+
+ self._readready_worker._completed = True
+ self._readready_worker._cond.acquire()
+ self._readready_worker._cond.notify()
+ self._readready_worker._cond.release()
+
+ self._writecompleted_worker._cond.acquire()
+ while not self._writecompleted_worker._completed:
+ self._writecompleted_worker._cond.wait()
+ self._writecompleted_worker._cond.release()
+
+
+ if self._buffer.empty():
+ self._rtcout.RTC_ERROR("buffer is empty.")
+ data[0] = ""
+ return OpenRTM_aist.BufferStatus.BUFFER_EMPTY
+
+
+ ret = self._buffer.read(data)
+
+ if self._sync_readwrite:
+ self._readcompleted_worker._completed = True
+ self._readcompleted_worker._cond.acquire()
+ self._readcompleted_worker._cond.notify()
+ self._readcompleted_worker._cond.release()
+
+ self._readready_worker._completed = False
+
+
+ return ret
+
+
##
# @if jp
# @brief 接続解除
@@ -340,5 +403,9 @@
def setDirectMode(self):
self._directMode = True
+ class WorkerThreadCtrl:
+ def __init__(self):
+ self._mutex = threading.RLock()
+ self._cond = threading.Condition(self._mutex)
+ self._completed = False
-
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py 2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py 2018-10-10 06:03:32 UTC (rev 1032)
@@ -166,17 +166,14 @@
def get(self):
self._rtcout.RTC_PARANOID("OutPortSHMProvider.get()")
- if not self._buffer:
+ if not self._connector:
self.onSenderError()
return OpenRTM.UNKNOWN_ERROR
try:
- if self._buffer.empty():
- self._rtcout.RTC_ERROR("buffer is empty.")
- return OpenRTM.BUFFER_EMPTY
cdr = [None]
- ret = self._buffer.read(cdr)
+ ret = self._connector.read(cdr)
if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
if cdr[0] is None:
openrtm-commit メーリングリストの案内