OpenRTM-aist  1.2.1
RingBuffer.h
Go to the documentation of this file.
1 // -*- C++ -*-
20 #ifndef RTC_RINGBUFFER_H
21 #define RTC_RINGBUFFER_H
22 
23 #include <vector>
24 #include <algorithm>
25 #include <iostream>
26 
27 #include <coil/TimeValue.h>
28 #include <coil/Mutex.h>
29 #include <coil/Guard.h>
30 #include <coil/Condition.h>
31 #include <coil/stringutil.h>
32 
33 #include <rtm/BufferBase.h>
34 #include <rtm/BufferStatus.h>
35 
36 #define RINGBUFFER_DEFAULT_LENGTH 8
37 
51 namespace RTC
52 {
88  template <class DataType>
89  class RingBuffer
90  : public BufferBase<DataType>
91  {
92  public:
119  : m_overwrite(true), m_readback(true),
120  m_timedwrite(false), m_timedread(false),
121  m_wtimeout(1, 0), m_rtimeout(1, 0),
122  m_length(length),
123  m_wpos(0), m_rpos(0), m_fillcount(0), m_wcount(0),
124  m_buffer(m_length)
125  {
126  this->reset();
127  }
128 
144  virtual ~RingBuffer(void)
145  {
146  }
147 
187  virtual void init(const coil::Properties& prop)
188  {
189  initLength(prop);
190  initWritePolicy(prop);
191  initReadPolicy(prop);
192  }
193 
214  virtual size_t length(void) const
215  {
216  Guard guard(m_posmutex);
217  return m_length;
218  }
219 
242  virtual ReturnCode length(size_t n)
243  {
244  m_buffer.resize(n);
245  m_length = n;
246  this->reset();
247  return ::RTC::BufferStatus::BUFFER_OK; //BUFFER_OK;
248  }
249 
272  virtual ReturnCode reset()
273  {
274  Guard guard(m_posmutex);
275  m_fillcount = 0;
276  m_wcount = 0;
277  m_wpos = 0;
278  m_rpos = 0;
279  return ::RTC::BufferStatus::BUFFER_OK;
280  }
281 
282 
283 
284  //----------------------------------------------------------------------
305  virtual DataType* wptr(long int n = 0)
306  {
307  Guard guard(m_posmutex);
308  return &m_buffer[(m_wpos + n + m_length) % m_length];
309  }
310 
335  virtual ReturnCode advanceWptr(long int n = 1, bool unlock_enable = true)
336  {
337  bool empty_ = false;
338  bool lock_ = (unlock_enable && n > 0);
339  if(lock_)
340  {
341  m_empty.mutex.lock();
342  empty_ = empty();
343  }
344  // n > 0 :
345  // n satisfies n <= writable elements
346  // n <= m_length - m_fillcout
347  // n < 0 : -n = n'
348  // n satisfies n'<= readable elements
349  // n'<= m_fillcount
350  // n >= - m_fillcount
351  {
352  Guard guard(m_posmutex);
353  if ((n > 0 && n > static_cast<long int>(m_length - m_fillcount)) ||
354  (n < 0 && n < static_cast<long int>(-m_fillcount)))
355  {
356  if (lock_)
357  {
358  m_empty.mutex.unlock();
359  }
360  return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
361  }
362 
363  m_wpos = (m_wpos + n + m_length) % m_length;
364  m_fillcount += n;
365  m_wcount += n;
366  }
367 
368  if(lock_)
369  {
370  if(empty_)
371  {
372  m_empty.cond.signal();
373  }
374  m_empty.mutex.unlock();
375  }
376 
377  return ::RTC::BufferStatus::BUFFER_OK;
378  }
406  virtual ReturnCode put(const DataType& value)
407  {
408  Guard guard(m_posmutex);
409  m_buffer[m_wpos] = value;
410  return ::RTC::BufferStatus::BUFFER_OK;
411  }
412 
454  virtual ReturnCode write(const DataType& value,
455  long int sec = -1, long int nsec = 0)
456  {
457  {
458  Guard guard(m_full.mutex);
459 
460  if (full())
461  {
462 
463  bool timedwrite(m_timedwrite);
464  bool overwrite(m_overwrite);
465 
466  if (!(sec < 0)) // if second arg is set -> block mode
467  {
468  timedwrite = true;
469  overwrite = false;
470  }
471 
472  if (overwrite && !timedwrite) // "overwrite" mode
473  {
474  advanceRptr(1,false);
475  }
476  else if (!overwrite && !timedwrite) // "do_nothing" mode
477  {
478  return ::RTC::BufferStatus::BUFFER_FULL;
479  }
480  else if (!overwrite && timedwrite) // "block" mode
481  {
482  if (sec < 0)
483  {
484  sec = m_wtimeout.sec();
485  nsec = m_wtimeout.usec() * 1000;
486  }
487  // true: signaled, false: timeout
488  if (!m_full.cond.wait(sec, nsec))
489  {
490  return ::RTC::BufferStatus::TIMEOUT;
491  }
492  }
493  else // unknown condition
494  {
495  return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
496  }
497  }
498  }
499 
500  put(value);
501 
502  advanceWptr(1);
503 
504 
505  return ::RTC::BufferStatus::BUFFER_OK;
506  }
507 
529  virtual size_t writable() const
530  {
531  Guard guard(m_posmutex);
532  return m_length - m_fillcount;
533  }
534 
554  virtual bool full(void) const
555  {
556  Guard guard(m_posmutex);
557  return m_length == m_fillcount;
558  }
559 
560  //----------------------------------------------------------------------
581  virtual DataType* rptr(long int n = 0)
582  {
583  Guard guard(m_posmutex);
584  return &(m_buffer[(m_rpos + n + m_length) % m_length]);
585  }
586 
609  virtual ReturnCode advanceRptr(long int n = 1, bool unlock_enable = true)
610  {
611  bool full_ = false;
612  bool lock_ = (unlock_enable && n > 0);
613  if(lock_)
614  {
615  m_full.mutex.lock();
616  full_ = full();
617  }
618  // n > 0 :
619  // n satisfies n <= readable elements
620  // n <= m_fillcout
621  // n < 0 : -n = n'
622  // n satisfies n'<= m_length - m_fillcount
623  // n >= m_fillcount - m_length
624  {
625  Guard guard(m_posmutex);
626  if ((n > 0 && n > static_cast<long int>(m_fillcount)) ||
627  (n < 0 && n < static_cast<long int>(m_fillcount - m_length)))
628  {
629  if (lock_)
630  {
631  m_full.mutex.unlock();
632  }
633  return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
634  }
635 
636  m_rpos = (m_rpos + n + m_length) % m_length;
637  m_fillcount -= n;
638  }
639 
640  if(lock_)
641  {
642  if(full_)
643  {
644  m_full.cond.signal();
645  }
646  m_full.mutex.unlock();
647  }
648 
649  return ::RTC::BufferStatus::BUFFER_OK;
650  }
651 
676  virtual ReturnCode get(DataType& value)
677  {
678  Guard gaurd(m_posmutex);
679  value = m_buffer[m_rpos];
680  return ::RTC::BufferStatus::BUFFER_OK;
681  }
682 
683 
701  virtual DataType& get()
702  {
703  Guard gaurd(m_posmutex);
704  return m_buffer[m_rpos];
705  }
706 
707 
749  virtual ReturnCode read(DataType& value,
750  long int sec = -1, long int nsec = 0)
751  {
752  {
753  Guard gaurd(m_empty.mutex);
754 
755  if (empty())
756  {
757  bool timedread(m_timedread);
758  bool readback(m_readback);
759 
760  if (!(sec < 0)) // if second arg is set -> block mode
761  {
762  timedread = true;
763  readback = false;
764  sec = m_rtimeout.sec();
765  nsec = m_rtimeout.usec() * 1000;
766  }
767 
768  if (readback && !timedread) // "readback" mode
769  {
770  if (!(m_wcount > 0))
771  {
772  return ::RTC::BufferStatus::BUFFER_EMPTY;
773  }
774  advanceRptr(-1,false);
775  }
776  else if (!readback && !timedread) // "do_nothing" mode
777  {
778  return ::RTC::BufferStatus::BUFFER_EMPTY;
779  }
780  else if (!readback && timedread) // "block" mode
781  {
782  if (sec < 0)
783  {
784  sec = m_rtimeout.sec();
785  nsec = m_rtimeout.usec() * 1000;
786  }
787  // true: signaled, false: timeout
788  if (!m_empty.cond.wait(sec, nsec))
789  {
790  return ::RTC::BufferStatus::TIMEOUT;
791  }
792  }
793  else // unknown condition
794  {
795  return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
796  }
797  }
798  }
799 
800  get(value);
801 
802  advanceRptr(1);
803 
804  return ::RTC::BufferStatus::BUFFER_OK;
805  }
806 
831  virtual size_t readable() const
832  {
833  Guard guard(m_posmutex);
834  return m_fillcount;
835  }
836 
856  virtual bool empty(void) const
857  {
858  Guard guard(m_posmutex);
859  return m_fillcount == 0;
860  }
861 
862  private:
863  inline void initLength(const coil::Properties& prop)
864  {
865  if (!prop["length"].empty())
866  {
867  size_t n;
868  if (coil::stringTo(n, prop["length"].c_str()))
869  {
870  if (n > 0)
871  {
872  this->length(n);
873  }
874  }
875  }
876  }
877 
878  inline void initWritePolicy(const coil::Properties& prop)
879  {
880  std::string policy(prop["write.full_policy"]);
881  coil::normalize(policy);
882  if (policy == "overwrite")
883  {
884  m_overwrite = true;
885  m_timedwrite = false;
886  }
887  else if (policy == "do_nothing")
888  {
889  m_overwrite = false;
890  m_timedwrite = false;
891  }
892  else if (policy == "block")
893  {
894  m_overwrite = false;
895  m_timedwrite = true;
896 
897  double tm;
898  if (coil::stringTo(tm, prop["write.timeout"].c_str()))
899  {
900  if (!(tm < 0))
901  {
902  m_wtimeout = tm;
903  }
904  }
905  }
906  }
907 
908  inline void initReadPolicy(const coil::Properties& prop)
909  {
910  std::string policy(prop["read.empty_policy"]);
911  if (policy == "readback")
912  {
913  m_readback = true;
914  m_timedread = false;
915  }
916  else if (policy == "do_nothing")
917  {
918  m_readback = false;
919  m_timedread = false;
920  }
921  else if (policy == "block")
922  {
923  m_readback = false;
924  m_timedread = true;
925  double tm;
926  if (coil::stringTo(tm, prop["read.timeout"].c_str()))
927  {
928  m_rtimeout = tm;
929  }
930  }
931  }
932 
933  private:
941  bool m_overwrite;
942 
950  bool m_readback;
951 
959  bool m_timedwrite;
967  bool m_timedread;
968 
976  coil::TimeValue m_wtimeout;
977 
985  coil::TimeValue m_rtimeout;
986 
994  size_t m_length;
995 
1003  size_t m_wpos;
1004 
1012  size_t m_rpos;
1013 
1021  size_t m_fillcount;
1022 
1030  size_t m_wcount;
1031 
1039  std::vector<DataType> m_buffer;
1040 
1048  struct condition
1049  {
1050  condition() : cond(mutex) {}
1052  coil::Mutex mutex;
1053  };
1054 
1062  mutable coil::Mutex m_posmutex;
1063 
1071  condition m_empty;
1072 
1080  condition m_full;
1081  };
1082 }; // namespace RTC
1083 
1084 #endif // RTC_RINGBUFFER_H
std::string normalize(std::string &str)
Erase the head/tail blank and replace upper case to lower case.
RT-Component.
virtual DataType * rptr(long int n=0)
Get the buffer length.
Definition: RingBuffer.h:581
virtual ~RingBuffer(void)
Virtual destractor.
Definition: RingBuffer.h:144
long int sec() const
Get value of second time scale.
Definition: TimeValue.h:110
virtual ReturnCode read(DataType &value, long int sec=-1, long int nsec=0)
Readout data from the buffer.
Definition: RingBuffer.h:749
bool stringTo(To &val, const char *str)
Convert the given std::string to object.
Definition: stringutil.h:633
virtual ReturnCode length(size_t n)
Get the buffer length.
Definition: RingBuffer.h:242
Mutex class.
Definition: Mutex.h:40
#define RINGBUFFER_DEFAULT_LENGTH
Definition: RingBuffer.h:36
virtual void init(const coil::Properties &prop)
Set the buffer.
Definition: RingBuffer.h:187
virtual DataType * wptr(long int n=0)
Get the buffer length.
Definition: RingBuffer.h:305
TimeValue class.
Definition: TimeValue.h:40
virtual ReturnCode reset()
Get the buffer length.
Definition: RingBuffer.h:272
RingBuffer(long int length=RINGBUFFER_DEFAULT_LENGTH)
Constructor.
Definition: RingBuffer.h:118
virtual bool full(void) const
Check on whether the buffer is full.
Definition: RingBuffer.h:554
virtual ReturnCode advanceRptr(long int n=1, bool unlock_enable=true)
Get the buffer length.
Definition: RingBuffer.h:609
#define BUFFERSTATUS_ENUM
Importing RTC::BufferStatus macro.
Definition: BufferStatus.h:157
BUFFERSTATUS_ENUM typedef coil::Guard< coil::Mutex > Guard
Definition: RingBuffer.h:94
Buffer status enum definition.
Guard template class.
Ring buffer implementation class.
Definition: RingBuffer.h:89
Buffer abstract class.
virtual ReturnCode write(const DataType &value, long int sec=-1, long int nsec=0)
Write data into the buffer.
Definition: RingBuffer.h:454
virtual size_t readable() const
Write data into the buffer.
Definition: RingBuffer.h:831
Class represents a set of properties.
Definition: Properties.h:101
virtual ReturnCode advanceWptr(long int n=1, bool unlock_enable=true)
Get the buffer length.
Definition: RingBuffer.h:335
virtual ReturnCode put(const DataType &value)
Write data into the buffer.
Definition: RingBuffer.h:406
virtual size_t length(void) const
Get the buffer length.
Definition: RingBuffer.h:214
virtual bool empty(void) const
Check on whether the buffer is empty.
Definition: RingBuffer.h:856
long int usec() const
Get value of micro second time scale.
Definition: TimeValue.h:131
BufferBase abstract class.
Definition: BufferBase.h:104
virtual size_t writable() const
Write data into the buffer.
Definition: RingBuffer.h:529