[openrtm-commit:01759] r660 - in trunk/OpenRTM-aist-Python/OpenRTM_aist: . RTM_IDL

openrtm @ openrtm.org openrtm @ openrtm.org
2016年 2月 25日 (木) 23:22:46 JST


Author: miyamoto
Date: 2016-02-25 23:22:46 +0900 (Thu, 25 Feb 2016)
New Revision: 660

Added:
   trunk/OpenRTM-aist-Python/OpenRTM_aist/RTM_IDL/SharedMemory.idl
   trunk/OpenRTM-aist-Python/OpenRTM_aist/SharedMemory.py
Modified:
   trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMConsumer.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMProvider.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMConsumer.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/__init__.py
Log:
[incompat,new_func,new_file,->RELENG_1_2] add SharedMemory.idl and SharedMemory.py. refs #3410

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMConsumer.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMConsumer.py	2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMConsumer.py	2016-02-25 14:22:46 UTC (rev 660)
@@ -14,10 +14,10 @@
 from omniORB import CORBA
 import OpenRTM_aist
 import OpenRTM
-import mmap, os
-from omniORB import cdrMarshal
-import CORBA
+import OpenRTM__POA
 
+import threading
+
 ##
 # @if jp
 #
@@ -62,8 +62,14 @@
     OpenRTM_aist.InPortCorbaCdrConsumer.__init__(self)
     self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("InPortSHMConsumer")
     self._properties = None
-    self._shmem = None
-    self.shm_address = ""
+    
+    self._shm_address = str(OpenRTM_aist.uuid1())
+    
+    self._shmem = OpenRTM_aist.SharedMemory()
+    
+
+    self._mutex = threading.RLock()
+      
     return
 
   ##
@@ -88,8 +94,8 @@
   def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer):
     self._rtcout.RTC_PARANOID("~InPortSHMConsumer()")
     CorbaConsumer.__del__(self)
-    if self._shmem:
-      self._shmem.close()
+    self._shmem.close(True)
+    
     return
 
   ##
@@ -115,10 +121,13 @@
     self._properties = prop
     
     
-    self.shm_address = prop.getProperty("shared_memory.address")
-    if self.shm_address:
-      if self._shmem is None:
-        self._shmem = mmap.mmap(0, 256, self.shm_address, mmap.ACCESS_WRITE)
+    
+    
+    ds = prop.getProperty("shem_default_size")
+    self._memory_size = self._shmem.string_to_MemorySize(ds)
+
+
+    
     return
 
   ##
@@ -127,9 +136,9 @@
   #
   # 接続先のポートへデータを送信する
   # 
+  # データのサイズは共有メモリも先頭8byteから取得する
+  # データは共有メモリに書き込む
   #
-  # CORBAでデータサイズだけ送信して、データは共有メモリに書き込む
-  #
   # @param self
   # @param data 送信するデータ
   # @return リターンコード
@@ -150,16 +159,20 @@
     try:
       ref_ = self.getObject()
       if ref_:
-        inportcdr = ref_._narrow(OpenRTM.InPortCdr)
-        #print dir(ref_)
-        if self._shmem is not None:
-          self._shmem.seek(os.SEEK_SET)
-          
-          self._shmem.write(data)
-          data_size = len(data)
-          mar_data_size = cdrMarshal(CORBA.TC_ushort, data_size)
-          
-          return self.convertReturnCode(inportcdr.put(mar_data_size))
+        inportcdr = ref_._narrow(OpenRTM__POA.SharedMemory)
+        
+        guard = OpenRTM_aist.ScopedLock(self._mutex)
+        
+
+        self._shmem.setInterface(inportcdr)
+        if self._shmem._shmem is None:
+          self._shmem.create_memory(self._memory_size, self._shm_address)
+        self._shmem.write(data)
+        
+        
+        ret = inportcdr.put()
+        del guard
+        return self.convertReturnCode(ret)
       return self.CONNECTION_LOST
     except:
       self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMProvider.py	2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMProvider.py	2016-02-25 14:22:46 UTC (rev 660)
@@ -14,10 +14,8 @@
 
 import OpenRTM_aist
 import OpenRTM__POA,OpenRTM
-import mmap
-from omniORB import cdrUnmarshal
-import CORBA
 
+
 ##
 # @if jp
 # @class InPortSHMProvider
@@ -34,7 +32,7 @@
 #
 # @endif
 #
-class InPortSHMProvider(OpenRTM_aist.InPortCorbaCdrProvider):
+class InPortSHMProvider(OpenRTM_aist.InPortProvider, OpenRTM_aist.SharedMemory):
     
   """
   """
@@ -58,10 +56,12 @@
   # @endif
   #
   def __init__(self):
-    OpenRTM_aist.InPortCorbaCdrProvider.__init__(self)
+    OpenRTM_aist.InPortProvider.__init__(self)
+    OpenRTM_aist.SharedMemory.__init__(self)
 
     # PortProfile setting
     self.setInterfaceType("shared_memory")
+    self._objref = self._this()
     
     
     
@@ -69,11 +69,17 @@
 
     self._profile = None
     self._listeners = None
-    self._shmem = None
+
+    orb = OpenRTM_aist.Manager.instance().getORB()
+    self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.corba_cdr.inport_ior",
+                                                      orb.object_to_string(self._objref)))
+    self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.corba_cdr.inport_ref",
+                                                      self._objref))
     
-    self.shm_address = str(OpenRTM_aist.uuid1())
-    self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.shared_memory.address",self.shm_address))
+
     
+    
+    
 
     return
 
@@ -98,22 +104,32 @@
     oid = OpenRTM_aist.Manager.instance().getPOA.servant_to_id(self)
     OpenRTM_aist.Manager.instance().getPOA.deactivate_object(oid)
     
+      
+      
     return
 
   
   # void init(coil::Properties& prop)
   def init(self, prop):
-    
-    #print prop
+          
     pass
 
+  def setBuffer(self, buffer):
+    self._buffer = buffer
+    return
 
+  def setListener(self, info, listeners):
+    self._profile = info
+    self._listeners = listeners
+    return
 
+
   ##
   # @if jp
   # @brief バッファにデータを書き込む
   #
-  # CORBAでデータサイズだけ受信して、共有メモリからデータを取り出しバッファに書き込む
+  # データのサイズは共有メモリも先頭8byteから取得する
+  # 共有メモリからデータを取り出しバッファに書き込む
   #
   # @param data 書込対象データ
   #
@@ -128,31 +144,30 @@
   #
   # ::OpenRTM::PortStatus put(const ::OpenRTM::CdrData& data)
   #  throw (CORBA::SystemException);
-  def put(self, data):
-    #print self._connector.profile().properties
-    #self._connector.profile().properties.setProperty("dataport.dataflow_type","push")
+  def put(self):
+    
     try:
       self._rtcout.RTC_PARANOID("InPortCorbaCdrProvider.put()")
             
+      
+      
+
+      
+      shm_data = self.read()
+
       if not self._buffer:
-        self.onReceiverError(data)
+        self.onReceiverError(shm_data)
         return OpenRTM.PORT_ERROR
 
-      self._rtcout.RTC_PARANOID("received data size: %d", len(data))
+      self._rtcout.RTC_PARANOID("received data size: %d", len(shm_data))
 
-      self.onReceived(data)
+      self.onReceived(shm_data)
 
       if not self._connector:
         return OpenRTM.PORT_ERROR
 
-      data_size = cdrUnmarshal(CORBA.TC_ushort, data)
 
-      #if self._shmem is None:
-      self._shmem = mmap.mmap(0, data_size, self.shm_address, mmap.ACCESS_READ)
-      #shm_data = cdrUnmarshal(any.to_any(self._connector._dataType).typecode(), self._shmem.read(16),self._connector._endian)
-      shm_data = self._shmem.read(data_size)
-      #print dir(self._connector._provider._this())
-      #print self._connector._provider._this()
+
       ret = self._connector.write(shm_data)
       
 
@@ -167,7 +182,53 @@
       
       
     return self.convertReturn(ret, data)
+      
+      
+  def onBufferWrite(self, data):
+    if self._listeners is not None and self._profile is not None:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self._profile, data)
+    return
 
+  def onBufferFull(self, data):
+    if self._listeners is not None and self._profile is not None:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self._profile, data)
+    return
+
+  def onBufferWriteTimeout(self, data):
+    if self._listeners is not None and self._profile is not None:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self._profile, data)
+    return
+
+  def onBufferWriteOverwrite(self, data):
+    if self._listeners is not None and self._profile is not None:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE].notify(self._profile, data)
+    return
+
+  def onReceived(self, data):
+    if self._listeners is not None and self._profile is not None:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self._profile, data)
+    return
+
+  def onReceiverFull(self, data):
+    if self._listeners is not None and self._profile is not None:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self._profile, data)
+    return
+
+  def onReceiverTimeout(self, data):
+    if self._listeners is not None and self._profile is not None:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self._profile, data)
+    return
+
+  def onReceiverError(self, data):
+    if self._listeners is not None and self._profile is not None:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self._profile, data)
+    return
+
+      
+
+      
+      
+
   
 
 def InPortSHMProviderInit():

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMConsumer.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMConsumer.py	2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMConsumer.py	2016-02-25 14:22:46 UTC (rev 660)
@@ -14,10 +14,9 @@
 from omniORB import any
 import OpenRTM_aist
 import OpenRTM
+import OpenRTM__POA
 
-import mmap
-from omniORB import cdrUnmarshal
-import CORBA
+import threading
 
 ##
 # @if jp
@@ -57,7 +56,11 @@
     OpenRTM_aist.OutPortCorbaCdrConsumer.__init__(self)
     self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("OutPortSHMConsumer")
 
-    self._shmem = None
+    self._shmem = OpenRTM_aist.SharedMemory()
+      
+    self._mutex = threading.RLock()
+    self._outportcdr = None
+
     return
 
   ##
@@ -76,8 +79,8 @@
   def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer):
     self._rtcout.RTC_PARANOID("~OutPortSHMConsumer()")
     CorbaConsumer.__del__(self)
+    self._outportcdr.close_memory(True)
     
-    pass
 
 
   ##
@@ -100,8 +103,7 @@
     self._rtcout.RTC_TRACE("init()")
     
     self._properties = prop
-    self.shm_address = prop.getProperty("shared_memory.address")
-    
+        
     return
 
 
@@ -114,7 +116,8 @@
   #
   # 設定されたデータを読み出す。
   #
-  # CORBAでデータサイズだけ送受信して、データは共有メモリから読み込む
+  # データのサイズは共有メモリも先頭8byteから取得する
+  # データは共有メモリから読み込む
   #
   # @param data 読み出したデータを受け取るオブジェクト
   #
@@ -134,18 +137,24 @@
   # virtual ReturnCode get(cdrMemoryStream& data);
   def get(self, data):
     self._rtcout.RTC_PARANOID("get()")
-
+    
     try:
-      outportcdr = self.getObject()._narrow(OpenRTM.OutPortCdr)
-      ret,cdr_data = outportcdr.get()
+      outportcdr = self.getObject()._narrow(OpenRTM__POA.SharedMemory)
       
+      if self._outportcdr is None:
+        outportcdr.setInterface(self._shmem._this())
+        self._outportcdr = outportcdr
+
+      guard = OpenRTM_aist.ScopedLock(self._mutex)
+      ret = outportcdr.get()
+      
       if ret == OpenRTM.PORT_OK:
         self._rtcout.RTC_DEBUG("get() successful")
 
-        data_size = cdrUnmarshal(CORBA.TC_ushort, cdr_data)
-        self._shmem = mmap.mmap(0, data_size, self.shm_address, mmap.ACCESS_READ)
-        shm_data = self._shmem.read(data_size)
         
+        
+        shm_data = self._shmem.read()
+        
 
         data[0] = shm_data
         self.onReceived(data[0])
@@ -155,7 +164,7 @@
           self._rtcout.RTC_INFO("InPort buffer is full.")
           self.onBufferFull(data[0])
           self.onReceiverFull(data[0])
-          
+        
         self._buffer.put(data[0])
         self._buffer.advanceWptr()
         self._buffer.advanceRptr()

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py	2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py	2016-02-25 14:22:46 UTC (rev 660)
@@ -15,10 +15,8 @@
 
 import OpenRTM_aist
 import OpenRTM__POA,OpenRTM
-import mmap, os
-from omniORB import cdrMarshal
-import CORBA
 
+
 ##
 # @if jp
 # @class OutPortSHMProvider
@@ -37,7 +35,7 @@
 #
 # @endif
 #
-class OutPortSHMProvider(OpenRTM_aist.OutPortCorbaCdrProvider):
+class OutPortSHMProvider(OpenRTM_aist.OutPortProvider,OpenRTM_aist.SharedMemory):
   ##
   # @if jp
   # @brief コンストラクタ
@@ -53,16 +51,31 @@
   # @endif
   #
   def __init__(self):
-    OpenRTM_aist.OutPortCorbaCdrProvider.__init__(self)
+    OpenRTM_aist.OutPortProvider.__init__(self)
+    OpenRTM_aist.SharedMemory.__init__(self)
     self.setInterfaceType("shared_memory")
+    
+    self._objref = self._this()
+    self._buffer = None
+    orb = OpenRTM_aist.Manager.instance().getORB()
+    self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.corba_cdr.outport_ior",
+                                                      orb.object_to_string(self._objref)))
+    self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.corba_cdr.outport_ref",
+                                                      self._objref))
+    self._listeners = None
+    self._connector = None
+    self._profile   = None
+    
 
-    self.shm_address = str(OpenRTM_aist.uuid1())
+    self._shm_address = str(OpenRTM_aist.uuid1())
     
-    self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.shared_memory.address",self.shm_address))
+    
 
-    self._shmem = mmap.mmap(0, 256, self.shm_address, mmap.ACCESS_WRITE)
+    
 
     
+    
+    
 
     return
 
@@ -84,17 +97,34 @@
     oid = self._default_POA().servant_to_id(self)
     self._default_POA().deactivate_object(oid)
     
-    self._shmem.close()
+      
     return
 
 
   
   # virtual void init(coil::Properties& prop);
   def init(self, prop):
-    pass
 
+    ds = prop.getProperty("shem_default_size")
+    self._memory_size = self.string_to_MemorySize(ds)
 
+    
+  def setBuffer(self, buffer):
+    self._buffer = buffer
+    return
   
+  def setListener(self, info, listeners):
+    self._profile = info
+    self._listeners = listeners
+    return
+  
+  def setConnector(self, connector):
+    self._connector = connector
+    return
+
+  
+    
+  
   ##
   # @if jp
   # @brief バッファからデータを取得する
@@ -112,41 +142,109 @@
   # virtual ::OpenRTM::PortStatus get(::OpenRTM::CdrData_out data);
   def get(self):
     self._rtcout.RTC_PARANOID("OutPortSHMProvider.get()")
+    
     if not self._buffer:
       self.onSenderError()
-      return (OpenRTM.UNKNOWN_ERROR, None)
+      return OpenRTM.UNKNOWN_ERROR
 
     try:
       if self._buffer.empty():
         self._rtcout.RTC_ERROR("buffer is empty.")
-        return (OpenRTM.BUFFER_EMPTY, None)
+        return OpenRTM.BUFFER_EMPTY
 
       cdr = [None]
       ret = self._buffer.read(cdr)
-
+      
       if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
-        if not cdr:
+        if not cdr[0]:
           self._rtcout.RTC_ERROR("buffer is empty.")
-          return (OpenRTM.BUFFER_EMPTY, None)
+          return OpenRTM.BUFFER_EMPTY
       
     except:
       self._rtcout.RTC_TRACE(OpenRTM_aist.Logger.print_exception())
-      return (OpenRTM.UNKNOWN_ERROR, None)
+      return OpenRTM.UNKNOWN_ERROR
 
-    self._shmem.seek(os.SEEK_SET)
-    self._shmem.write(cdr[0])
+    if self._shmem is None:
+      self.create_memory(self._memory_size, self._shm_address)
+    self.write(cdr[0])
     
+    return self.convertReturn(ret, cdr[0])
+
     
-    data_size = len(cdr[0])
-    mar_data_size = cdrMarshal(CORBA.TC_ushort, data_size)
+  def onBufferRead(self, data):
+    if self._listeners and self._profile:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
+    return
+
+  def onSend(self, data):
+    if self._listeners and self._profile:
+      self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self._profile, data)
+    return    
     
     
+
+  def onBufferEmpty(self):
+    if self._listeners and self._profile:
+      self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY].notify(self._profile)
+    return
+  
+  def onBufferReadTimeout(self):
+    if self._listeners and self._profile:
+      self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT].notify(self._profile)
+    return
+
+  def onSenderEmpty(self):
+    if self._listeners and self._profile:
+      self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY].notify(self._profile)
+    return
+
+  def onSenderTimeout(self):
+    if self._listeners and self._profile:
+      self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_TIMEOUT].notify(self._profile)
+    return
+
+  def onSenderError(self):
+    if self._listeners and self._profile:
+      self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self._profile)
+    return
+
+  def convertReturn(self, status, data):
+    if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
+      self.onBufferRead(data)
+      self.onSend(data)
+      return OpenRTM.PORT_OK
     
-    return self.convertReturn(ret, mar_data_size)
+    elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
+      self.onSenderError()
+      return OpenRTM.PORT_ERROR
     
-  
+    elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
+      # never come here
+      return OpenRTM.BUFFER_FULL
 
+    elif status == OpenRTM_aist.BufferStatus.BUFFER_EMPTY:
+      self.onBufferEmpty()
+      self.onSenderEmpty()
+      return OpenRTM.BUFFER_EMPTY
 
+    elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
+      self.onSenderError()
+      return OpenRTM.PORT_ERROR
+    
+    elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
+      self.onBufferReadTimeout()
+      self.onSenderTimeout()
+      return OpenRTM.BUFFER_TIMEOUT
+    
+    else:
+      return OpenRTM.UNKNOWN_ERROR
+    
+    self.onSenderError()
+    return OpenRTM.UNKNOWN_ERROR
+
+
+
+
 def OutPortSHMProviderInit():
   factory = OpenRTM_aist.OutPortProviderFactory.instance()
   factory.addFactory("shared_memory",

Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/RTM_IDL/SharedMemory.idl
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/RTM_IDL/SharedMemory.idl	                        (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/RTM_IDL/SharedMemory.idl	2016-02-25 14:22:46 UTC (rev 660)
@@ -0,0 +1,36 @@
+// -*- IDL -*-
+/*!
+ * @file SharedMemory.idl
+ * @brief Shared Memory TransPort interface definition
+ * @date $Date: 2016-02-25  $
+ * @author Nobuhiko Miyamoto
+ *
+ *
+ */
+
+#ifndef SharedMemory_idl
+#define SharedMemory_idl
+
+module OpenRTM
+{
+  enum PortStatus
+  {
+    PORT_OK,
+    PORT_ERROR,
+    BUFFER_FULL,
+    BUFFER_EMPTY,
+    BUFFER_TIMEOUT,
+    UNKNOWN_ERROR
+  };
+
+  interface SharedMemory
+  {
+    void open_memory(in long memory_size, in string shm_address);
+    void create_memory(in long memory_size, in string shm_address);
+    void close_memory(in boolean unlink);
+    void setInterface(in SharedMemory sm);
+    PortStatus put();
+    PortStatus get();
+  };
+};
+#endif

Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/SharedMemory.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/SharedMemory.py	                        (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/SharedMemory.py	2016-02-25 14:22:46 UTC (rev 660)
@@ -0,0 +1,435 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+##
+# @file SharedMemory.py
+# @brief SharedMemory class
+# @date $Date$
+# @author Nobuhiko Miyamoto
+#
+
+import sys
+import mmap, os
+import platform, ctypes
+from omniORB import cdrMarshal
+from omniORB import cdrUnmarshal
+import CORBA
+import OpenRTM_aist
+import OpenRTM__POA
+
+
+
+
+##
+# @if jp
+#
+# @class SharedMemory
+#
+# @brief SharedMemory クラス
+#
+# 共有メモリ操作クラス
+# CORBAによる通信により、mmapの初期化、終了などがリモートに操作できる
+#
+#
+# @else
+# @class SharedMemory
+#
+# @brief SharedMemory class
+#
+#
+#
+# @endif
+#
+class SharedMemory(OpenRTM__POA.SharedMemory):
+  default_size = 8
+  default_memory_size = 2*1024*1024
+
+
+  ##
+  # @if jp
+  # @brief コンストラクタ
+  #
+  # コンストラクタ
+  #
+  # @param self
+  #
+  # @else
+  # @brief Constructor
+  #
+  # Constructor
+  #
+  # @param self
+  #
+  # @endif
+  #
+  def __init__(self):
+    self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("SharedMemory")
+    self._shmem = None
+    self._smInterface = None
+    self._shm_address = ""
+    self._memory_size = SharedMemory.default_memory_size
+    if platform.system() == "Windows":
+      pass
+    else:
+      try:
+        self.rt = ctypes.CDLL('librt.so')
+      except:
+        self.rt = ctypes.CDLL('librt.so.1')
+      self.rt.shm_open.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_int]
+      self.rt.shm_open.restype = ctypes.c_int
+      self.rt.ftruncate.argtypes = [ctypes.c_int, ctypes.c_int]
+      self.rt.ftruncate.restype = ctypes.c_int
+      self.rt.close.argtypes = [ctypes.c_int]
+      self.rt.close.restype = ctypes.c_int
+      self.rt.shm_unlink.argtypes = [ctypes.c_char_p]
+      self.rt.shm_unlink.restype = ctypes.c_int
+
+      self.fd = -1
+    return
+
+
+
+  ##
+  # @if jp
+  # @brief デストラクタ
+  #
+  # デストラクタ
+  #
+  # @param self
+  #
+  # @else
+  # @brief Destructor
+  #
+  # Destructor
+  #
+  # @param self
+  # @endif
+  #
+  def __del__(self):
+    self._rtcout.RTC_PARANOID("~SharedMemory()")
+    return
+
+  
+  ##
+  # @if jp
+  # @brief 文字列で指定したデータサイズを数値に変換する
+  # 1M → 1048576
+  # 1k → 1024
+  # 100 → 100
+  #
+  #
+  # @param self
+  # @param size_str データサイズ(文字列)
+  # @return データサイズ(数値)
+  #
+  # @else
+  # @brief 
+  #
+  # @param self
+  # @param size_str 
+  # @return 
+  #
+  # @endif
+  #
+  # int string_to_MemorySize(string size_str);
+  def string_to_MemorySize(self, size_str):
+    memory_size = SharedMemory.default_memory_size
+    if size_str:
+      if size_str[-1] == "M":
+        memory_size = 1024 * 1024 * int(size_str[0:-1])
+      elif size_str[-1] == "k":
+        memory_size = 1024 * int(size_str[0:-1])
+      else:
+        memory_size = int(size_str[0:-1])
+    return memory_size
+
+
+
+  ##
+  # @if jp
+  # @brief 共有メモリの初期化
+  # windowsではページングファイル上に領域を確保する
+  # Linuxでは/dev/shm以下にファイルを作成する
+  # 作成したファイルの内容を仮想アドレスにマッピングする
+  # 
+  #
+  #
+  # @param self
+  # @param memory_size 共有メモリのサイズ
+  # @parama shm_address 空間名
+  #
+  # @else
+  # @brief 
+  #
+  # @param memory_size 
+  # @parama shm_address
+  #
+  # @endif
+  #
+  # void create_memory(int memory_size, string shm_address);
+  def create_memory(self, memory_size, shm_address):
+    
+    if self._shmem is None:
+      self._rtcout.RTC_TRACE("create():memory_size="+str(memory_size)+",shm_address="+str(shm_address))
+      self._memory_size = memory_size
+      self._shm_address = shm_address
+
+      if platform.system() == "Windows":
+        self._shmem = mmap.mmap(0, self._memory_size, self._shm_address, mmap.ACCESS_WRITE)
+      else:
+        O_RDWR = 2
+        O_CREAT = 64
+
+        S_IRUSR = 256
+        S_IWUSR = 128
+        S_IRGRP = 32
+        S_IWGRP = 16
+        S_IROTH = 4
+
+        self.fd = self.rt.shm_open(self._shm_address,O_RDWR | O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)
+        if self.fd < 0:
+          return self.UNKNOWN_ERROR
+        self.rt.ftruncate(self.fd, self._memory_size)
+        self._shmem = mmap.mmap(self.fd, self._memory_size, mmap.MAP_SHARED)
+        self.rt.close( self.fd )
+
+      
+      if self._smInterface:
+        self._smInterface.open_memory(self._memory_size, self._shm_address)
+
+
+
+  ##
+  # @if jp
+  # @brief 共有メモリのマッピングを行う
+  # 
+  #
+  #
+  # @param self
+  # @param memory_size 共有メモリのサイズ
+  # @parama shm_address 空間名
+  #
+  # @else
+  # @brief 
+  #
+  # @param memory_size 
+  # @parama shm_address
+  #
+  # @endif
+  #
+  # void open_memory(int memory_size, string shm_address);
+  def open_memory(self, memory_size, shm_address):
+    self._rtcout.RTC_TRACE("open():memory_size="+str(memory_size)+",shm_address="+str(shm_address))
+    self._memory_size = memory_size
+    self._shm_address = shm_address
+    if self._shmem is None:
+      if platform.system() == "Windows":
+        self._shmem = mmap.mmap(0, self._memory_size, self._shm_address, mmap.ACCESS_READ)
+      else:
+        O_RDWR = 2
+        self.fd = self.rt.shm_open(self._shm_address,O_RDWR,0)
+        if self.fd < 0:
+          return self.UNKNOWN_ERROR
+        self.rt.ftruncate(self.fd, self._memory_size)
+        self._shmem = mmap.mmap(self.fd, self._memory_size, mmap.MAP_SHARED)
+        self.rt.close( self.fd )
+    
+
+
+  ##
+  # @if jp
+  # @brief マッピングした共有メモリをアンマップする
+  # 
+  #
+  #
+  # @param self
+  # @param unlink Linuxで/dev/shm以下に作成したファイルを削除する場合にTrueにする
+  #
+  # @else
+  # @brief 
+  #
+  # @param self
+  # @param unlink
+  #
+  # @endif
+  #
+  # void close_memory(int memory_size, string shm_address);
+  def close_memory(self, unlink):
+    self.close(unlink)
+
+  ##
+  # @if jp
+  # @brief マッピングした共有メモリをアンマップする
+  # 
+  #
+  #
+  # @param self
+  # @param unlink Linuxで/dev/shm以下に作成したファイルを削除する場合にTrueにする
+  #
+  # @else
+  # @brief
+  #
+  # @param self
+  # @param unlink
+  #
+  #
+  # @endif
+  #
+  # void close(int memory_size, string shm_address);
+  def close(self, unlink=False):
+    self._rtcout.RTC_TRACE("open()")
+    if self._shmem:
+      self._shmem.close()
+      if platform.system() == "Windows":
+        pass
+      else:
+        if unlink:
+           self.rt.shm_unlink(self._shm_address)
+      self._shmem = None
+
+      if self._smInterface:
+        self._smInterface.close_memory(False)
+
+
+  
+  ##
+  # @if jp
+  # @brief データを書き込む
+  # 先頭8byteにデータサイズを書き込み、その後ろにデータを書き込む
+  # 設定したデータサイズが共有メモリのサイズを上回った場合、共有メモリの初期化を行う
+  # 
+  #
+  #
+  # @param self
+  # @param data 書き込むデータ
+  #
+  # @else
+  # @brief
+  #
+  # @param self
+  # @param data 
+  #
+  #
+  # @endif
+  #
+  # void write(cdrMemoryStream& data);
+  def write(self, data):
+    self._rtcout.RTC_TRACE("write()")
+    
+    if self._shmem:
+      data_size = len(data)
+
+      
+      if data_size + SharedMemory.default_size > self._memory_size:
+        self._memory_size = data_size + SharedMemory.default_size
+
+        if self._smInterface:
+          self._smInterface.close_memory(False)
+
+
+        self.close_memory(True)
+        self.create_memory(self._memory_size, self._shm_address)
+
+        
+        
+      data_size_cdr = cdrMarshal(CORBA.TC_ulong, data_size)
+      
+      self._shmem.seek(os.SEEK_SET)
+      self._shmem.write(data_size_cdr)
+      self._shmem.write(data)
+
+
+  ##
+  # @if jp
+  # @brief データを読み込む
+  # 
+  #
+  #
+  # @param self
+  # @return データ
+  #
+  # @else
+  # @brief 
+  #
+  # @param self
+  # @return
+  #
+  # @endif
+  #
+  # void read(::OpenRTM::CdrData_out data);
+  def read(self):
+    self._rtcout.RTC_TRACE("read()")
+    if self._shmem:
+      
+      self._shmem.seek(os.SEEK_SET)
+      
+      data_size_cdr = self._shmem.read(SharedMemory.default_size)
+      data_size = cdrUnmarshal(CORBA.TC_ulong, data_size_cdr)
+      
+      
+      
+      shm_data = self._shmem.read(data_size)
+      
+      return shm_data
+    return ""
+
+
+
+  ##
+  # @if jp
+  # @brief 通信先のCORBAインターフェースを登録する
+  # 登録する事により共有メモリの初期化したときに、通信先でもマッピングをやり直すことができる
+  # 
+  #
+  #
+  # @param self
+  # @param sm SharedMemoryのオブジェクトリファレンス
+  #
+  # @else
+  # @brief 
+  #
+  #
+  # @endif
+  #
+  # void close(int memory_size, string shm_address);
+  def setInterface(self, sm):
+    self._smInterface = sm
+    
+
+  ##
+  # @if jp
+  # @brief データの送信を知らせる
+  # 
+  #
+  #
+  # @param self
+  #
+  # @else
+  # @brief 
+  #
+  # @param self
+  #
+  # @endif
+  #
+  # PortStatus put();
+  def put(self):
+    return OpenRTM.UNKNOWN_ERROR
+
+  ##
+  # @if jp
+  # @brief データの送信を要求する
+  # 
+  #
+  #
+  # @param self
+  #
+  # @else
+  # @brief 
+  #
+  # @param self
+  #
+  # @endif
+  #
+  # PortStatus get();
+  def get(self):
+    return OpenRTM.UNKNOWN_ERROR
\ No newline at end of file

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/__init__.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/__init__.py	2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/__init__.py	2016-02-25 14:22:46 UTC (rev 660)
@@ -103,6 +103,7 @@
 from InPortDirectProvider import *
 from OutPortDirectConsumer import *
 from OutPortDirectProvider import *
+from SharedMemory import *
 from InPortSHMConsumer import *
 from InPortSHMProvider import *
 from OutPortSHMConsumer import *



More information about the openrtm-commit mailing list