00001
00020 #ifndef RTC_PUBLISHERNEW_H
00021 #define RTC_PUBLISHERNEW_H
00022
00023 #include <coil/Task.h>
00024 #include <coil/Mutex.h>
00025 #include <coil/Condition.h>
00026 #include <coil/PeriodicTask.h>
00027
00028 #include <rtm/RTC.h>
00029 #include <rtm/PublisherBase.h>
00030 #include <rtm/CdrBufferBase.h>
00031 #include <rtm/DataPortStatus.h>
00032 #include <rtm/SystemLogger.h>
00033 #include <rtm/ConnectorBase.h>
00034 #include <rtm/ConnectorListener.h>
00035
// Forward declaration only: this header refers to coil::Properties solely
// by reference, so the full definition is not needed here.
namespace coil
{
  class Properties;
}  // namespace coil
00040
00041 namespace RTC
00042 {
00043 class InPortConsumer;
00044
00069 class PublisherNew
00070 : public PublisherBase
00071 {
00072 public:
00073 typedef coil::Mutex Mutex;
00074 typedef coil::Condition<Mutex> Condition;
00075 typedef coil::Guard<coil::Mutex> Guard;
00076 DATAPORTSTATUS_ENUM
00077
00091 PublisherNew();
00092
00106 virtual ~PublisherNew(void);
00107
00159 virtual ReturnCode init(coil::Properties& prop);
00160
00186 virtual ReturnCode setConsumer(InPortConsumer* consumer);
00187
00213 virtual ReturnCode setBuffer(CdrBufferBase* buffer);
00214
00248 virtual ReturnCode setListener(ConnectorInfo& info,
00249 ConnectorListeners* listeners);
00250
00326 virtual ReturnCode write(const cdrMemoryStream& data,
00327 unsigned long sec,
00328 unsigned long usec);
00329
00357 virtual bool isActive();
00358
00384 virtual ReturnCode activate();
00385
00411 virtual ReturnCode deactivate();
00412
00426 virtual int svc(void);
00427
00428 protected:
00429 enum Policy
00430 {
00431 ALL,
00432 FIFO,
00433 SKIP,
00434 NEW
00435 };
00436
00444 void setPushPolicy(const coil::Properties& prop);
00445
00453 bool createTask(const coil::Properties& prop);
00454
00458 ReturnCode pushAll();
00459
00463 ReturnCode pushFifo();
00464
00468 ReturnCode pushSkip();
00469
00473 ReturnCode pushNew();
00474
00530 ReturnCode convertReturn(BufferStatus::Enum status,
00531 const cdrMemoryStream& data);
00532
00550 ReturnCode invokeListener(DataPortStatus::Enum status,
00551 const cdrMemoryStream& data);
00552
00562 inline void onBufferWrite(const cdrMemoryStream& data)
00563 {
00564 m_listeners->
00565 connectorData_[ON_BUFFER_WRITE].notify(m_profile, data);
00566 }
00567
00577 inline void onBufferFull(const cdrMemoryStream& data)
00578 {
00579 m_listeners->
00580 connectorData_[ON_BUFFER_FULL].notify(m_profile, data);
00581 }
00582
00592 inline void onBufferWriteTimeout(const cdrMemoryStream& data)
00593 {
00594 m_listeners->
00595 connectorData_[ON_BUFFER_WRITE_TIMEOUT].notify(m_profile, data);
00596 }
00597
00607 inline void onBufferWriteOverwrite(const cdrMemoryStream& data)
00608 {
00609 m_listeners->
00610 connectorData_[ON_BUFFER_OVERWRITE].notify(m_profile, data);
00611 }
00612
00622 inline void onBufferRead(const cdrMemoryStream& data)
00623 {
00624 m_listeners->
00625 connectorData_[ON_BUFFER_READ].notify(m_profile, data);
00626 }
00627
00637 inline void onSend(const cdrMemoryStream& data)
00638 {
00639 m_listeners->
00640 connectorData_[ON_SEND].notify(m_profile, data);
00641 }
00642
00652 inline void onReceived(const cdrMemoryStream& data)
00653 {
00654 m_listeners->
00655 connectorData_[ON_RECEIVED].notify(m_profile, data);
00656 }
00657
00667 inline void onReceiverFull(const cdrMemoryStream& data)
00668 {
00669 m_listeners->
00670 connectorData_[ON_RECEIVER_FULL].notify(m_profile, data);
00671 }
00672
00682 inline void onReceiverTimeout(const cdrMemoryStream& data)
00683 {
00684 m_listeners->
00685 connectorData_[ON_RECEIVER_TIMEOUT].notify(m_profile, data);
00686 }
00687
00697 inline void onReceiverError(const cdrMemoryStream& data)
00698 {
00699 m_listeners->
00700 connectorData_[ON_RECEIVER_ERROR].notify(m_profile, data);
00701 }
00702
00712 inline void onSenderError()
00713 {
00714 m_listeners->
00715 connector_[ON_SENDER_ERROR].notify(m_profile);
00716 }
00717
00718
00719 private:
00720 Logger rtclog;
00721 InPortConsumer* m_consumer;
00722 CdrBufferBase* m_buffer;
00723 ConnectorInfo m_profile;
00724 coil::PeriodicTaskBase* m_task;
00725 ConnectorListeners* m_listeners;
00726 ReturnCode m_retcode;
00727 Mutex m_retmutex;
00728 Policy m_pushPolicy;
00729 int m_skipn;
00730 bool m_active;
00731 int m_leftskip;
00732 };
00733 };
00734
00735 extern "C"
00736 {
00737 void DLL_EXPORT PublisherNewInit();
00738 };
00739
00740 #endif // RTC_PUBLISHERNEW_H
00741