OpenRTM-aist 2.0.2
Loading...
Searching...
No Matches
InPort.h
Go to the documentation of this file.
1// -*- C++ -*-
20#ifndef RTC_INPORT_H
21#define RTC_INPORT_H
22
23#include <coil/OS.h>
24#include <mutex>
25
26#include <rtm/RTC.h>
27#include <rtm/Typename.h>
28#include <rtm/InPortBase.h>
29#include <rtm/CdrBufferBase.h>
30#include <rtm/PortCallback.h>
31#include <rtm/InPortConnector.h>
32#include <rtm/Timestamp.h>
35#include <rtm/DataTypeUtil.h>
36
37
38namespace RTC
39{
86 template <class DataType>
87 class InPort
88 : public InPortBase, DirectInPortBase<DataType>
89 {
90 public:
115 InPort(const char* name, DataType& value)
116 : InPortBase(name, ::CORBA_Util::toRepositoryId<DataType>()),
117 m_name(name), m_value(value),
118 m_OnRead(nullptr), m_OnReadConvert(nullptr),
119 m_status(1), m_directNewData(false)
120 {
121
123
125 new Timestamp<DataType>("on_received"));
127 new Timestamp<DataType>("on_read"));
128 m_directport = this;
129
131
132 std::string marshaling_types{coil::eraseBlank(coil::flatten(
134
135 RTC_DEBUG(("available marshaling_types: %s", marshaling_types.c_str()));
136
137 addProperty("dataport.marshaling_types", marshaling_types.c_str());
138 }
139
155 ~InPort() override;
156
176 virtual const char* name()
177 {
178 return m_name.c_str();
179 }
180
181
206 virtual bool isNew(std::string name)
207 {
208 RTC_TRACE(("isNew()"));
209
210
211 {
212 std::lock_guard<std::mutex> guard(m_connectorsMutex);
213 if (m_connectors.empty())
214 {
215 RTC_DEBUG(("no connectors"));
216 return false;
217 }
218
219 for(auto & con : m_connectors)
220 {
221 if (std::string(con->name()) == name)
222 {
223 size_t r = con->getBuffer()->readable();
224 if (r > 0)
225 {
226 RTC_DEBUG(("isNew() = true, readable data: %d", r));
227 return true;
228 }
229 }
230 }
231 }
232
233 RTC_DEBUG(("isNew() = false, no readable data"));
234 return false;
235 }
236 virtual bool isNew(coil::vstring &names)
237 {
238 names.clear();
239 RTC_TRACE(("isNew()"));
240
241
242
243 {
244 std::lock_guard<std::mutex> guard(m_connectorsMutex);
245 if (m_connectors.empty())
246 {
247 RTC_DEBUG(("no connectors"));
248 return false;
249 }
250 for(auto & con : m_connectors)
251 {
252 size_t r = con->getBuffer()->readable();
253 if (r > 0)
254 {
255 names.emplace_back(con->name());
256 }
257 }
258
259 }
260
261 if (!names.empty())
262 {
263 RTC_DEBUG(("isNew() = true, buffer is not empty"));
264 return true;
265 }
266
267 RTC_DEBUG(("isNew() = false, no readable data"));
268 return false;
269 }
270 bool isNew() override
271 {
272 RTC_TRACE(("isNew()"));
273
274 // In single-buffer mode, all connectors share the same buffer. This
275 // means that we only need to read from the first connector to get data
276 // received by any connector.
277 {
278 std::lock_guard<std::mutex> guard(m_valueMutex);
279 if (m_directNewData == true)
280 {
281 RTC_DEBUG(("isNew() returns true because of direct write."));
282 return true;
283 }
284 }
285 size_t r(0);
286 {
287 std::lock_guard<std::mutex> guard(m_connectorsMutex);
288 if (m_connectors.empty())
289 {
290 RTC_DEBUG(("no connectors"));
291 return false;
292 }
293 r = m_connectors[0]->getBuffer()->readable();
294 }
295
296 if (r > 0)
297 {
298 RTC_DEBUG(("isNew() = true, readable data: %d", r));
299 return true;
300 }
301
302 RTC_DEBUG(("isNew() = false, no readable data"));
303 return false;
304 }
305
329 virtual bool isEmpty(std::string name)
330 {
331 RTC_TRACE(("isEmpty()"));
332
333
334 {
335 std::lock_guard<std::mutex> guard(m_connectorsMutex);
336 if (m_connectors.empty())
337 {
338 RTC_DEBUG(("no connectors"));
339 return false;
340 }
341
342 for(auto & con : m_connectors)
343 {
344 if (std::string(con->name()) == name)
345 {
346 size_t r = con->getBuffer()->readable();
347 if (r == 0)
348 {
349 RTC_DEBUG(("isEmpty() = true, buffer is empty"));
350 return true;
351 }
352 }
353 }
354 }
355
356 RTC_DEBUG(("isEmpty() = false, no readable data"));
357 return false;
358 }
359 virtual bool isEmpty(coil::vstring &names)
360 {
361 names.clear();
362 RTC_TRACE(("isEmpty()"));
363
364
365
366 {
367 std::lock_guard<std::mutex> guard(m_connectorsMutex);
368 if (m_connectors.empty())
369 {
370 RTC_DEBUG(("no connectors"));
371 return false;
372 }
373
374 for(auto & con : m_connectors)
375 {
376 size_t r = con->getBuffer()->readable();
377 if (r == 0)
378 {
379 names.emplace_back(con->name());
380 }
381 }
382
383 }
384
385 if (!names.empty())
386 {
387 RTC_DEBUG(("isEmpty() = true, buffer is empty"));
388 return true;
389 }
390
391 RTC_DEBUG(("isEmpty() = false, no readable data"));
392 return false;
393 }
394 bool isEmpty() override
395 {
396 RTC_TRACE(("isEmpty()"));
397 if (m_directNewData == true) { return false; }
398 size_t r(0);
399
400 {
401 std::lock_guard<std::mutex> guard(m_connectorsMutex);
402 if (m_connectors.empty())
403 {
404 RTC_DEBUG(("no connectors"));
405 return true;
406 }
407 // In single-buffer mode, all connectors share the same buffer. This
408 // means that we only need to read from the first connector to get data
409 // received by any connector.
410 r = m_connectors[0]->getBuffer()->readable();
411 }
412
413 if (r == 0)
414 {
415 RTC_DEBUG(("isEmpty() = true, buffer is empty"));
416 return true;
417 }
418
419 RTC_DEBUG(("isEmpty() = false, data exists in the buffer"));
420 return false;
421 }
422
423 void write(DataType& data) override
424 {
425 std::lock_guard<std::mutex> guard(m_valueMutex);
426 CORBA_Util::copyData<DataType>(m_value, data);
427 m_directNewData = true;
428 }
429
504 bool read(std::string name="") override
505 {
506 RTC_TRACE(("DataType read()"));
507
508 if (m_OnRead != nullptr)
509 {
510 (*m_OnRead)();
511 RTC_TRACE(("OnRead called"));
512 }
513 // 1) direct connection
514 {
515 std::lock_guard<std::mutex> guard(m_valueMutex);
516 if (m_directNewData == true)
517 {
518 RTC_DEBUG(("Direct data transfer"));
519 if (m_OnReadConvert != nullptr)
520 {
521 m_value = (*m_OnReadConvert)(m_value);
522 RTC_DEBUG(("OnReadConvert for direct data called"));
523 return true;
524 }
525 m_directNewData = false;
526 return true;
527 }
528 }
529 // 2) network connection
530
531 DataPortStatus ret;
532 {
533 std::lock_guard<std::mutex> guard(m_connectorsMutex);
534 if (m_connectors.empty())
535 {
536 RTC_DEBUG(("no connectors"));
537 return false;
538 }
539
540
541 }
542
543 InPortConnector* connector = nullptr;
544
545 if (name.empty())
546 {
547 connector = m_connectors[0];
548 }
549 else
550 {
551 for(auto & con : m_connectors)
552 {
553 if (std::string(con->name()) == name)
554 {
555 connector = con;
556 }
557 }
558 }
559
560 if (connector == nullptr)
561 {
562 RTC_ERROR(("can not find %s",name.c_str()));
563 return false;
564 }
565
566 if (!connector->getDirectData(m_value))
567 {
568 {
569 std::lock_guard<std::mutex> guard(m_connectorsMutex);
570 // In single-buffer mode, all connectors share the same buffer. This
571 // means that we only need to read from the first connector to get data
572 // received by any connector.
573 ret = connector->read(m_value);
574 }
575 m_status[0] = ret;
576 if (ret == DataPortStatus::PORT_OK)
577 {
578 std::lock_guard<std::mutex> guard(m_valueMutex);
579 RTC_DEBUG(("data read succeeded"));
580
581 if (m_OnReadConvert != nullptr)
582 {
583 m_value = (*m_OnReadConvert)(m_value);
584 RTC_DEBUG(("OnReadConvert called"));
585 return true;
586 }
587 return true;
588 }
589 else if (ret == DataPortStatus::BUFFER_EMPTY)
590 {
591 RTC_WARN(("buffer empty"));
592 return false;
593 }
594 else if (ret == DataPortStatus::BUFFER_TIMEOUT)
595 {
596 RTC_WARN(("buffer read timeout"));
597 return false;
598 }
599 }
600 else
601 {
602 return true;
603 }
604 RTC_ERROR(("unknown retern value from buffer.read()"));
605 return false;
606 }
607
608
631 virtual void update()
632 {
633 this->read();
634 }
635
656 void operator>>(DataType& rhs)
657 {
658 this->read();
659 CORBA_Util::copyData<DataType>(rhs, m_value);
660 return;
661 }
662
696 {
697 return m_status[0];
698 }
730 {
731 return m_status;
732 }
733
755 inline void setOnRead(OnRead<DataType>* on_read)
756 {
757 m_OnRead = on_read;
758 }
759
784 {
785 m_OnReadConvert = on_rconvert;
786 }
787 protected:
803 {
804 delete m_listeners;
806 }
807 private:
808 std::string m_typename;
816 std::string m_name;
817
825 DataType& m_value;
826 mutable std::mutex m_valueMutex;
827
835 OnRead<DataType>* m_OnRead;
836
844 OnReadConvert<DataType>* m_OnReadConvert;
845
853 DataPortStatusList m_status;
854
862 bool m_directNewData;
863 };
864
865 template <class T> InPort<T>::~InPort() = default; // No inline for gcc warning, too big
866} // namespace RTC
867
868#endif // RTC_INPORT_H
CORBA CDR Stream Buffer class.
void CdrMemoryStreamInit()
Definition CORBA_CdrMemoryStream.h:650
Data type utility function.
DirectInPortBase class.
RTC::Port implementation for InPort.
InPortConnector base class.
PortCallback class.
RTComponent header.
#define RTC_WARN(fmt)
Warning log output macro.
Definition SystemLogger.h:621
#define RTC_TRACE(fmt)
Trace level log output macro.
Definition SystemLogger.h:687
#define RTC_DEBUG(fmt)
Debug level log output macro.
Definition SystemLogger.h:665
#define RTC_ERROR(fmt)
Error log output macro.
Definition SystemLogger.h:599
Timestamp listener class.
Typename function.
DataPortStatus mixin class.
ConnectorListenersT class.
Definition ConnectorListener.h:2086
Definition DirectInPortBase.h:48
Port for InPort.
Definition InPortBase.h:70
ConnectorList m_connectors
Connection list .
Definition InPortBase.h:884
ConnectorListenersBase * m_listeners
ConnectorDataListener listener .
Definition InPortBase.h:901
void addConnectorDataListener(ConnectorDataListenerType type, ConnectorDataListener *listener, bool autoclean=true)
Adding BufferDataListener type listener.
InPortConnector base class.
Definition InPortConnector.h:54
bool getDirectData(DataType &data)
Definition InPortConnector.h:321
virtual DataPortStatus read(ByteDataStreamBase *data)=0
Destructor.
InPort template class.
Definition InPort.h:89
bool read(std::string name="") override
Readout the value from DataPort.
Definition InPort.h:504
void operator>>(DataType &rhs)
Read the newest data from the InPort into a type-T variable.
Definition InPort.h:656
virtual bool isEmpty(coil::vstring &names)
Definition InPort.h:359
DataPortStatusList getStatusList()
Getting specified connector's writing status list.
Definition InPort.h:729
virtual bool isNew(coil::vstring &names)
Definition InPort.h:236
virtual bool isNew(std::string name)
Check whether the data is newest.
Definition InPort.h:206
void initConnectorListeners() override
Definition InPort.h:802
bool isEmpty() override
Check whether the buffer is empty.
Definition InPort.h:394
void setOnRead(OnRead< DataType > *on_read)
Set callback when data is read from the InPort buffer.
Definition InPort.h:755
void write(DataType &data) override
Definition InPort.h:423
DataPortStatus getStatus(int)
Getting specified connector's writing status.
Definition InPort.h:695
bool isNew() override
Check whether the data is newest.
Definition InPort.h:270
virtual void update()
Read the newest value into the type-T variable that is bound to the InPort's buffer.
Definition InPort.h:631
virtual bool isEmpty(std::string name)
Check whether the buffer of the named connector is empty.
Definition InPort.h:329
virtual const char * name()
Get port name.
Definition InPort.h:176
~InPort() override
Destructor.
InPort(const char *name, DataType &value)
A constructor.
Definition InPort.h:115
void setOnReadConvert(OnReadConvert< DataType > *on_rconvert)
Set callback when data is readout to the InPort buffer.
Definition InPort.h:783
std::mutex m_connectorsMutex
Definition PortBase.h:2115
DirectPortBase * m_directport
Definition PortBase.h:2239
void addProperty(const char *key, ValueType value)
Add NameValue data to PortProfile's properties.
Definition PortBase.h:1877
Definition Timestamp.h:61
Definition DataTypeUtil.h:24
void copyData(T &data1, const T &data2)
Definition DataTypeUtil.h:43
RT-Component.
std::vector< DataPortStatus > DataPortStatusList
Definition DataPortStatus.h:175
std::vector< std::string > getSerializerList()
Definition ByteDataStreamBase.h:385
Data convert callback abstract class on read()
Definition PortCallback.h:386
Callback abstract class on read()
Definition PortCallback.h:324