00001
00020 #ifndef RTC_PUBLISHERPERIODIC_H
00021 #define RTC_PUBLISHERPERIODIC_H
00022
00023 #include <coil/Task.h>
00024 #include <coil/Mutex.h>
00025 #include <coil/Condition.h>
00026 #include <coil/PeriodicTask.h>
00027
00028 #include <rtm/RTC.h>
00029 #include <rtm/PublisherBase.h>
00030 #include <rtm/CdrBufferBase.h>
00031 #include <rtm/SystemLogger.h>
00032 #include <rtm/ConnectorBase.h>
00033 #include <rtm/ConnectorListener.h>
00034
/*!
 * @brief Forward declaration of coil::Properties.
 *
 * This header only uses coil::Properties by reference (see the parameters
 * of init(), setPushPolicy() and createTask() in PublisherPeriodic), so the
 * full definition is not needed here.
 */
00035 namespace coil
00036 {
00037 class Properties;
// NOTE(review): the ';' after the closing brace of a namespace is an empty
// declaration; it is redundant and may trigger extra-semicolon warnings.
00038 };
00039
/*!
 * @brief Namespace that contains the PublisherPeriodic class declared below.
 */
00040 namespace RTC
00041 {
// InPortConsumer is only used through a pointer in this header
// (setConsumer() parameter and the m_consumer member), so a forward
// declaration is sufficient.
00042 class InPortConsumer;
/*!
 * @class PublisherPeriodic
 * @brief Publisher derived from PublisherBase that owns a periodic task.
 *
 * The class holds a consumer (m_consumer), a CDR buffer (m_buffer), a
 * periodic task (m_task) and a set of connector listeners (m_listeners).
 * Presumably data handed to write() is buffered and later pushed to the
 * consumer from svc() according to the configured push Policy -- the
 * definitions are not in this header, TODO confirm against the .cpp file.
 */
00064 class PublisherPeriodic
00065 : public PublisherBase
00066 {
00067 public:
// Short local aliases for the coil synchronization primitives.
00068 typedef coil::Mutex Mutex;
00069 typedef coil::Condition<Mutex> Condition;
00070 typedef coil::Guard<coil::Mutex> Guard;
// Macro defined outside this header. Presumably it brings the
// DataPortStatus enumerators into class scope -- TODO confirm.
00071 DATAPORTSTATUS_ENUM
00072
/*!
 * @brief Default constructor.
 */
00082 PublisherPeriodic(void);
00083
/*!
 * @brief Virtual destructor.
 */
00097 virtual ~PublisherPeriodic(void);
00098
/*!
 * @brief Initializes the publisher from a property set.
 *
 * @param prop Configuration properties. Presumably forwarded to
 *             setPushPolicy() and createTask() -- TODO confirm.
 * @return A ReturnCode describing the outcome.
 */
00160 virtual ReturnCode init(coil::Properties& prop);
00161
/*!
 * @brief Sets the consumer this publisher works with.
 *
 * @param consumer Pointer to the consumer; presumably stored in
 *                 m_consumer -- TODO confirm. Ownership is not visible
 *                 from this header.
 * @return A ReturnCode describing the outcome.
 */
00187 virtual ReturnCode setConsumer(InPortConsumer* consumer);
00188
/*!
 * @brief Sets the CDR buffer this publisher works with.
 *
 * @param buffer Pointer to the buffer; presumably stored in m_buffer --
 *               TODO confirm. Ownership is not visible from this header.
 * @return A ReturnCode describing the outcome.
 */
00214 virtual ReturnCode setBuffer(CdrBufferBase* buffer);
00215
/*!
 * @brief Sets the connector information and the listener holder.
 *
 * @param info      Connector information; presumably copied into
 *                  m_profile -- TODO confirm.
 * @param listeners Listener holder; presumably stored in m_listeners,
 *                  which the on*() helpers below dereference.
 * @return A ReturnCode describing the outcome.
 */
00249 virtual ReturnCode setListener(ConnectorInfo& info,
00250 ConnectorListeners* listeners);
/*!
 * @brief Hands one serialized data item to the publisher.
 *
 * @param data Serialized data as a cdrMemoryStream.
 * @param sec  Seconds part of a time value; presumably a timeout --
 *             TODO confirm.
 * @param usec Microseconds part of the same time value -- TODO confirm.
 * @return A ReturnCode describing the outcome.
 */
00326 virtual ReturnCode write(const cdrMemoryStream& data,
00327 unsigned long sec,
00328 unsigned long usec);
/*!
 * @brief Reports whether the publisher is active.
 *
 * @return Presumably the value of m_active -- TODO confirm.
 */
00356 virtual bool isActive();
00357
/*!
 * @brief Activates the publisher.
 *
 * @return A ReturnCode describing the outcome.
 */
00383 virtual ReturnCode activate();
00384
/*!
 * @brief Deactivates the publisher.
 *
 * @return A ReturnCode describing the outcome.
 */
00410 virtual ReturnCode deactivate();
00411
/*!
 * @brief Service function of the publisher.
 *
 * Presumably invoked by the periodic task (m_task) and dispatches to one
 * of pushAll(), pushFifo(), pushSkip() or pushNew() depending on
 * m_pushPolicy -- TODO confirm.
 *
 * @return An int status; its meaning is not visible from this header.
 */
00425 virtual int svc(void);
00426
00427 protected:
/*!
 * @brief Push policy selector stored in m_pushPolicy.
 *
 * The enumerator names line up with the pushAll(), pushFifo(),
 * pushSkip() and pushNew() helpers declared below; presumably each value
 * selects the corresponding helper -- TODO confirm.
 */
00428 enum Policy
00429 {
00430 ALL,
00431 FIFO,
00432 SKIP,
00433 NEW
00434 };
00435
/*!
 * @brief Configures the push policy from properties.
 *
 * @param prop Properties to read; presumably sets m_pushPolicy and the
 *             skip count -- TODO confirm.
 */
00443 void setPushPolicy(const coil::Properties& prop);
00444
/*!
 * @brief Creates the task used by this publisher from properties.
 *
 * @param prop Properties to read.
 * @return Presumably true on success and false on failure -- TODO confirm.
 */
00452 bool createTask(const coil::Properties& prop);
00453
/*!
 * @brief Push helper associated with the ALL policy (by name).
 */
00457 ReturnCode pushAll();
00458
/*!
 * @brief Push helper associated with the FIFO policy (by name).
 */
00462 ReturnCode pushFifo();
00463
/*!
 * @brief Push helper associated with the SKIP policy (by name).
 */
00467 ReturnCode pushSkip();
00468
/*!
 * @brief Push helper associated with the NEW policy (by name).
 */
00472 ReturnCode pushNew();
00473
/*!
 * @brief Converts a buffer status into a publisher ReturnCode.
 *
 * @param status Buffer status to convert.
 * @param data   Data item the status refers to; presumably passed on to
 *               listener callbacks -- TODO confirm.
 * @return The resulting ReturnCode.
 */
00529 ReturnCode convertReturn(BufferStatus::Enum status,
00530 const cdrMemoryStream& data);
00531
00532
/*!
 * @brief Invokes listener callbacks for a data port status.
 *
 * @param status Data port status; presumably selects which on*() helper
 *               is called -- TODO confirm.
 * @param data   Data item handed to the listeners.
 * @return A ReturnCode describing the outcome.
 */
00550 ReturnCode invokeListener(DataPortStatus::Enum status,
00551 const cdrMemoryStream& data);
00552
// Listener notification helpers.
// Each helper below dereferences m_listeners without a null check.
// NOTE(review): confirm that m_listeners is always set before any of them
// can be reached.

/*!
 * @brief Calls notify() on the ON_BUFFER_WRITE entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00562 inline void onBufferWrite(const cdrMemoryStream& data)
00563 {
00564 m_listeners->
00565 connectorData_[ON_BUFFER_WRITE].notify(m_profile, data);
00566 }
00567
/*!
 * @brief Calls notify() on the ON_BUFFER_FULL entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00577 inline void onBufferFull(const cdrMemoryStream& data)
00578 {
00579 m_listeners->
00580 connectorData_[ON_BUFFER_FULL].notify(m_profile, data);
00581 }
00582
/*!
 * @brief Calls notify() on the ON_BUFFER_WRITE_TIMEOUT entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00592 inline void onBufferWriteTimeout(const cdrMemoryStream& data)
00593 {
00594 m_listeners->
00595 connectorData_[ON_BUFFER_WRITE_TIMEOUT].notify(m_profile, data);
00596 }
00597
/*!
 * @brief Calls notify() on the ON_BUFFER_READ entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00607 inline void onBufferRead(const cdrMemoryStream& data)
00608 {
00609 m_listeners->
00610 connectorData_[ON_BUFFER_READ].notify(m_profile, data);
00611 }
00612
/*!
 * @brief Calls notify() on the ON_SEND entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00622 inline void onSend(const cdrMemoryStream& data)
00623 {
00624 m_listeners->
00625 connectorData_[ON_SEND].notify(m_profile, data);
00626 }
00627
/*!
 * @brief Calls notify() on the ON_RECEIVED entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00637 inline void onReceived(const cdrMemoryStream& data)
00638 {
00639 m_listeners->
00640 connectorData_[ON_RECEIVED].notify(m_profile, data);
00641 }
00642
/*!
 * @brief Calls notify() on the ON_RECEIVER_FULL entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00652 inline void onReceiverFull(const cdrMemoryStream& data)
00653 {
00654 m_listeners->
00655 connectorData_[ON_RECEIVER_FULL].notify(m_profile, data);
00656 }
00657
/*!
 * @brief Calls notify() on the ON_RECEIVER_TIMEOUT entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00667 inline void onReceiverTimeout(const cdrMemoryStream& data)
00668 {
00669 m_listeners->
00670 connectorData_[ON_RECEIVER_TIMEOUT].notify(m_profile, data);
00671 }
00672
/*!
 * @brief Calls notify() on the ON_RECEIVER_ERROR entry of
 *        m_listeners->connectorData_ with m_profile and @p data.
 */
00682 inline void onReceiverError(const cdrMemoryStream& data)
00683 {
00684 m_listeners->
00685 connectorData_[ON_RECEIVER_ERROR].notify(m_profile, data);
00686 }
00687
// The three helpers below use the connector_ table instead of
// connectorData_ and pass only m_profile (no data item).

/*!
 * @brief Calls notify() on the ON_BUFFER_EMPTY entry of
 *        m_listeners->connector_ with m_profile.
 */
00695 inline void onBufferEmpty()
00696 {
00697 m_listeners->
00698 connector_[ON_BUFFER_EMPTY].notify(m_profile);
00699 }
00700
/*!
 * @brief Calls notify() on the ON_SENDER_EMPTY entry of
 *        m_listeners->connector_ with m_profile.
 */
00708 inline void onSenderEmpty()
00709 {
00710 m_listeners->
00711 connector_[ON_SENDER_EMPTY].notify(m_profile);
00712 }
00713
/*!
 * @brief Calls notify() on the ON_SENDER_ERROR entry of
 *        m_listeners->connector_ with m_profile.
 */
00721 inline void onSenderError()
00722 {
00723 m_listeners->
00724 connector_[ON_SENDER_ERROR].notify(m_profile);
00725 }
00726
00727
00728 private:
/*!
 * @brief Checks whether there is nothing to send and notifies listeners.
 *
 * The buffer counts as empty only when m_buffer->empty() is true and
 * m_readback is false. In that case a debug message is logged and both
 * onBufferEmpty() and onSenderEmpty() are called.
 *
 * @return true if the empty condition held (listeners were notified),
 *         false otherwise.
 */
00729 bool bufferIsEmpty()
00730 {
// When m_readback is set, an empty buffer is not reported as empty.
00731 if (m_buffer->empty() && !m_readback)
00732 {
00733 RTC_DEBUG(("buffer empty"));
00734 onBufferEmpty();
00735 onSenderEmpty();
00736 return true;
00737 }
00738 return false;
00739 }
00740
// Logger object; presumably the one used by the RTC_DEBUG macro above --
// TODO confirm.
00741 Logger rtclog;
// Consumer this publisher works with (see setConsumer()).
00742 InPortConsumer* m_consumer;
// CDR buffer queried by bufferIsEmpty() (see setBuffer()).
00743 CdrBufferBase* m_buffer;
// Connector information passed to every listener notify() call.
00744 ConnectorInfo m_profile;
// Periodic task object; presumably created by createTask() -- TODO confirm.
00745 coil::PeriodicTaskBase* m_task;
// Listener holder dereferenced by the on*() helpers.
00746 ConnectorListeners* m_listeners;
// Stored return code; presumably the result of the last push -- TODO confirm.
00747 ReturnCode m_retcode;
// Mutex; presumably guards m_retcode (by name) -- TODO confirm.
00748 Mutex m_retmutex;
// Currently selected push policy (see enum Policy).
00749 Policy m_pushPolicy;
// Presumably the skip count used by the SKIP policy -- TODO confirm.
00750 int m_skipn;
// Presumably the state reported by isActive() -- TODO confirm.
00751 bool m_active;
// When true, bufferIsEmpty() never reports the buffer as empty.
00752 bool m_readback;
// Presumably the remaining skip count carried between pushes -- TODO confirm.
00753 int m_leftskip;
00754 };
// NOTE(review): the ';' after the closing brace of a namespace is an empty
// declaration; it is redundant and may trigger extra-semicolon warnings.
00755 };
00756
/*!
 * @brief Initialization entry point declared with C linkage.
 *
 * The function is declared inside an extern "C" block and marked with the
 * DLL_EXPORT macro (defined outside this header). Presumably it registers
 * PublisherPeriodic with a publisher factory when the module is loaded --
 * the definition is not in this header, TODO confirm.
 */
00757 extern "C"
00758 {
00759 void DLL_EXPORT PublisherPeriodicInit();
// NOTE(review): the ';' after the closing brace of a linkage specification
// is an empty declaration; it is redundant.
00760 };
00761
00762 #endif // RTC_PUBLISHERPERIODIC_H
00763