00001
00020 #ifndef RTC_RINGBUFFER_H
00021 #define RTC_RINGBUFFER_H
00022
00023 #include <vector>
00024 #include <algorithm>
00025 #include <iostream>
00026
00027 #include <coil/TimeValue.h>
00028 #include <coil/Mutex.h>
00029 #include <coil/Guard.h>
00030 #include <coil/Condition.h>
00031 #include <coil/stringutil.h>
00032
00033 #include <rtm/BufferBase.h>
00034 #include <rtm/BufferStatus.h>
00035
00036 #define RINGBUFFER_DEFAULT_LENGTH 8
00037
00051 namespace RTC
00052 {
00088 template <class DataType>
00089 class RingBuffer
00090 : public BufferBase<DataType>
00091 {
00092 public:
00093 BUFFERSTATUS_ENUM
00094 typedef coil::Guard<coil::Mutex> Guard;
00118 RingBuffer(long int length = RINGBUFFER_DEFAULT_LENGTH)
00119 : m_overwrite(true), m_readback(true),
00120 m_timedwrite(false), m_timedread(false),
00121 m_wtimeout(1, 0), m_rtimeout(1, 0),
00122 m_length(length),
00123 m_wpos(0), m_rpos(0), m_fillcount(0), m_wcount(0),
00124 m_buffer(m_length)
00125 {
00126 this->reset();
00127 }
00128
00144 virtual ~RingBuffer(void)
00145 {
00146 }
00147
00187 virtual void init(const coil::Properties& prop)
00188 {
00189 initLength(prop);
00190 initWritePolicy(prop);
00191 initReadPolicy(prop);
00192 }
00193
00214 virtual size_t length(void) const
00215 {
00216 Guard guard(m_posmutex);
00217 return m_length;
00218 }
00219
00242 virtual ReturnCode length(size_t n)
00243 {
00244 m_buffer.resize(n);
00245 m_length = n;
00246 this->reset();
00247 return ::RTC::BufferStatus::BUFFER_OK;
00248 }
00249
00272 virtual ReturnCode reset()
00273 {
00274 Guard guard(m_posmutex);
00275 m_fillcount = 0;
00276 m_wcount = 0;
00277 m_wpos = 0;
00278 m_rpos = 0;
00279 return ::RTC::BufferStatus::BUFFER_OK;
00280 }
00281
00282
00283
00284
00305 virtual DataType* wptr(long int n = 0)
00306 {
00307 Guard guard(m_posmutex);
00308 return &m_buffer[(m_wpos + n + m_length) % m_length];
00309 }
00310
00334 virtual ReturnCode advanceWptr(long int n = 1)
00335 {
00336
00337
00338
00339
00340
00341
00342
00343 Guard guard(m_posmutex);
00344 if ((n > 0 && n > static_cast<long int>(m_length - m_fillcount)) ||
00345 (n < 0 && n < static_cast<long int>(-m_fillcount)))
00346 {
00347 return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00348 }
00349
00350 m_wpos = (m_wpos + n + m_length) % m_length;
00351 m_fillcount += n;
00352 m_wcount += n;
00353 return ::RTC::BufferStatus::BUFFER_OK;
00354 }
00382 virtual ReturnCode put(const DataType& value)
00383 {
00384 Guard guard(m_posmutex);
00385 m_buffer[m_wpos] = value;
00386 return ::RTC::BufferStatus::BUFFER_OK;
00387 }
00388
00430 virtual ReturnCode write(const DataType& value,
00431 long int sec = -1, long int nsec = 0)
00432 {
00433 {
00434 Guard guard(m_full.mutex);
00435
00436 if (full())
00437 {
00438
00439 bool timedwrite(m_timedwrite);
00440 bool overwrite(m_overwrite);
00441
00442 if (!(sec < 0))
00443 {
00444 timedwrite = true;
00445 overwrite = false;
00446 }
00447
00448 if (overwrite && !timedwrite)
00449 {
00450 advanceRptr();
00451 }
00452 else if (!overwrite && !timedwrite)
00453 {
00454 return ::RTC::BufferStatus::BUFFER_FULL;
00455 }
00456 else if (!overwrite && timedwrite)
00457 {
00458 if (sec < 0)
00459 {
00460 sec = m_wtimeout.sec();
00461 nsec = m_wtimeout.usec() * 1000;
00462 }
00463
00464 if (!m_full.cond.wait(sec, nsec))
00465 {
00466 return ::RTC::BufferStatus::TIMEOUT;
00467 }
00468 }
00469 else
00470 {
00471 return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00472 }
00473 }
00474 }
00475
00476 put(value);
00477
00478 {
00479 Guard eguard(m_empty.mutex);
00480 if (empty())
00481 {
00482
00483 advanceWptr(1);
00484 m_empty.cond.signal();
00485 }
00486 else
00487 {
00488 advanceWptr(1);
00489 }
00490 }
00491 return ::RTC::BufferStatus::BUFFER_OK;
00492 }
00493
00515 virtual size_t writable() const
00516 {
00517 Guard guard(m_posmutex);
00518 return m_length - m_fillcount;
00519 }
00520
00540 virtual bool full(void) const
00541 {
00542 Guard guard(m_posmutex);
00543 return m_length == m_fillcount;
00544 }
00545
00546
00567 virtual DataType* rptr(long int n = 0)
00568 {
00569 Guard guard(m_posmutex);
00570 return &(m_buffer[(m_rpos + n + m_length) % m_length]);
00571 }
00572
00594 virtual ReturnCode advanceRptr(long int n = 1)
00595 {
00596
00597
00598
00599
00600
00601
00602 Guard guard(m_posmutex);
00603 if ((n > 0 && n > static_cast<long int>(m_fillcount)) ||
00604 (n < 0 && n < static_cast<long int>(m_fillcount - m_length)))
00605 {
00606 return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00607 }
00608
00609 m_rpos = (m_rpos + n + m_length) % m_length;
00610 m_fillcount -= n;
00611 return ::RTC::BufferStatus::BUFFER_OK;
00612 }
00613
00638 virtual ReturnCode get(DataType& value)
00639 {
00640 Guard gaurd(m_posmutex);
00641 value = m_buffer[m_rpos];
00642 return ::RTC::BufferStatus::BUFFER_OK;
00643 }
00644
00645
00663 virtual DataType& get()
00664 {
00665 Guard gaurd(m_posmutex);
00666 return m_buffer[m_rpos];
00667 }
00668
00669
00711 virtual ReturnCode read(DataType& value,
00712 long int sec = -1, long int nsec = 0)
00713 {
00714 {
00715 Guard gaurd(m_empty.mutex);
00716
00717 if (empty())
00718 {
00719 bool timedread(m_timedread);
00720 bool readback(m_readback);
00721
00722 if (!(sec < 0))
00723 {
00724 timedread = true;
00725 readback = false;
00726 sec = m_rtimeout.sec();
00727 nsec = m_rtimeout.usec() * 1000;
00728 }
00729
00730 if (readback && !timedread)
00731 {
00732 if (!(m_wcount > 0))
00733 {
00734 return ::RTC::BufferStatus::BUFFER_EMPTY;
00735 }
00736 advanceRptr(-1);
00737 }
00738 else if (!readback && !timedread)
00739 {
00740 return ::RTC::BufferStatus::BUFFER_EMPTY;
00741 }
00742 else if (!readback && timedread)
00743 {
00744 if (sec < 0)
00745 {
00746 sec = m_rtimeout.sec();
00747 nsec = m_rtimeout.usec() * 1000;
00748 }
00749
00750 if (!m_empty.cond.wait(sec, nsec))
00751 {
00752 return ::RTC::BufferStatus::TIMEOUT;
00753 }
00754 }
00755 else
00756 {
00757 return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00758 }
00759 }
00760 }
00761
00762 get(value);
00763
00764 {
00765 Guard fguard(m_full.mutex);
00766 if (full())
00767 {
00768
00769 advanceRptr(1);
00770 m_full.cond.signal();
00771 }
00772 else
00773 {
00774 advanceRptr(1);
00775 }
00776 }
00777 return ::RTC::BufferStatus::BUFFER_OK;
00778 }
00779
00804 virtual size_t readable() const
00805 {
00806 Guard guard(m_posmutex);
00807 return m_fillcount;
00808 }
00809
00829 virtual bool empty(void) const
00830 {
00831 Guard guard(m_posmutex);
00832 return m_fillcount == 0;
00833 }
00834
00835 private:
00836 inline void initLength(const coil::Properties& prop)
00837 {
00838 if (!prop["length"].empty())
00839 {
00840 size_t n;
00841 if (coil::stringTo(n, prop["length"].c_str()))
00842 {
00843 if (n > 0)
00844 {
00845 this->length(n);
00846 }
00847 }
00848 }
00849 }
00850
00851 inline void initWritePolicy(const coil::Properties& prop)
00852 {
00853 std::string policy(prop["write.full_policy"]);
00854 coil::normalize(policy);
00855 if (policy == "overwrite")
00856 {
00857 m_overwrite = true;
00858 m_timedwrite = false;
00859 }
00860 else if (policy == "do_nothing")
00861 {
00862 m_overwrite = false;
00863 m_timedwrite = false;
00864 }
00865 else if (policy == "block")
00866 {
00867 m_overwrite = false;
00868 m_timedwrite = true;
00869
00870 double tm;
00871 if (coil::stringTo(tm, prop["write.timeout"].c_str()))
00872 {
00873 if (!(tm < 0))
00874 {
00875 m_wtimeout = tm;
00876 }
00877 }
00878 }
00879 }
00880
00881 inline void initReadPolicy(const coil::Properties& prop)
00882 {
00883 std::string policy(prop["read.empty_policy"]);
00884 if (policy == "readback")
00885 {
00886 m_readback = true;
00887 m_timedread = false;
00888 }
00889 else if (policy == "do_nothing")
00890 {
00891 m_readback = false;
00892 m_timedread = false;
00893 }
00894 else if (policy == "block")
00895 {
00896 m_readback = false;
00897 m_timedread = true;
00898 double tm;
00899 if (coil::stringTo(tm, prop["read.timeout"].c_str()))
00900 {
00901 m_rtimeout = tm;
00902 }
00903 }
00904 }
00905
00906 private:
00914 bool m_overwrite;
00915
00923 bool m_readback;
00924
00932 bool m_timedwrite;
00940 bool m_timedread;
00941
00949 coil::TimeValue m_wtimeout;
00950
00958 coil::TimeValue m_rtimeout;
00959
00967 size_t m_length;
00968
00976 size_t m_wpos;
00977
00985 size_t m_rpos;
00986
00994 size_t m_fillcount;
00995
01003 size_t m_wcount;
01004
01012 std::vector<DataType> m_buffer;
01013
01021 struct condition
01022 {
01023 condition() : cond(mutex) {}
01024 coil::Condition<coil::Mutex> cond;
01025 coil::Mutex mutex;
01026 };
01027
01035 mutable coil::Mutex m_posmutex;
01036
01044 condition m_empty;
01045
01053 condition m_full;
01054 };
01055 };
01056
01057 #endif // RTC_RINGBUFFER_H