OpenRTM-aist  2.1.0
InPort.h
Go to the documentation of this file.
1 // -*- C++ -*-
20 #ifndef RTC_INPORT_H
21 #define RTC_INPORT_H
22 
23 #include <coil/OS.h>
24 #include <mutex>
25 
26 #include <rtm/RTC.h>
27 #include <rtm/Typename.h>
28 #include <rtm/InPortBase.h>
29 #include <rtm/CdrBufferBase.h>
30 #include <rtm/PortCallback.h>
31 #include <rtm/InPortConnector.h>
32 #include <rtm/Timestamp.h>
33 #include <rtm/DirectInPortBase.h>
35 #include <rtm/DataTypeUtil.h>
36 
37 
38 namespace RTC
39 {
86  template <class DataType>
87  class InPort
88  : public InPortBase, DirectInPortBase<DataType>
89  {
90  public:
115  InPort(const char* name, DataType& value)
116  : InPortBase(name, ::CORBA_Util::toRepositoryId<DataType>()),
117  m_name(name), m_value(value),
118  m_OnRead(nullptr), m_OnReadConvert(nullptr),
119  m_status(1), m_directNewData(false)
120  {
121 
122  this->initConnectorListeners();
123 
125  new Timestamp<DataType>("on_received"));
127  new Timestamp<DataType>("on_read"));
128  m_directport = this;
129 
130  CdrMemoryStreamInit<DataType>();
131 
132  std::string marshaling_types{coil::eraseBlank(coil::flatten(
133  getSerializerList<DataType>()))};
134 
135  RTC_DEBUG(("available marshaling_types: %s", marshaling_types.c_str()));
136 
137  addProperty("dataport.marshaling_types", marshaling_types.c_str());
138  }
139 
155  ~InPort() override;
156 
176  virtual const char* name()
177  {
178  return m_name.c_str();
179  }
180 
181 
206  virtual bool isNew(std::string name)
207  {
208  RTC_TRACE(("isNew()"));
209 
210 
211  {
212  std::lock_guard<std::mutex> guard(m_connectorsMutex);
213  if (m_connectors.empty())
214  {
215  RTC_DEBUG(("no connectors"));
216  return false;
217  }
218 
219  for(auto & con : m_connectors)
220  {
221  if (std::string(con->name()) == name)
222  {
223  size_t r = con->getBuffer()->readable();
224  if (r > 0)
225  {
226  RTC_DEBUG(("isNew() = true, readable data: %d", r));
227  return true;
228  }
229  }
230  }
231  }
232 
233  RTC_DEBUG(("isNew() = false, no readable data"));
234  return false;
235  }
236  virtual bool isNew(coil::vstring &names)
237  {
238  names.clear();
239  RTC_TRACE(("isNew()"));
240 
241 
242 
243  {
244  std::lock_guard<std::mutex> guard(m_connectorsMutex);
245  if (m_connectors.empty())
246  {
247  RTC_DEBUG(("no connectors"));
248  return false;
249  }
250  for(auto & con : m_connectors)
251  {
252  size_t r = con->getBuffer()->readable();
253  if (r > 0)
254  {
255  names.emplace_back(con->name());
256  }
257  }
258 
259  }
260 
261  if (!names.empty())
262  {
263  RTC_DEBUG(("isNew() = true, buffer is not empty"));
264  return true;
265  }
266 
267  RTC_DEBUG(("isNew() = false, no readable data"));
268  return false;
269  }
270  bool isNew() override
271  {
272  RTC_TRACE(("isNew()"));
273 
274  // In single-buffer mode, all connectors share the same buffer. This
275  // means that we only need to read from the first connector to get data
276  // received by any connector.
277  {
278  std::lock_guard<std::mutex> guard(m_valueMutex);
279  if (m_directNewData == true)
280  {
281  RTC_DEBUG(("isNew() returns true because of direct write."));
282  return true;
283  }
284  }
285  size_t r(0);
286  {
287  std::lock_guard<std::mutex> guard(m_connectorsMutex);
288  if (m_connectors.empty())
289  {
290  RTC_DEBUG(("no connectors"));
291  return false;
292  }
293  r = m_connectors[0]->getBuffer()->readable();
294  }
295 
296  if (r > 0)
297  {
298  RTC_DEBUG(("isNew() = true, readable data: %d", r));
299  return true;
300  }
301 
302  RTC_DEBUG(("isNew() = false, no readable data"));
303  return false;
304  }
305 
329  virtual bool isEmpty(std::string name)
330  {
331  RTC_TRACE(("isEmpty()"));
332 
333 
334  {
335  std::lock_guard<std::mutex> guard(m_connectorsMutex);
336  if (m_connectors.empty())
337  {
338  RTC_DEBUG(("no connectors"));
339  return false;
340  }
341 
342  for(auto & con : m_connectors)
343  {
344  if (std::string(con->name()) == name)
345  {
346  size_t r = con->getBuffer()->readable();
347  if (r == 0)
348  {
349  RTC_DEBUG(("isEmpty() = true, buffer is empty"));
350  return true;
351  }
352  }
353  }
354  }
355 
356  RTC_DEBUG(("isEmpty() = false, no readable data"));
357  return false;
358  }
359  virtual bool isEmpty(coil::vstring &names)
360  {
361  names.clear();
362  RTC_TRACE(("isEmpty()"));
363 
364 
365 
366  {
367  std::lock_guard<std::mutex> guard(m_connectorsMutex);
368  if (m_connectors.empty())
369  {
370  RTC_DEBUG(("no connectors"));
371  return false;
372  }
373 
374  for(auto & con : m_connectors)
375  {
376  size_t r = con->getBuffer()->readable();
377  if (r == 0)
378  {
379  names.emplace_back(con->name());
380  }
381  }
382 
383  }
384 
385  if (!names.empty())
386  {
387  RTC_DEBUG(("isEmpty() = true, buffer is empty"));
388  return true;
389  }
390 
391  RTC_DEBUG(("isEmpty() = false, no readable data"));
392  return false;
393  }
394  bool isEmpty() override
395  {
396  RTC_TRACE(("isEmpty()"));
397  if (m_directNewData == true) { return false; }
398  size_t r(0);
399 
400  {
401  std::lock_guard<std::mutex> guard(m_connectorsMutex);
402  if (m_connectors.empty())
403  {
404  RTC_DEBUG(("no connectors"));
405  return true;
406  }
407  // In single-buffer mode, all connectors share the same buffer. This
408  // means that we only need to read from the first connector to get data
409  // received by any connector.
410  r = m_connectors[0]->getBuffer()->readable();
411  }
412 
413  if (r == 0)
414  {
415  RTC_DEBUG(("isEmpty() = true, buffer is empty"));
416  return true;
417  }
418 
419  RTC_DEBUG(("isEmpty() = false, data exists in the buffer"));
420  return false;
421  }
422 
423  void write(DataType& data) override
424  {
425  std::lock_guard<std::mutex> guard(m_valueMutex);
426  CORBA_Util::copyData<DataType>(m_value, data);
427  m_directNewData = true;
428  }
429 
504  bool read(std::string name="") override
505  {
506  RTC_TRACE(("DataType read()"));
507 
508  if (m_OnRead != nullptr)
509  {
510  (*m_OnRead)();
511  RTC_TRACE(("OnRead called"));
512  }
513  // 1) direct connection
514  {
515  std::lock_guard<std::mutex> guard(m_valueMutex);
516  if (m_directNewData == true)
517  {
518  RTC_DEBUG(("Direct data transfer"));
519  if (m_OnReadConvert != nullptr)
520  {
521  m_value = (*m_OnReadConvert)(m_value);
522  RTC_DEBUG(("OnReadConvert for direct data called"));
523  return true;
524  }
525  m_directNewData = false;
526  return true;
527  }
528  }
529  // 2) network connection
530 
531  DataPortStatus ret;
532  {
533  std::lock_guard<std::mutex> guard(m_connectorsMutex);
534  if (m_connectors.empty())
535  {
536  RTC_DEBUG(("no connectors"));
537  return false;
538  }
539 
540 
541  }
542 
543  InPortConnector* connector = nullptr;
544 
545  if (name.empty())
546  {
547  connector = m_connectors[0];
548  }
549  else
550  {
551  for(auto & con : m_connectors)
552  {
553  if (std::string(con->name()) == name)
554  {
555  connector = con;
556  }
557  }
558  }
559 
560  if (connector == nullptr)
561  {
562  RTC_ERROR(("can not find %s",name.c_str()));
563  return false;
564  }
565 
566  if (!connector->getDirectData(m_value))
567  {
568  {
569  std::lock_guard<std::mutex> guard(m_connectorsMutex);
570  // In single-buffer mode, all connectors share the same buffer. This
571  // means that we only need to read from the first connector to get data
572  // received by any connector.
573  ret = connector->read(m_value);
574  }
575  m_status[0] = ret;
576  if (ret == DataPortStatus::PORT_OK)
577  {
578  std::lock_guard<std::mutex> guard(m_valueMutex);
579  RTC_DEBUG(("data read succeeded"));
580 
581  if (m_OnReadConvert != nullptr)
582  {
583  m_value = (*m_OnReadConvert)(m_value);
584  RTC_DEBUG(("OnReadConvert called"));
585  return true;
586  }
587  return true;
588  }
589  else if (ret == DataPortStatus::BUFFER_EMPTY)
590  {
591  RTC_WARN(("buffer empty"));
592  return false;
593  }
594  else if (ret == DataPortStatus::BUFFER_TIMEOUT)
595  {
596  RTC_WARN(("buffer read timeout"));
597  return false;
598  }
599  }
600  else
601  {
602  return true;
603  }
604  RTC_ERROR(("unknown retern value from buffer.read()"));
605  return false;
606  }
607 
608 
631  virtual void update()
632  {
633  this->read();
634  }
635 
656  void operator>>(DataType& rhs)
657  {
658  this->read();
659  CORBA_Util::copyData<DataType>(rhs, m_value);
660  return;
661  }
662 
695  DataPortStatus getStatus(int /*index*/)
696  {
697  return m_status[0];
698  }
730  {
731  return m_status;
732  }
733 
755  inline void setOnRead(OnRead<DataType>* on_read)
756  {
757  m_OnRead = on_read;
758  }
759 
783  inline void setOnReadConvert(OnReadConvert<DataType>* on_rconvert)
784  {
785  m_OnReadConvert = on_rconvert;
786  }
787  protected:
802  void initConnectorListeners() override
803  {
804  delete m_listeners;
806  }
807  private:
808  std::string m_typename;
816  std::string m_name;
817 
825  DataType& m_value;
826  mutable std::mutex m_valueMutex;
827 
835  OnRead<DataType>* m_OnRead;
836 
844  OnReadConvert<DataType>* m_OnReadConvert;
845 
853  DataPortStatusList m_status;
854 
862  bool m_directNewData;
863  };
864 
865  template <class T> InPort<T>::~InPort() = default; // No inline for gcc warning, too big
866 } // namespace RTC
867 
868 #endif // RTC_INPORT_H
CORBA CDR Stream Buffer class.
Data type utility function.
DirectInPortBase class.
RTC::Port implementation for InPort.
InPortConnector base class.
PortCallback class.
RTComponent header.
#define RTC_WARN(fmt)
Warning log output macro.
Definition: SystemLogger.h:621
#define RTC_TRACE(fmt)
Trace level log output macro.
Definition: SystemLogger.h:687
#define RTC_DEBUG(fmt)
Debug level log output macro.
Definition: SystemLogger.h:665
#define RTC_ERROR(fmt)
Error log output macro.
Definition: SystemLogger.h:599
Timestamp listener class.
Typename function.
DataPortStatus mixin class.
Definition: ConnectorListener.h:35
ConnectorListenersT class.
Definition: ConnectorListener.h:2086
Definition: DirectInPortBase.h:48
Port for InPort.
Definition: InPortBase.h:70
ConnectorList m_connectors
Connection list.
Definition: InPortBase.h:884
ConnectorListenersBase * m_listeners
ConnectorDataListener listener.
Definition: InPortBase.h:901
void addConnectorDataListener(ConnectorDataListenerType type, ConnectorDataListener *listener, bool autoclean=true)
Adding BufferDataListener type listener.
InPortConnector base class.
Definition: InPortConnector.h:54
bool getDirectData(DataType &data)
Definition: InPortConnector.h:321
virtual DataPortStatus read(ByteDataStreamBase *data)=0
Destructor.
InPort template class.
Definition: InPort.h:89
bool read(std::string name="") override
Readout the value from DataPort.
Definition: InPort.h:504
void operator>>(DataType &rhs)
Read the newest data in InPort into a type-T variable.
Definition: InPort.h:656
virtual bool isEmpty(coil::vstring &names)
Definition: InPort.h:359
DataPortStatusList getStatusList()
Getting specified connector's writing status list.
Definition: InPort.h:729
virtual bool isNew(coil::vstring &names)
Definition: InPort.h:236
virtual bool isNew(std::string name)
Check whether the data is newest.
Definition: InPort.h:206
void initConnectorListeners() override
Definition: InPort.h:802
bool isEmpty() override
Check whether the buffer is empty.
Definition: InPort.h:394
void setOnRead(OnRead< DataType > *on_read)
Set callback when data is read from the InPort buffer.
Definition: InPort.h:755
void write(DataType &data) override
Definition: InPort.h:423
DataPortStatus getStatus(int)
Getting specified connector's writing status.
Definition: InPort.h:695
bool isNew() override
Check whether the data is newest.
Definition: InPort.h:270
virtual void update()
Read the newest value into the type-T variable which is bound to InPort's buffer.
Definition: InPort.h:631
virtual bool isEmpty(std::string name)
Check whether the named connector's buffer is empty.
Definition: InPort.h:329
~InPort() override
Destructor.
virtual const char * name()
Get port name.
Definition: InPort.h:176
InPort(const char *name, DataType &value)
A constructor.
Definition: InPort.h:115
void setOnReadConvert(OnReadConvert< DataType > *on_rconvert)
Set callback when data is readout to the InPort buffer.
Definition: InPort.h:783
std::mutex m_connectorsMutex
Definition: PortBase.h:2115
DirectPortBase * m_directport
Definition: PortBase.h:2239
void addProperty(const char *key, ValueType value)
Add NameValue data to PortProfile's properties.
Definition: PortBase.h:1877
Definition: Timestamp.h:61
Definition: DataTypeUtil.h:24
const char * toRepositoryId()
Getting CORBA defined type as characters.
Definition: Typename.h:241
RT-Component.
std::vector< DataPortStatus > DataPortStatusList
Definition: DataPortStatus.h:175
Data convert callback abstract class on read()
Definition: PortCallback.h:386
Callback abstract class on read()
Definition: PortCallback.h:324