OpenRTM-aist 2.0.2
Loading...
Searching...
No Matches
RingBuffer.h
Go to the documentation of this file.
1// -*- C++ -*-
20#ifndef RTC_RINGBUFFER_H
21#define RTC_RINGBUFFER_H
22
23#include <mutex>
24#include <condition_variable>
25#include <coil/stringutil.h>
26
27#include <rtm/BufferBase.h>
28#include <rtm/BufferStatus.h>
29
30#include <algorithm>
31#include <iostream>
32#include <string>
33#include <vector>
34
35#define RINGBUFFER_DEFAULT_LENGTH 8
50namespace RTC
51{
87 template <class DataType>
89 : public BufferBase<DataType>
90 {
91 public:
116 : m_length(length), m_buffer(m_length)
117 {
118 this->reset();
119 }
120
136 ~RingBuffer() override;
137
177 void init(const coil::Properties& prop) override
178 {
179 initLength(prop);
180 initWritePolicy(prop);
181 initReadPolicy(prop);
182 }
183
204 size_t length() const override
205 {
206 std::lock_guard<std::mutex> guard(m_posmutex);
207 return m_length;
208 }
209
230 BufferStatus length(size_t n) override
231 {
232 m_buffer.resize(n);
233 m_length = n;
234 this->reset();
235 return BufferStatus::OK;
236 }
237
261 {
262 std::lock_guard<std::mutex> guard(m_posmutex);
263 m_fillcount = 0;
264 m_wcount = 0;
265 m_wpos = 0;
266 m_rpos = 0;
267 return BufferStatus::OK;
268 }
269
270
271
272 //----------------------------------------------------------------------
293 DataType* wptr(long int n = 0) override
294 {
295 std::lock_guard<std::mutex> guard(m_posmutex);
296 return &m_buffer[(m_wpos + n + m_length) % m_length];
297 }
298
323 BufferStatus advanceWptr(long int n = 1, bool unlock_enable = true) override
324 {
325 bool empty_ = false;
326 bool lock_ = (unlock_enable && n > 0);
327 if(lock_)
328 {
329 m_empty.mutex.lock();
330 empty_ = empty();
331 }
332 // n > 0 :
333 // n satisfies n <= writable elements
334 // n <= m_length - m_fillcout
335 // n < 0 : -n = n'
336 // n satisfies n'<= readable elements
337 // n'<= m_fillcount
338 // n >= - m_fillcount
339 {
340 std::lock_guard<std::mutex> guard(m_posmutex);
341 if ((n > 0 && n > static_cast<long int>(m_length) - static_cast<long int>(m_fillcount)) ||
342 (n < 0 && n < -static_cast<long int>(m_fillcount)))
343 {
344 if (lock_)
345 {
346 m_empty.mutex.unlock();
347 }
349 }
350
351 m_wpos = (m_wpos + n + m_length) % m_length;
352 m_fillcount += n;
353 m_wcount += n;
354 }
355
356 if(lock_)
357 {
358 if(empty_)
359 {
360 m_empty.cond.notify_one();
361 }
362 m_empty.mutex.unlock();
363 }
364
365 return BufferStatus::OK;
366 }
394 BufferStatus put(const DataType& value) override
395 {
396 std::lock_guard<std::mutex> guard(m_posmutex);
397 m_buffer[m_wpos] = value;
398 return BufferStatus::OK;
399 }
400
441 BufferStatus write(const DataType& value,
442 std::chrono::nanoseconds timeout
443 = std::chrono::nanoseconds(-1)) override
444 {
445 {
446 std::unique_lock<std::mutex> guard(m_full.mutex);
447
448 if (full())
449 {
450
451 bool timedwrite(m_timedwrite);
452 bool overwrite(m_overwrite);
453
454 if (timeout >= std::chrono::seconds::zero()) // block mode
455 {
456 timedwrite = true;
457 overwrite = false;
458 }
459
460 if (overwrite && !timedwrite) // "overwrite" mode
461 {
462 advanceRptr(1,false);
463 }
464 else if (!overwrite && !timedwrite) // "do_nothing" mode
465 {
466 return BufferStatus::FULL;
467 }
468 else if (!overwrite && timedwrite) // "block" mode
469 {
470 if (timeout < std::chrono::seconds::zero())
471 {
472 timeout = m_wtimeout;
473 }
474 if (std::cv_status::timeout == m_empty.cond.wait_for(guard, timeout))
475 {
477 }
478 }
479 else // unknown condition
480 {
482 }
483 }
484 }
485
486 put(value);
487
488 advanceWptr(1);
489
490
491 return BufferStatus::OK;
492 }
493
515 size_t writable() const override
516 {
517 std::lock_guard<std::mutex> guard(m_posmutex);
518 return m_length - m_fillcount;
519 }
520
540 bool full() const override
541 {
542 std::lock_guard<std::mutex> guard(m_posmutex);
543 return m_length == m_fillcount;
544 }
545
546 //----------------------------------------------------------------------
567 DataType* rptr(long int n = 0) override
568 {
569 std::lock_guard<std::mutex> guard(m_posmutex);
570 return &(m_buffer[(m_rpos + n + m_length) % m_length]);
571 }
572
595 BufferStatus advanceRptr(long int n = 1, bool unlock_enable = true) override
596 {
597 bool full_ = false;
598 bool lock_ = (unlock_enable && n > 0);
599 if(lock_)
600 {
601 m_full.mutex.lock();
602 full_ = full();
603 }
604 // n > 0 :
605 // n satisfies n <= readable elements
606 // n <= m_fillcout
607 // n < 0 : -n = n'
608 // n satisfies n'<= m_length - m_fillcount
609 // n >= m_fillcount - m_length
610 {
611 std::lock_guard<std::mutex> guard(m_posmutex);
612 if ((n > 0 && n > static_cast<long int>(m_fillcount)) ||
613 (n < 0 && n < static_cast<long int>(m_fillcount) - static_cast<long int>(m_length)))
614 {
615 if (lock_)
616 {
617 m_full.mutex.unlock();
618 }
620 }
621
622 m_rpos = (m_rpos + n + m_length) % m_length;
623 m_fillcount -= n;
624 }
625
626 if(lock_)
627 {
628 if(full_)
629 {
630 m_full.cond.notify_one();
631 }
632 m_full.mutex.unlock();
633 }
634
635 return BufferStatus::OK;
636 }
637
662 BufferStatus get(DataType& value) override
663 {
664 std::lock_guard<std::mutex> guard(m_posmutex);
665 value = m_buffer[m_rpos];
666 return BufferStatus::OK;
667 }
668
669
687 DataType& get() override
688 {
689 std::lock_guard<std::mutex> guard(m_posmutex);
690 return m_buffer[m_rpos];
691 }
692
693
734 BufferStatus read(DataType& value,
735 std::chrono::nanoseconds timeout
736 = std::chrono::nanoseconds(-1)) override
737 {
738 {
739 std::unique_lock<std::mutex> guard(m_empty.mutex);
740
741 if (empty())
742 {
743 bool timedread(m_timedread);
744 bool readback(m_readback);
745
746 if (timeout >= std::chrono::seconds::zero()) // block mode
747 {
748 timedread = true;
749 readback = false;
750 timeout = m_rtimeout;
751 }
752
753 if (readback && !timedread) // "readback" mode
754 {
755 if (!(m_wcount > 0))
756 {
757 return BufferStatus::EMPTY;
758 }
759 advanceRptr(-1,false);
760 }
761 else if (!readback && !timedread) // "do_nothing" mode
762 {
763 return BufferStatus::EMPTY;
764 }
765 else if (!readback && timedread) // "block" mode
766 {
767 if (timeout < std::chrono::seconds::zero())
768 {
769 timeout = m_rtimeout;
770 }
771 if (std::cv_status::timeout == m_empty.cond.wait_for(guard, timeout))
772 {
774 }
775 }
776 else // unknown condition
777 {
779 }
780 }
781 }
782
783 get(value);
784
785 advanceRptr(1);
786
787 return BufferStatus::OK;
788 }
789
814 size_t readable() const override
815 {
816 std::lock_guard<std::mutex> guard(m_posmutex);
817 return m_fillcount;
818 }
819
839 bool empty() const override
840 {
841 std::lock_guard<std::mutex> guard(m_posmutex);
842 return m_fillcount == 0;
843 }
844
845 private:
846 void initLength(const coil::Properties& prop)
847 {
848 if (!prop["length"].empty())
849 {
850 size_t n;
851 if (coil::stringTo(n, prop["length"].c_str()))
852 {
853 if (n > 0)
854 {
855 this->length(n);
856 }
857 }
858 }
859 }
860
861 void initWritePolicy(const coil::Properties& prop)
862 {
863 std::string policy(coil::normalize(prop["write.full_policy"]));
864 if (policy == "overwrite")
865 {
866 m_overwrite = true;
867 m_timedwrite = false;
868 }
869 else if (policy == "do_nothing")
870 {
871 m_overwrite = false;
872 m_timedwrite = false;
873 }
874 else if (policy == "block")
875 {
876 m_overwrite = false;
877 m_timedwrite = true;
878
879 std::chrono::nanoseconds tm;
880 if (coil::stringTo(tm, prop["write.timeout"].c_str())
881 && !(tm < std::chrono::seconds::zero()))
882 {
883 m_wtimeout = tm;
884 }
885 }
886 }
887
888 void initReadPolicy(const coil::Properties& prop)
889 {
890 std::string policy(prop["read.empty_policy"]);
891 if (policy == "readback")
892 {
893 m_readback = true;
894 m_timedread = false;
895 }
896 else if (policy == "do_nothing")
897 {
898 m_readback = false;
899 m_timedread = false;
900 }
901 else if (policy == "block")
902 {
903 m_readback = false;
904 m_timedread = true;
905 std::chrono::nanoseconds tm;
906 if (coil::stringTo(tm, prop["read.timeout"].c_str()))
907 {
908 m_rtimeout = tm;
909 }
910 }
911 }
912
913 private:
921 bool m_overwrite{true};
922
930 bool m_readback{true};
931
939 bool m_timedwrite{false};
947 bool m_timedread{false};
948
956 std::chrono::nanoseconds m_wtimeout{std::chrono::seconds(1)};
957
965 std::chrono::nanoseconds m_rtimeout{std::chrono::seconds(1)};
966
974 size_t m_length;
975
983 size_t m_wpos{0};
984
992 size_t m_rpos{0};
993
1001 size_t m_fillcount{0};
1002
1010 size_t m_wcount{0};
1011
1019 std::vector<DataType> m_buffer;
1020
1028 struct condition
1029 {
1030 condition() {}
1031 std::condition_variable cond;
1032 std::mutex mutex;
1033 };
1034
1042 mutable std::mutex m_posmutex;
1043
1051 condition m_empty;
1052
1060 condition m_full;
1061 };
1062
1063 template <class T> RingBuffer<T>::~RingBuffer() = default; // no-inline because of its size.
1064} // namespace RTC
1065
1066#endif // RTC_RINGBUFFER_H
Buffer abstract class.
Buffer status enum definition.
#define RINGBUFFER_DEFAULT_LENGTH
Definition RingBuffer.h:35
BufferBase abstract class.
Definition BufferBase.h:106
Ring buffer implementation class.
Definition RingBuffer.h:90
DataType * rptr(long int n=0) override
Get the buffer length.
Definition RingBuffer.h:567
size_t writable() const override
Write data into the buffer.
Definition RingBuffer.h:515
BufferStatus write(const DataType &value, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1)) override
Write data into the buffer.
Definition RingBuffer.h:441
size_t readable() const override
Write data into the buffer.
Definition RingBuffer.h:814
BufferStatus advanceWptr(long int n=1, bool unlock_enable=true) override
Get the buffer length.
Definition RingBuffer.h:323
size_t length() const override
Get the buffer length.
Definition RingBuffer.h:204
RingBuffer(long int length=RINGBUFFER_DEFAULT_LENGTH)
Constructor.
Definition RingBuffer.h:115
BufferStatus advanceRptr(long int n=1, bool unlock_enable=true) override
Get the buffer length.
Definition RingBuffer.h:595
BufferStatus reset() override
Get the buffer length.
Definition RingBuffer.h:260
BufferStatus length(size_t n) override
Get the buffer length.
Definition RingBuffer.h:230
DataType * wptr(long int n=0) override
Get the buffer length.
Definition RingBuffer.h:293
BufferStatus get(DataType &value) override
Write data into the buffer.
Definition RingBuffer.h:662
DataType & get() override
Reading data from the buffer.
Definition RingBuffer.h:687
~RingBuffer() override
Virtual destractor.
bool full() const override
Check on whether the buffer is full.
Definition RingBuffer.h:540
BufferStatus put(const DataType &value) override
Write data into the buffer.
Definition RingBuffer.h:394
bool empty() const override
Check on whether the buffer is empty.
Definition RingBuffer.h:839
BufferStatus read(DataType &value, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1)) override
Readout data from the buffer.
Definition RingBuffer.h:734
void init(const coil::Properties &prop) override
Definition RingBuffer.h:177
RT-Component.
BufferStatus
DataPortStatus return codes.
Definition BufferStatus.h:57