OpenRTM-aist  2.1.0
RingBuffer.h
Go to the documentation of this file.
1 // -*- C++ -*-
20 #ifndef RTC_RINGBUFFER_H
21 #define RTC_RINGBUFFER_H
22 
23 #include <mutex>
24 #include <condition_variable>
25 #include <coil/stringutil.h>
26 
27 #include <rtm/BufferBase.h>
28 #include <rtm/BufferStatus.h>
29 
30 #include <algorithm>
31 #include <iostream>
32 #include <string>
33 #include <vector>
34 
35 #define RINGBUFFER_DEFAULT_LENGTH 8
50 namespace RTC
51 {
87  template <class DataType>
88  class RingBuffer
89  : public BufferBase<DataType>
90  {
91  public:
116  : m_length(length), m_buffer(m_length)
117  {
118  this->reset();
119  }
120 
136  ~RingBuffer() override;
137 
177  void init(const coil::Properties& prop) override
178  {
179  initLength(prop);
180  initWritePolicy(prop);
181  initReadPolicy(prop);
182  }
183 
204  size_t length() const override
205  {
206  std::lock_guard<std::mutex> guard(m_posmutex);
207  return m_length;
208  }
209 
230  BufferStatus length(size_t n) override
231  {
232  m_buffer.resize(n);
233  m_length = n;
234  this->reset();
235  return BufferStatus::OK;
236  }
237 
260  BufferStatus reset() override
261  {
262  std::lock_guard<std::mutex> guard(m_posmutex);
263  m_fillcount = 0;
264  m_wcount = 0;
265  m_wpos = 0;
266  m_rpos = 0;
267  return BufferStatus::OK;
268  }
269 
270 
271 
272  //----------------------------------------------------------------------
293  DataType* wptr(long int n = 0) override
294  {
295  std::lock_guard<std::mutex> guard(m_posmutex);
296  return &m_buffer[(m_wpos + n + m_length) % m_length];
297  }
298 
323  BufferStatus advanceWptr(long int n = 1, bool unlock_enable = true) override
324  {
325  bool empty_ = false;
326  bool lock_ = (unlock_enable && n > 0);
327  if(lock_)
328  {
329  m_empty.mutex.lock();
330  empty_ = empty();
331  }
332  // n > 0 :
333  // n satisfies n <= writable elements
334  // n <= m_length - m_fillcout
335  // n < 0 : -n = n'
336  // n satisfies n'<= readable elements
337  // n'<= m_fillcount
338  // n >= - m_fillcount
339  {
340  std::lock_guard<std::mutex> guard(m_posmutex);
341  if ((n > 0 && n > static_cast<long int>(m_length) - static_cast<long int>(m_fillcount)) ||
342  (n < 0 && n < -static_cast<long int>(m_fillcount)))
343  {
344  if (lock_)
345  {
346  m_empty.mutex.unlock();
347  }
349  }
350 
351  m_wpos = (m_wpos + n + m_length) % m_length;
352  m_fillcount += n;
353  m_wcount += n;
354  }
355 
356  if(lock_)
357  {
358  if(empty_)
359  {
360  m_empty.cond.notify_one();
361  }
362  m_empty.mutex.unlock();
363  }
364 
365  return BufferStatus::OK;
366  }
394  BufferStatus put(const DataType& value) override
395  {
396  std::lock_guard<std::mutex> guard(m_posmutex);
397  m_buffer[m_wpos] = value;
398  return BufferStatus::OK;
399  }
400 
441  BufferStatus write(const DataType& value,
442  std::chrono::nanoseconds timeout
443  = std::chrono::nanoseconds(-1)) override
444  {
445  {
446  std::unique_lock<std::mutex> guard(m_full.mutex);
447 
448  if (full())
449  {
450 
451  bool timedwrite(m_timedwrite);
452  bool overwrite(m_overwrite);
453 
454  if (timeout >= std::chrono::seconds::zero()) // block mode
455  {
456  timedwrite = true;
457  overwrite = false;
458  }
459 
460  if (overwrite && !timedwrite) // "overwrite" mode
461  {
462  advanceRptr(1,false);
463  }
464  else if (!overwrite && !timedwrite) // "do_nothing" mode
465  {
466  return BufferStatus::FULL;
467  }
468  else if (!overwrite && timedwrite) // "block" mode
469  {
470  if (timeout < std::chrono::seconds::zero())
471  {
472  timeout = m_wtimeout;
473  }
474  if (std::cv_status::timeout == m_empty.cond.wait_for(guard, timeout))
475  {
476  return BufferStatus::TIMEOUT;
477  }
478  }
479  else // unknown condition
480  {
482  }
483  }
484  }
485 
486  put(value);
487 
488  advanceWptr(1);
489 
490 
491  return BufferStatus::OK;
492  }
493 
515  size_t writable() const override
516  {
517  std::lock_guard<std::mutex> guard(m_posmutex);
518  return m_length - m_fillcount;
519  }
520 
540  bool full() const override
541  {
542  std::lock_guard<std::mutex> guard(m_posmutex);
543  return m_length == m_fillcount;
544  }
545 
546  //----------------------------------------------------------------------
567  DataType* rptr(long int n = 0) override
568  {
569  std::lock_guard<std::mutex> guard(m_posmutex);
570  return &(m_buffer[(m_rpos + n + m_length) % m_length]);
571  }
572 
595  BufferStatus advanceRptr(long int n = 1, bool unlock_enable = true) override
596  {
597  bool full_ = false;
598  bool lock_ = (unlock_enable && n > 0);
599  if(lock_)
600  {
601  m_full.mutex.lock();
602  full_ = full();
603  }
604  // n > 0 :
605  // n satisfies n <= readable elements
606  // n <= m_fillcout
607  // n < 0 : -n = n'
608  // n satisfies n'<= m_length - m_fillcount
609  // n >= m_fillcount - m_length
610  {
611  std::lock_guard<std::mutex> guard(m_posmutex);
612  if ((n > 0 && n > static_cast<long int>(m_fillcount)) ||
613  (n < 0 && n < static_cast<long int>(m_fillcount) - static_cast<long int>(m_length)))
614  {
615  if (lock_)
616  {
617  m_full.mutex.unlock();
618  }
620  }
621 
622  m_rpos = (m_rpos + n + m_length) % m_length;
623  m_fillcount -= n;
624  }
625 
626  if(lock_)
627  {
628  if(full_)
629  {
630  m_full.cond.notify_one();
631  }
632  m_full.mutex.unlock();
633  }
634 
635  return BufferStatus::OK;
636  }
637 
662  BufferStatus get(DataType& value) override
663  {
664  std::lock_guard<std::mutex> guard(m_posmutex);
665  value = m_buffer[m_rpos];
666  return BufferStatus::OK;
667  }
668 
669 
687  DataType& get() override
688  {
689  std::lock_guard<std::mutex> guard(m_posmutex);
690  return m_buffer[m_rpos];
691  }
692 
693 
734  BufferStatus read(DataType& value,
735  std::chrono::nanoseconds timeout
736  = std::chrono::nanoseconds(-1)) override
737  {
738  {
739  std::unique_lock<std::mutex> guard(m_empty.mutex);
740 
741  if (empty())
742  {
743  bool timedread(m_timedread);
744  bool readback(m_readback);
745 
746  if (timeout >= std::chrono::seconds::zero()) // block mode
747  {
748  timedread = true;
749  readback = false;
750  timeout = m_rtimeout;
751  }
752 
753  if (readback && !timedread) // "readback" mode
754  {
755  if (!(m_wcount > 0))
756  {
757  return BufferStatus::EMPTY;
758  }
759  advanceRptr(-1,false);
760  }
761  else if (!readback && !timedread) // "do_nothing" mode
762  {
763  return BufferStatus::EMPTY;
764  }
765  else if (!readback && timedread) // "block" mode
766  {
767  if (timeout < std::chrono::seconds::zero())
768  {
769  timeout = m_rtimeout;
770  }
771  if (std::cv_status::timeout == m_empty.cond.wait_for(guard, timeout))
772  {
773  return BufferStatus::TIMEOUT;
774  }
775  }
776  else // unknown condition
777  {
779  }
780  }
781  }
782 
783  get(value);
784 
785  advanceRptr(1);
786 
787  return BufferStatus::OK;
788  }
789 
814  size_t readable() const override
815  {
816  std::lock_guard<std::mutex> guard(m_posmutex);
817  return m_fillcount;
818  }
819 
839  bool empty() const override
840  {
841  std::lock_guard<std::mutex> guard(m_posmutex);
842  return m_fillcount == 0;
843  }
844 
845  private:
846  void initLength(const coil::Properties& prop)
847  {
848  if (!prop["length"].empty())
849  {
850  size_t n;
851  if (coil::stringTo(n, prop["length"].c_str()))
852  {
853  if (n > 0)
854  {
855  this->length(n);
856  }
857  }
858  }
859  }
860 
861  void initWritePolicy(const coil::Properties& prop)
862  {
863  std::string policy(coil::normalize(prop["write.full_policy"]));
864  if (policy == "overwrite")
865  {
866  m_overwrite = true;
867  m_timedwrite = false;
868  }
869  else if (policy == "do_nothing")
870  {
871  m_overwrite = false;
872  m_timedwrite = false;
873  }
874  else if (policy == "block")
875  {
876  m_overwrite = false;
877  m_timedwrite = true;
878 
879  std::chrono::nanoseconds tm;
880  if (coil::stringTo(tm, prop["write.timeout"].c_str())
881  && !(tm < std::chrono::seconds::zero()))
882  {
883  m_wtimeout = tm;
884  }
885  }
886  }
887 
888  void initReadPolicy(const coil::Properties& prop)
889  {
890  std::string policy(prop["read.empty_policy"]);
891  if (policy == "readback")
892  {
893  m_readback = true;
894  m_timedread = false;
895  }
896  else if (policy == "do_nothing")
897  {
898  m_readback = false;
899  m_timedread = false;
900  }
901  else if (policy == "block")
902  {
903  m_readback = false;
904  m_timedread = true;
905  std::chrono::nanoseconds tm;
906  if (coil::stringTo(tm, prop["read.timeout"].c_str()))
907  {
908  m_rtimeout = tm;
909  }
910  }
911  }
912 
913  private:
921  bool m_overwrite{true};
922 
930  bool m_readback{true};
931 
939  bool m_timedwrite{false};
947  bool m_timedread{false};
948 
956  std::chrono::nanoseconds m_wtimeout{std::chrono::seconds(1)};
957 
965  std::chrono::nanoseconds m_rtimeout{std::chrono::seconds(1)};
966 
974  size_t m_length;
975 
983  size_t m_wpos{0};
984 
992  size_t m_rpos{0};
993 
1001  size_t m_fillcount{0};
1002 
1010  size_t m_wcount{0};
1011 
1019  std::vector<DataType> m_buffer;
1020 
1028  struct condition
1029  {
1030  condition() {}
1031  std::condition_variable cond;
1032  std::mutex mutex;
1033  };
1034 
1042  mutable std::mutex m_posmutex;
1043 
1051  condition m_empty;
1052 
1060  condition m_full;
1061  };
1062 
1063  template <class T> RingBuffer<T>::~RingBuffer() = default; // no-inline because of its size.
1064 } // namespace RTC
1065 
1066 #endif // RTC_RINGBUFFER_H
Buffer abstract class.
Buffer status enum definition.
#define RINGBUFFER_DEFAULT_LENGTH
Definition: RingBuffer.h:35
BufferBase abstract class.
Definition: BufferBase.h:106
Ring buffer implementation class.
Definition: RingBuffer.h:90
DataType & get() override
Reading data from the buffer.
Definition: RingBuffer.h:687
size_t writable() const override
Write data into the buffer.
Definition: RingBuffer.h:515
BufferStatus write(const DataType &value, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1)) override
Write data into the buffer.
Definition: RingBuffer.h:441
size_t readable() const override
Write data into the buffer.
Definition: RingBuffer.h:814
BufferStatus advanceWptr(long int n=1, bool unlock_enable=true) override
Get the buffer length.
Definition: RingBuffer.h:323
size_t length() const override
Get the buffer length.
Definition: RingBuffer.h:204
RingBuffer(long int length=RINGBUFFER_DEFAULT_LENGTH)
Constructor.
Definition: RingBuffer.h:115
BufferStatus advanceRptr(long int n=1, bool unlock_enable=true) override
Get the buffer length.
Definition: RingBuffer.h:595
BufferStatus reset() override
Get the buffer length.
Definition: RingBuffer.h:260
DataType * wptr(long int n=0) override
Get the buffer length.
Definition: RingBuffer.h:293
BufferStatus length(size_t n) override
Get the buffer length.
Definition: RingBuffer.h:230
BufferStatus get(DataType &value) override
Write data into the buffer.
Definition: RingBuffer.h:662
DataType * rptr(long int n=0) override
Get the buffer length.
Definition: RingBuffer.h:567
~RingBuffer() override
Virtual destractor.
bool full() const override
Check on whether the buffer is full.
Definition: RingBuffer.h:540
BufferStatus put(const DataType &value) override
Write data into the buffer.
Definition: RingBuffer.h:394
bool empty() const override
Check on whether the buffer is empty.
Definition: RingBuffer.h:839
BufferStatus read(DataType &value, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1)) override
Readout data from the buffer.
Definition: RingBuffer.h:734
void init(const coil::Properties &prop) override
Set the buffer.
Definition: RingBuffer.h:177
RT-Component.
coil::Properties Properties
Definition: RTC.h:72
BufferStatus
DataPortStatus return codes.
Definition: BufferStatus.h:57