[openrtm-commit:03338] r3417 - trunk/OpenRTM-aist/src/lib/rtm
openrtm @ openrtm.org
openrtm @ openrtm.org
2018年 10月 9日 (火) 08:58:58 JST
Author: miyamoto
Date: 2018-10-09 08:58:57 +0900 (Tue, 09 Oct 2018)
New Revision: 3417
Modified:
trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.cpp
trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.h
trunk/OpenRTM-aist/src/lib/rtm/InPortCorbaCdrProvider.cpp
trunk/OpenRTM-aist/src/lib/rtm/InPortDSProvider.cpp
trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.cpp
trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.h
trunk/OpenRTM-aist/src/lib/rtm/InPortSHMProvider.cpp
trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.cpp
trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.h
trunk/OpenRTM-aist/src/lib/rtm/OutPortCorbaCdrProvider.cpp
trunk/OpenRTM-aist/src/lib/rtm/OutPortDSProvider.cpp
trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.cpp
trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.h
trunk/OpenRTM-aist/src/lib/rtm/OutPortSHMProvider.cpp
Log:
[incompat] Implementation of synchronous data port
Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -168,4 +168,9 @@
}
};
+ BufferStatus::Enum InPortConnector::write(cdrMemoryStream &cdr)
+ {
+ return BufferStatus::BUFFER_OK;
+ };
+
}; // namespace RTC
Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.h
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.h 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.h 2018-10-08 23:58:57 UTC (rev 3417)
@@ -236,7 +236,9 @@
*/
virtual bool isLittleEndian();
+ virtual BufferStatus::Enum write(cdrMemoryStream &cdr);
+
/*!
* @if jp
* @brief データをダイレクトに書き込むためのOutPortのサーバントを設定する
Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortCorbaCdrProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortCorbaCdrProvider.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortCorbaCdrProvider.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -33,7 +33,7 @@
* @endif
*/
InPortCorbaCdrProvider::InPortCorbaCdrProvider(void)
- : m_buffer(0)
+ : m_buffer(0), m_connector(NULL)
{
// PortProfile setting
setInterfaceType("corba_cdr");
@@ -155,7 +155,7 @@
{
RTC_PARANOID(("InPortCorbaCdrProvider::put()"));
- if (m_buffer == 0)
+ if (m_connector == NULL)
{
cdrMemoryStream cdr;
#ifdef ORB_IS_ORBEXPRESS
@@ -193,7 +193,7 @@
onReceived(cdr);
- BufferStatus::Enum ret = m_buffer->write(cdr);
+ BufferStatus::Enum ret = m_connector->write(cdr);
return convertReturn(ret, cdr);
}
Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortDSProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortDSProvider.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortDSProvider.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -31,7 +31,7 @@
* @endif
*/
InPortDSProvider::InPortDSProvider(void)
- : m_buffer(0)
+ : m_buffer(0), m_connector(0)
{
// PortProfile setting
setInterfaceType("data_service");
@@ -153,7 +153,7 @@
{
RTC_PARANOID(("InPortDSProvider::push()"));
- if (m_buffer == 0)
+ if (m_connector == 0)
{
cdrMemoryStream cdr;
#ifdef ORB_IS_ORBEXPRESS
@@ -191,7 +191,7 @@
onReceived(cdr);
- BufferStatus::Enum ret = m_buffer->write(cdr);
+ BufferStatus::Enum ret = m_connector->write(cdr);
return convertReturn(ret, cdr);
}
Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -39,7 +39,8 @@
: InPortConnector(info, listeners, buffer),
m_provider(provider),
m_listeners(listeners),
- m_deleteBuffer(buffer == 0 ? true : false)
+ m_deleteBuffer(buffer == 0 ? true : false),
+ m_sync_readwrite(false)
{
// publisher/buffer creation. This may throw std::bad_alloc;
if (m_buffer == 0)
@@ -53,6 +54,11 @@
m_provider->setBuffer(m_buffer);
m_provider->setListener(info, &m_listeners);
+ if (coil::toBool(info.properties["sync_readwrite"], "YES", "NO", false))
+ {
+ m_sync_readwrite = true;
+ }
+
onConnect();
}
@@ -91,7 +97,44 @@
{
return PRECONDITION_NOT_MET;
}
+ if (m_sync_readwrite)
+ {
+
+ {
+ Guard guard(m_readcompleted_worker.mutex_);
+ m_readcompleted_worker.completed_ = false;
+ }
+
+ {
+ Guard guard(m_readready_worker.mutex_);
+ m_readready_worker.completed_ = true;
+ m_readready_worker.cond_.signal();
+ }
+ {
+ Guard guard(m_writecompleted_worker.mutex_);
+ while (!m_writecompleted_worker.completed_)
+ {
+ m_writecompleted_worker.cond_.wait();
+ }
+ }
+ }
+
BufferStatus::Enum ret = m_buffer->read(data);
+
+ if (m_sync_readwrite)
+ {
+ {
+ Guard guard(m_readcompleted_worker.mutex_);
+ m_readcompleted_worker.completed_ = true;
+ m_readcompleted_worker.cond_.signal();
+ }
+
+ {
+ Guard guard(m_readready_worker.mutex_);
+ m_readready_worker.completed_ = false;
+ }
+ }
+
switch (ret)
{
case BufferStatus::BUFFER_OK:
@@ -182,5 +225,46 @@
m_listeners.connector_[ON_DISCONNECT].notify(m_profile);
}
+ BufferStatus::Enum InPortPushConnector::write(cdrMemoryStream &cdr)
+ {
+ if (m_sync_readwrite)
+ {
+ {
+ Guard guard(m_readready_worker.mutex_);
+ while (!m_readready_worker.completed_)
+ {
+ m_readready_worker.cond_.wait();
+ }
+ }
+ }
+
+ BufferStatus::Enum ret = m_buffer->write(cdr);
+
+ if (m_sync_readwrite)
+ {
+ {
+ Guard guard(m_writecompleted_worker.mutex_);
+ m_writecompleted_worker.completed_ = true;
+ m_writecompleted_worker.cond_.signal();
+ }
+
+
+ {
+ Guard guard(m_readcompleted_worker.mutex_);
+ while (!m_readcompleted_worker.completed_)
+ {
+ m_readcompleted_worker.cond_.wait();
+ }
+ }
+ {
+ Guard guard(m_writecompleted_worker.mutex_);
+ m_writecompleted_worker.completed_ = false;
+ }
+ }
+
+
+ return ret;
+ };
+
}; // namespace RTC
Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.h
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.h 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.h 2018-10-08 23:58:57 UTC (rev 3417)
@@ -80,6 +80,7 @@
class InPortPushConnector
: public InPortConnector
{
+ typedef coil::Guard<coil::Mutex> Guard;
public:
DATAPORTSTATUS_ENUM
@@ -254,6 +255,8 @@
*/
virtual CdrBufferBase* createBuffer(ConnectorInfo& info);
+ virtual BufferStatus::Enum write(cdrMemoryStream &cdr);
+
/*!
* @if jp
* @brief 接続確立時にコールバックを呼ぶ
@@ -310,6 +313,20 @@
ConnectorListeners& m_listeners;
bool m_deleteBuffer;
+
+ bool m_sync_readwrite;
+
+ struct WorkerThreadCtrl
+ {
+ WorkerThreadCtrl() : cond_(mutex_), completed_(false) {}
+ coil::Mutex mutex_;
+ coil::Condition<coil::Mutex> cond_;
+ bool completed_;
+ };
+ WorkerThreadCtrl m_writecompleted_worker;
+ WorkerThreadCtrl m_readcompleted_worker;
+ WorkerThreadCtrl m_readready_worker;
+
};
}; // namespace RTC
Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortSHMProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortSHMProvider.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortSHMProvider.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -123,7 +123,7 @@
throw (CORBA::SystemException)
{
RTC_PARANOID(("InPortSHMProvider::put()"));
- if (m_buffer == 0)
+ if (m_connector == NULL)
{
return ::OpenRTM::PORT_ERROR;
}
@@ -152,7 +152,7 @@
onReceived(cdr);
- BufferStatus::Enum ret = m_buffer->write(cdr);
+ BufferStatus::Enum ret = m_connector->write(cdr);
return convertReturn(ret, cdr);
}
Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -177,4 +177,9 @@
m_inPortListeners = &(directInPort->getListeners());
return true;
}
+
+ CdrBufferBase::ReturnCode OutPortConnector::read(cdrMemoryStream &data)
+ {
+ return CdrBufferBase::BUFFER_OK;
+ }
}; // namespace RTC
Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.h
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.h 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.h 2018-10-08 23:58:57 UTC (rev 3417)
@@ -273,6 +273,8 @@
return write(m_cdr);
}
+ virtual CdrBufferBase::ReturnCode read(cdrMemoryStream &data);
+
bool setInPort(InPortBase* directInPort);
/*!
* @if jp
Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortCorbaCdrProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortCorbaCdrProvider.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortCorbaCdrProvider.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -33,7 +33,7 @@
* @endif
*/
OutPortCorbaCdrProvider::OutPortCorbaCdrProvider(void)
- : m_buffer(0)
+ : m_buffer(0), m_connector(NULL)
{
// PortProfile setting
setInterfaceType("corba_cdr");
@@ -169,7 +169,7 @@
// at least the output "data" area should be allocated
data = new ::OpenRTM::CdrData();
- if (m_buffer == 0)
+ if (m_connector == NULL)
{
onSenderError();
return ::OpenRTM::UNKNOWN_ERROR;
@@ -176,7 +176,7 @@
}
cdrMemoryStream cdr = cdrMemoryStream();
- CdrBufferBase::ReturnCode ret(m_buffer->read(cdr));
+ CdrBufferBase::ReturnCode ret(m_connector->read(cdr));
if (ret == CdrBufferBase::BUFFER_OK)
{
Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortDSProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortDSProvider.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortDSProvider.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -31,7 +31,7 @@
* @endif
*/
OutPortDSProvider::OutPortDSProvider(void)
- : m_buffer(0)
+ : m_buffer(0), m_connector(NULL)
{
// PortProfile setting
setInterfaceType("data_service");
@@ -167,7 +167,7 @@
// at least the output "data" area should be allocated
data = new ::RTC::OctetSeq();
- if (m_buffer == 0)
+ if (m_connector == NULL)
{
onSenderError();
return ::RTC::UNKNOWN_ERROR;
@@ -174,7 +174,7 @@
}
cdrMemoryStream cdr = cdrMemoryStream();
- CdrBufferBase::ReturnCode ret(m_buffer->read(cdr));
+ CdrBufferBase::ReturnCode ret(m_connector->read(cdr));
if (ret == CdrBufferBase::BUFFER_OK)
{
Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -38,7 +38,8 @@
: OutPortConnector(info, listeners),
m_provider(provider),
m_listeners(listeners),
- m_buffer(buffer)
+ m_buffer(buffer),
+ m_sync_readwrite(false)
{
// create buffer
if (m_buffer == 0)
@@ -54,6 +55,11 @@
// m_provider->init(m_profile /* , m_listeners */);
m_provider->setListener(info, &m_listeners);
+ if (coil::toBool(info.properties["sync_readwrite"], "YES", "NO", false))
+ {
+ m_sync_readwrite = true;
+ }
+
onConnect();
}
@@ -80,10 +86,101 @@
ConnectorBase::ReturnCode
OutPortPullConnector::write(cdrMemoryStream& data)
{
+
+ if (m_buffer == 0)
+ {
+ return PRECONDITION_NOT_MET;
+ }
+
+ if (m_sync_readwrite)
+ {
+ {
+ Guard guard(m_readready_worker.mutex_);
+ while (!m_readready_worker.completed_)
+ {
+ m_readready_worker.cond_.wait();
+ }
+ }
+ }
+
m_buffer->write(data);
+
+ if (m_sync_readwrite)
+ {
+ {
+ Guard guard(m_writecompleted_worker.mutex_);
+ m_writecompleted_worker.completed_ = true;
+ m_writecompleted_worker.cond_.signal();
+ }
+
+
+ {
+ Guard guard(m_readcompleted_worker.mutex_);
+ while (!m_readcompleted_worker.completed_)
+ {
+ m_readcompleted_worker.cond_.wait();
+ }
+ }
+ {
+ Guard guard(m_writecompleted_worker.mutex_);
+ m_writecompleted_worker.completed_ = false;
+ }
+ }
+
return PORT_OK;
}
+ CdrBufferBase::ReturnCode
+ OutPortPullConnector::read(cdrMemoryStream &data)
+ {
+ if (m_buffer == 0)
+ {
+ return CdrBufferBase::PRECONDITION_NOT_MET;
+ }
+
+ if (m_sync_readwrite)
+ {
+
+ {
+ Guard guard(m_readcompleted_worker.mutex_);
+ m_readcompleted_worker.completed_ = false;
+ }
+
+ {
+ Guard guard(m_readready_worker.mutex_);
+ m_readready_worker.completed_ = true;
+ m_readready_worker.cond_.signal();
+ }
+ {
+ Guard guard(m_writecompleted_worker.mutex_);
+ while (!m_writecompleted_worker.completed_)
+ {
+ m_writecompleted_worker.cond_.wait();
+ }
+ }
+ }
+
+ CdrBufferBase::ReturnCode ret = m_buffer->read(data);
+
+
+ if (m_sync_readwrite)
+ {
+ {
+ Guard guard(m_readcompleted_worker.mutex_);
+ m_readcompleted_worker.completed_ = true;
+ m_readcompleted_worker.cond_.signal();
+ }
+
+ {
+ Guard guard(m_readready_worker.mutex_);
+ m_readready_worker.completed_ = false;
+ }
+ }
+
+
+ return ret;
+ }
+
/*!
* @if jp
* @brief 接続解除関数
Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.h
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.h 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.h 2018-10-08 23:58:57 UTC (rev 3417)
@@ -81,6 +81,7 @@
class OutPortPullConnector
: public OutPortConnector
{
+ typedef coil::Guard<coil::Mutex> Guard;
public:
DATAPORTSTATUS_ENUM
@@ -164,6 +165,8 @@
*/
virtual ReturnCode write(cdrMemoryStream& data);
+ virtual CdrBufferBase::ReturnCode read(cdrMemoryStream &data);
+
/*!
* @if jp
* @brief 接続解除
@@ -282,6 +285,20 @@
* @endif
*/
CdrBufferBase* m_buffer;
+ private:
+ bool m_sync_readwrite;
+
+ struct WorkerThreadCtrl
+ {
+ WorkerThreadCtrl() : cond_(mutex_), completed_(false) {}
+ coil::Mutex mutex_;
+ coil::Condition<coil::Mutex> cond_;
+ bool completed_;
+ };
+ WorkerThreadCtrl m_writecompleted_worker;
+ WorkerThreadCtrl m_readcompleted_worker;
+ WorkerThreadCtrl m_readready_worker;
+
};
}; // namespace RTC
Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortSHMProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortSHMProvider.cpp 2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortSHMProvider.cpp 2018-10-08 23:58:57 UTC (rev 3417)
@@ -33,7 +33,8 @@
*/
OutPortSHMProvider::OutPortSHMProvider(void)
: m_buffer(0),
- m_memory_size(0)
+ m_memory_size(0),
+ m_connector(NULL)
{
// PortProfile setting
setInterfaceType("shared_memory");
@@ -172,7 +173,7 @@
}
cdrMemoryStream cdr;
- CdrBufferBase::ReturnCode ret(m_buffer->read(cdr));
+ CdrBufferBase::ReturnCode ret(m_connector->read(cdr));
if (ret == CdrBufferBase::BUFFER_OK)
{
#ifdef ORB_IS_ORBEXPRESS
openrtm-commit メーリングリストの案内