OpenRTM-aist  2.1.0
OutPort.h
Go to the documentation of this file.
1 // -*- C++ -*-
20 #ifndef RTC_OUTPORT_H
21 #define RTC_OUTPORT_H
22 
23 #include <coil/TimeMeasure.h>
24 #include <coil/OS.h>
25 
26 #include <rtm/RTC.h>
27 #include <rtm/Typename.h>
28 #include <rtm/OutPortBase.h>
29 #include <rtm/CdrBufferBase.h>
30 #include <rtm/PortCallback.h>
31 #include <rtm/OutPortConnector.h>
32 #include <rtm/Timestamp.h>
33 #include <rtm/DirectOutPortBase.h>
34 #include <rtm/DataTypeUtil.h>
35 
36 #include <functional>
37 #include <string>
38 #include <vector>
39 
40 namespace RTC
41 {
77  template <class DataType>
78  class OutPort
79  : public OutPortBase, DirectOutPortBase<DataType>
80  {
81  public:
105  OutPort(const char* name, DataType& value)
106  : OutPortBase(name, ::CORBA_Util::toRepositoryId<DataType>()),
107  m_value(value), m_onWrite(nullptr), m_onWriteConvert(nullptr),
108  m_directNewData(false), m_directValue(value)
109  {
110  this->initConnectorListeners();
112  new Timestamp<DataType>("on_write"));
114  new Timestamp<DataType>("on_send"));
115 
116  m_directport = this;
117 
118  CdrMemoryStreamInit<DataType>();
119 
120  std::string marshaling_types{coil::eraseBlank(coil::flatten(
121  getSerializerList<DataType>()))};
122 
123  RTC_DEBUG(("available marshaling_types: %s", marshaling_types.c_str()));
124 
125  addProperty("dataport.marshaling_types", marshaling_types.c_str());
126  }
127 
143  ~OutPort() override;
144 
186  virtual bool write(DataType& value)
187  {
188  RTC_TRACE(("DataType write()"));
189 
190  if (m_onWrite != nullptr)
191  {
192  (*m_onWrite)(value);
193  RTC_TRACE(("OnWrite called"));
194  }
195 
196 
197  bool result(true);
198  std::vector<const char *> disconnect_ids;
199  {
200  std::lock_guard<std::mutex> con_guard(m_connectorsMutex);
201  // check number of connectors
202  size_t conn_size(m_connectors.size());
203  if (!(conn_size > 0)) { return false; }
204 
205  m_status.resize(conn_size);
206 
207  for (size_t i(0), len(conn_size); i < len; ++i)
208  {
209 
210  DataPortStatus ret;
211  if (!m_connectors[i]->pullDirectMode())
212  {
213  if (m_onWriteConvert != nullptr)
214  {
215  RTC_DEBUG(("m_connectors.OnWriteConvert called"));
216  DataType tmp = (*m_onWriteConvert)(value);
217  ret = m_connectors[i]->write(tmp);
218  }
219  else
220  {
221  RTC_DEBUG(("m_connectors.write called"));
222  ret = m_connectors[i]->write(value);
223  }
224  }
225  else
226  {
227  std::lock_guard<std::mutex> value_guard(m_valueMutex);
228  if (m_onWriteConvert != nullptr)
229  {
230  RTC_DEBUG(("m_connectors.OnWriteConvert called"));
231  m_directValue = ((*m_onWriteConvert)(value));
232  }
233  else
234  {
235  CORBA_Util::copyData<DataType>(m_directValue, value);
236  }
237  m_directNewData = true;
239  }
240  m_status[i] = ret;
241 
242  if (ret == DataPortStatus::PORT_OK) { continue; }
243 
244  result = false;
245 
247  {
248  const char* id(m_connectors[i]->profile().id.c_str());
249  RTC_WARN(("connection_lost id: %s", id));
250  if (m_onConnectionLost != nullptr)
251  {
252  RTC::ConnectorProfile prof(findConnProfile(id));
253  (*m_onConnectionLost)(prof);
254  }
255  disconnect_ids.emplace_back(id);
256  }
257  }
258  }
259  std::for_each(disconnect_ids.begin(), disconnect_ids.end(),
260  [this](const char * id){this->disconnect(id);});
261  return result;
262  }
263 
285  bool write() override
286  {
287  return write(m_value);
288  }
289 
315  bool operator<<(DataType& value)
316  {
317  return write(value);
318  }
319 
353  {
354  return m_status[index];
355  }
387  {
388  return m_status;
389  }
390 
420  inline void setOnWrite(OnWrite<DataType>* on_write)
421  {
422  m_onWrite = on_write;
423  }
424 
461  inline void setOnWriteConvert(OnWriteConvert<DataType>* on_wconvert)
462  {
463  m_onWriteConvert = on_wconvert;
464  }
465 
481  void read(DataType& data) override
482  {
483  std::lock_guard<std::mutex> guard(m_valueMutex);
484  m_directNewData = false;
485  CORBA_Util::copyData<DataType>(data, m_directValue);
486  }
487  bool isEmpty() override
488  {
489  return !m_directNewData;
490  }
491  bool isNew() override
492  {
493  return m_directNewData;
494  }
495 
496  protected:
511  void initConnectorListeners() override
512  {
513  delete m_listeners;
515  }
516  private:
517  std::string m_typename;
525  DataType& m_value;
526 
534  OnWrite<DataType>* m_onWrite;
535 
543  OnWriteConvert<DataType>* m_onWriteConvert;
544 
545  coil::TimeMeasure m_cdrtime;
546 
547  DataPortStatusList m_status;
548 
549  CORBA::Long m_propValueIndex;
550 
551  std::mutex m_valueMutex;
552  bool m_directNewData;
553  DataType m_directValue;
554  };
555 
556  template <class T> OutPort<T>::~OutPort() = default; // No inline for gcc warning, too big
557 } // namespace RTC
558 
559 #endif // RTC_OUTPORT_H
Data type utility function.
DirectOutPortBase class.
InPortBase base class.
OutPortConnector class.
PortCallback class.
RTComponent header.
#define RTC_WARN(fmt)
Warning log output macro.
Definition: SystemLogger.h:621
#define RTC_TRACE(fmt)
Trace level log output macro.
Definition: SystemLogger.h:687
#define RTC_DEBUG(fmt)
Debug level log output macro.
Definition: SystemLogger.h:665
Timestamp listener class.
Typename function.
DataPortStatus mixin class.
Definition: ConnectorListener.h:35
ConnectorListenersT class.
Definition: ConnectorListener.h:2086
Definition: DirectOutPortBase.h:48
Callback abstract class on write()
Definition: PortCallback.h:193
Output base class.
Definition: OutPortBase.h:229
void addConnectorDataListener(ConnectorDataListenerType type, ConnectorDataListener *listener, bool autoclean=true)
Adding BufferDataListener type listener.
ConnectorListenersBase * m_listeners
ConnectorDataListener listener.
Definition: OutPortBase.h:1098
std::vector< OutPortConnector * > m_connectors
Connection list.
Definition: OutPortBase.h:1066
OutPort template class.
Definition: OutPort.h:80
DataPortStatusList getStatusList()
Getting specified connector's writing status list.
Definition: OutPort.h:386
DataPortStatus getStatus(int index)
Getting specified connector's writing status.
Definition: OutPort.h:352
bool isEmpty() override
Definition: OutPort.h:487
void initConnectorListeners() override
Definition: OutPort.h:511
virtual bool write(DataType &value)
Write data.
Definition: OutPort.h:186
void setOnWrite(OnWrite< DataType > *on_write)
Set OnWrite callback.
Definition: OutPort.h:420
bool operator<<(DataType &value)
Write data.
Definition: OutPort.h:315
void read(DataType &data) override
Definition: OutPort.h:481
~OutPort() override
Destructor.
void setOnWriteConvert(OnWriteConvert< DataType > *on_wconvert)
Set OnWriteConvert callback.
Definition: OutPort.h:461
bool isNew() override
Definition: OutPort.h:491
bool write() override
Write data.
Definition: OutPort.h:285
OutPort(const char *name, DataType &value)
Constructor.
Definition: OutPort.h:105
std::mutex m_connectorsMutex
Definition: PortBase.h:2115
DirectPortBase * m_directport
Definition: PortBase.h:2239
ConnectorProfile findConnProfile(const char *id)
Find ConnectorProfile with id.
ConnectionCallback * m_onConnectionLost
Callback functor objects.
Definition: PortBase.h:2222
void addProperty(const char *key, ValueType value)
Add NameValue data to PortProfile's properties.
Definition: PortBase.h:1877
Definition: Timestamp.h:61
Functor for_each(CorbaSequence &seq, Functor f)
Apply the functor to all CORBA sequence elements.
Definition: CORBA_SeqUtil.h:98
Definition: DataTypeUtil.h:24
const char * toRepositoryId()
Getting CORBA defined type as characters.
Definition: Typename.h:241
RT-Component.
std::vector< DataPortStatus > DataPortStatusList
Definition: DataPortStatus.h:175
Data convert callback abstract class on write()
Definition: PortCallback.h:259