[openrtm-commit:01759] r660 - in trunk/OpenRTM-aist-Python/OpenRTM_aist: . RTM_IDL
openrtm @ openrtm.org
openrtm @ openrtm.org
2016年 2月 25日 (木) 23:22:46 JST
Author: miyamoto
Date: 2016-02-25 23:22:46 +0900 (Thu, 25 Feb 2016)
New Revision: 660
Added:
trunk/OpenRTM-aist-Python/OpenRTM_aist/RTM_IDL/SharedMemory.idl
trunk/OpenRTM-aist-Python/OpenRTM_aist/SharedMemory.py
Modified:
trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMConsumer.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMProvider.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMConsumer.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py
trunk/OpenRTM-aist-Python/OpenRTM_aist/__init__.py
Log:
[incompat,new_func,new_file,->RELENG_1_2] add SharedMemory.idl and SharedMemory.py. refs #3410
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMConsumer.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMConsumer.py 2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMConsumer.py 2016-02-25 14:22:46 UTC (rev 660)
@@ -14,10 +14,10 @@
from omniORB import CORBA
import OpenRTM_aist
import OpenRTM
-import mmap, os
-from omniORB import cdrMarshal
-import CORBA
+import OpenRTM__POA
+import threading
+
##
# @if jp
#
@@ -62,8 +62,14 @@
OpenRTM_aist.InPortCorbaCdrConsumer.__init__(self)
self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("InPortSHMConsumer")
self._properties = None
- self._shmem = None
- self.shm_address = ""
+
+ self._shm_address = str(OpenRTM_aist.uuid1())
+
+ self._shmem = OpenRTM_aist.SharedMemory()
+
+
+ self._mutex = threading.RLock()
+
return
##
@@ -88,8 +94,8 @@
def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer):
self._rtcout.RTC_PARANOID("~InPortSHMConsumer()")
CorbaConsumer.__del__(self)
- if self._shmem:
- self._shmem.close()
+ self._shmem.close(True)
+
return
##
@@ -115,10 +121,13 @@
self._properties = prop
- self.shm_address = prop.getProperty("shared_memory.address")
- if self.shm_address:
- if self._shmem is None:
- self._shmem = mmap.mmap(0, 256, self.shm_address, mmap.ACCESS_WRITE)
+
+
+ ds = prop.getProperty("shem_default_size")
+ self._memory_size = self._shmem.string_to_MemorySize(ds)
+
+
+
return
##
@@ -127,9 +136,9 @@
#
# 接続先のポートへデータを送信する
#
+ # データのサイズは共有メモリも先頭8byteから取得する
+ # データは共有メモリに書き込む
#
- # CORBAでデータサイズだけ送信して、データは共有メモリに書き込む
- #
# @param self
# @param data 送信するデータ
# @return リターンコード
@@ -150,16 +159,20 @@
try:
ref_ = self.getObject()
if ref_:
- inportcdr = ref_._narrow(OpenRTM.InPortCdr)
- #print dir(ref_)
- if self._shmem is not None:
- self._shmem.seek(os.SEEK_SET)
-
- self._shmem.write(data)
- data_size = len(data)
- mar_data_size = cdrMarshal(CORBA.TC_ushort, data_size)
-
- return self.convertReturnCode(inportcdr.put(mar_data_size))
+ inportcdr = ref_._narrow(OpenRTM__POA.SharedMemory)
+
+ guard = OpenRTM_aist.ScopedLock(self._mutex)
+
+
+ self._shmem.setInterface(inportcdr)
+ if self._shmem._shmem is None:
+ self._shmem.create_memory(self._memory_size, self._shm_address)
+ self._shmem.write(data)
+
+
+ ret = inportcdr.put()
+ del guard
+ return self.convertReturnCode(ret)
return self.CONNECTION_LOST
except:
self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMProvider.py 2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortSHMProvider.py 2016-02-25 14:22:46 UTC (rev 660)
@@ -14,10 +14,8 @@
import OpenRTM_aist
import OpenRTM__POA,OpenRTM
-import mmap
-from omniORB import cdrUnmarshal
-import CORBA
+
##
# @if jp
# @class InPortSHMProvider
@@ -34,7 +32,7 @@
#
# @endif
#
-class InPortSHMProvider(OpenRTM_aist.InPortCorbaCdrProvider):
+class InPortSHMProvider(OpenRTM_aist.InPortProvider, OpenRTM_aist.SharedMemory):
"""
"""
@@ -58,10 +56,12 @@
# @endif
#
def __init__(self):
- OpenRTM_aist.InPortCorbaCdrProvider.__init__(self)
+ OpenRTM_aist.InPortProvider.__init__(self)
+ OpenRTM_aist.SharedMemory.__init__(self)
# PortProfile setting
self.setInterfaceType("shared_memory")
+ self._objref = self._this()
@@ -69,11 +69,17 @@
self._profile = None
self._listeners = None
- self._shmem = None
+
+ orb = OpenRTM_aist.Manager.instance().getORB()
+ self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.corba_cdr.inport_ior",
+ orb.object_to_string(self._objref)))
+ self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.corba_cdr.inport_ref",
+ self._objref))
- self.shm_address = str(OpenRTM_aist.uuid1())
- self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.shared_memory.address",self.shm_address))
+
+
+
return
@@ -98,22 +104,32 @@
oid = OpenRTM_aist.Manager.instance().getPOA.servant_to_id(self)
OpenRTM_aist.Manager.instance().getPOA.deactivate_object(oid)
+
+
return
# void init(coil::Properties& prop)
def init(self, prop):
-
- #print prop
+
pass
+ def setBuffer(self, buffer):
+ self._buffer = buffer
+ return
+ def setListener(self, info, listeners):
+ self._profile = info
+ self._listeners = listeners
+ return
+
##
# @if jp
# @brief バッファにデータを書き込む
#
- # CORBAでデータサイズだけ受信して、共有メモリからデータを取り出しバッファに書き込む
+ # データのサイズは共有メモリも先頭8byteから取得する
+ # 共有メモリからデータを取り出しバッファに書き込む
#
# @param data 書込対象データ
#
@@ -128,31 +144,30 @@
#
# ::OpenRTM::PortStatus put(const ::OpenRTM::CdrData& data)
# throw (CORBA::SystemException);
- def put(self, data):
- #print self._connector.profile().properties
- #self._connector.profile().properties.setProperty("dataport.dataflow_type","push")
+ def put(self):
+
try:
self._rtcout.RTC_PARANOID("InPortCorbaCdrProvider.put()")
+
+
+
+
+ shm_data = self.read()
+
if not self._buffer:
- self.onReceiverError(data)
+ self.onReceiverError(shm_data)
return OpenRTM.PORT_ERROR
- self._rtcout.RTC_PARANOID("received data size: %d", len(data))
+ self._rtcout.RTC_PARANOID("received data size: %d", len(shm_data))
- self.onReceived(data)
+ self.onReceived(shm_data)
if not self._connector:
return OpenRTM.PORT_ERROR
- data_size = cdrUnmarshal(CORBA.TC_ushort, data)
- #if self._shmem is None:
- self._shmem = mmap.mmap(0, data_size, self.shm_address, mmap.ACCESS_READ)
- #shm_data = cdrUnmarshal(any.to_any(self._connector._dataType).typecode(), self._shmem.read(16),self._connector._endian)
- shm_data = self._shmem.read(data_size)
- #print dir(self._connector._provider._this())
- #print self._connector._provider._this()
+
ret = self._connector.write(shm_data)
@@ -167,7 +182,53 @@
return self.convertReturn(ret, data)
+
+
+ def onBufferWrite(self, data):
+ if self._listeners is not None and self._profile is not None:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self._profile, data)
+ return
+ def onBufferFull(self, data):
+ if self._listeners is not None and self._profile is not None:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self._profile, data)
+ return
+
+ def onBufferWriteTimeout(self, data):
+ if self._listeners is not None and self._profile is not None:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self._profile, data)
+ return
+
+ def onBufferWriteOverwrite(self, data):
+ if self._listeners is not None and self._profile is not None:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE].notify(self._profile, data)
+ return
+
+ def onReceived(self, data):
+ if self._listeners is not None and self._profile is not None:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self._profile, data)
+ return
+
+ def onReceiverFull(self, data):
+ if self._listeners is not None and self._profile is not None:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self._profile, data)
+ return
+
+ def onReceiverTimeout(self, data):
+ if self._listeners is not None and self._profile is not None:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self._profile, data)
+ return
+
+ def onReceiverError(self, data):
+ if self._listeners is not None and self._profile is not None:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self._profile, data)
+ return
+
+
+
+
+
+
def InPortSHMProviderInit():
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMConsumer.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMConsumer.py 2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMConsumer.py 2016-02-25 14:22:46 UTC (rev 660)
@@ -14,10 +14,9 @@
from omniORB import any
import OpenRTM_aist
import OpenRTM
+import OpenRTM__POA
-import mmap
-from omniORB import cdrUnmarshal
-import CORBA
+import threading
##
# @if jp
@@ -57,7 +56,11 @@
OpenRTM_aist.OutPortCorbaCdrConsumer.__init__(self)
self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("OutPortSHMConsumer")
- self._shmem = None
+ self._shmem = OpenRTM_aist.SharedMemory()
+
+ self._mutex = threading.RLock()
+ self._outportcdr = None
+
return
##
@@ -76,8 +79,8 @@
def __del__(self, CorbaConsumer=OpenRTM_aist.CorbaConsumer):
self._rtcout.RTC_PARANOID("~OutPortSHMConsumer()")
CorbaConsumer.__del__(self)
+ self._outportcdr.close_memory(True)
- pass
##
@@ -100,8 +103,7 @@
self._rtcout.RTC_TRACE("init()")
self._properties = prop
- self.shm_address = prop.getProperty("shared_memory.address")
-
+
return
@@ -114,7 +116,8 @@
#
# 設定されたデータを読み出す。
#
- # CORBAでデータサイズだけ送受信して、データは共有メモリから読み込む
+ # データのサイズは共有メモリも先頭8byteから取得する
+ # データは共有メモリから読み込む
#
# @param data 読み出したデータを受け取るオブジェクト
#
@@ -134,18 +137,24 @@
# virtual ReturnCode get(cdrMemoryStream& data);
def get(self, data):
self._rtcout.RTC_PARANOID("get()")
-
+
try:
- outportcdr = self.getObject()._narrow(OpenRTM.OutPortCdr)
- ret,cdr_data = outportcdr.get()
+ outportcdr = self.getObject()._narrow(OpenRTM__POA.SharedMemory)
+ if self._outportcdr is None:
+ outportcdr.setInterface(self._shmem._this())
+ self._outportcdr = outportcdr
+
+ guard = OpenRTM_aist.ScopedLock(self._mutex)
+ ret = outportcdr.get()
+
if ret == OpenRTM.PORT_OK:
self._rtcout.RTC_DEBUG("get() successful")
- data_size = cdrUnmarshal(CORBA.TC_ushort, cdr_data)
- self._shmem = mmap.mmap(0, data_size, self.shm_address, mmap.ACCESS_READ)
- shm_data = self._shmem.read(data_size)
+
+ shm_data = self._shmem.read()
+
data[0] = shm_data
self.onReceived(data[0])
@@ -155,7 +164,7 @@
self._rtcout.RTC_INFO("InPort buffer is full.")
self.onBufferFull(data[0])
self.onReceiverFull(data[0])
-
+
self._buffer.put(data[0])
self._buffer.advanceWptr()
self._buffer.advanceRptr()
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py 2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py 2016-02-25 14:22:46 UTC (rev 660)
@@ -15,10 +15,8 @@
import OpenRTM_aist
import OpenRTM__POA,OpenRTM
-import mmap, os
-from omniORB import cdrMarshal
-import CORBA
+
##
# @if jp
# @class OutPortSHMProvider
@@ -37,7 +35,7 @@
#
# @endif
#
-class OutPortSHMProvider(OpenRTM_aist.OutPortCorbaCdrProvider):
+class OutPortSHMProvider(OpenRTM_aist.OutPortProvider,OpenRTM_aist.SharedMemory):
##
# @if jp
# @brief コンストラクタ
@@ -53,16 +51,31 @@
# @endif
#
def __init__(self):
- OpenRTM_aist.OutPortCorbaCdrProvider.__init__(self)
+ OpenRTM_aist.OutPortProvider.__init__(self)
+ OpenRTM_aist.SharedMemory.__init__(self)
self.setInterfaceType("shared_memory")
+
+ self._objref = self._this()
+ self._buffer = None
+ orb = OpenRTM_aist.Manager.instance().getORB()
+ self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.corba_cdr.outport_ior",
+ orb.object_to_string(self._objref)))
+ self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.corba_cdr.outport_ref",
+ self._objref))
+ self._listeners = None
+ self._connector = None
+ self._profile = None
+
- self.shm_address = str(OpenRTM_aist.uuid1())
+ self._shm_address = str(OpenRTM_aist.uuid1())
- self._properties.append(OpenRTM_aist.NVUtil.newNV("dataport.shared_memory.address",self.shm_address))
+
- self._shmem = mmap.mmap(0, 256, self.shm_address, mmap.ACCESS_WRITE)
+
+
+
return
@@ -84,17 +97,34 @@
oid = self._default_POA().servant_to_id(self)
self._default_POA().deactivate_object(oid)
- self._shmem.close()
+
return
# virtual void init(coil::Properties& prop);
def init(self, prop):
- pass
+ ds = prop.getProperty("shem_default_size")
+ self._memory_size = self.string_to_MemorySize(ds)
+
+ def setBuffer(self, buffer):
+ self._buffer = buffer
+ return
+ def setListener(self, info, listeners):
+ self._profile = info
+ self._listeners = listeners
+ return
+
+ def setConnector(self, connector):
+ self._connector = connector
+ return
+
+
+
+
##
# @if jp
# @brief バッファからデータを取得する
@@ -112,41 +142,109 @@
# virtual ::OpenRTM::PortStatus get(::OpenRTM::CdrData_out data);
def get(self):
self._rtcout.RTC_PARANOID("OutPortSHMProvider.get()")
+
if not self._buffer:
self.onSenderError()
- return (OpenRTM.UNKNOWN_ERROR, None)
+ return OpenRTM.UNKNOWN_ERROR
try:
if self._buffer.empty():
self._rtcout.RTC_ERROR("buffer is empty.")
- return (OpenRTM.BUFFER_EMPTY, None)
+ return OpenRTM.BUFFER_EMPTY
cdr = [None]
ret = self._buffer.read(cdr)
-
+
if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
- if not cdr:
+ if not cdr[0]:
self._rtcout.RTC_ERROR("buffer is empty.")
- return (OpenRTM.BUFFER_EMPTY, None)
+ return OpenRTM.BUFFER_EMPTY
except:
self._rtcout.RTC_TRACE(OpenRTM_aist.Logger.print_exception())
- return (OpenRTM.UNKNOWN_ERROR, None)
+ return OpenRTM.UNKNOWN_ERROR
- self._shmem.seek(os.SEEK_SET)
- self._shmem.write(cdr[0])
+ if self._shmem is None:
+ self.create_memory(self._memory_size, self._shm_address)
+ self.write(cdr[0])
+ return self.convertReturn(ret, cdr[0])
+
- data_size = len(cdr[0])
- mar_data_size = cdrMarshal(CORBA.TC_ushort, data_size)
+ def onBufferRead(self, data):
+ if self._listeners and self._profile:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
+ return
+
+ def onSend(self, data):
+ if self._listeners and self._profile:
+ self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self._profile, data)
+ return
+
+ def onBufferEmpty(self):
+ if self._listeners and self._profile:
+ self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY].notify(self._profile)
+ return
+
+ def onBufferReadTimeout(self):
+ if self._listeners and self._profile:
+ self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT].notify(self._profile)
+ return
+
+ def onSenderEmpty(self):
+ if self._listeners and self._profile:
+ self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY].notify(self._profile)
+ return
+
+ def onSenderTimeout(self):
+ if self._listeners and self._profile:
+ self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_TIMEOUT].notify(self._profile)
+ return
+
+ def onSenderError(self):
+ if self._listeners and self._profile:
+ self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self._profile)
+ return
+
+ def convertReturn(self, status, data):
+ if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
+ self.onBufferRead(data)
+ self.onSend(data)
+ return OpenRTM.PORT_OK
- return self.convertReturn(ret, mar_data_size)
+ elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
+ self.onSenderError()
+ return OpenRTM.PORT_ERROR
-
+ elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
+ # never come here
+ return OpenRTM.BUFFER_FULL
+ elif status == OpenRTM_aist.BufferStatus.BUFFER_EMPTY:
+ self.onBufferEmpty()
+ self.onSenderEmpty()
+ return OpenRTM.BUFFER_EMPTY
+ elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
+ self.onSenderError()
+ return OpenRTM.PORT_ERROR
+
+ elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
+ self.onBufferReadTimeout()
+ self.onSenderTimeout()
+ return OpenRTM.BUFFER_TIMEOUT
+
+ else:
+ return OpenRTM.UNKNOWN_ERROR
+
+ self.onSenderError()
+ return OpenRTM.UNKNOWN_ERROR
+
+
+
+
def OutPortSHMProviderInit():
factory = OpenRTM_aist.OutPortProviderFactory.instance()
factory.addFactory("shared_memory",
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/RTM_IDL/SharedMemory.idl
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/RTM_IDL/SharedMemory.idl (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/RTM_IDL/SharedMemory.idl 2016-02-25 14:22:46 UTC (rev 660)
@@ -0,0 +1,36 @@
+// -*- IDL -*-
+/*!
+ * @file SharedMemory.idl
+ * @brief Shared Memory TransPort interface definition
+ * @date $Date: 2016-02-25 $
+ * @author Nobuhiko Miyamoto
+ *
+ *
+ */
+
+#ifndef SharedMemory_idl
+#define SharedMemory_idl
+
+module OpenRTM
+{
+ enum PortStatus
+ {
+ PORT_OK,
+ PORT_ERROR,
+ BUFFER_FULL,
+ BUFFER_EMPTY,
+ BUFFER_TIMEOUT,
+ UNKNOWN_ERROR
+ };
+
+ interface SharedMemory
+ {
+ void open_memory(in long memory_size, in string shm_address);
+ void create_memory(in long memory_size, in string shm_address);
+ void close_memory(in boolean unlink);
+ void setInterface(in SharedMemory sm);
+ PortStatus put();
+ PortStatus get();
+ };
+};
+#endif
Added: trunk/OpenRTM-aist-Python/OpenRTM_aist/SharedMemory.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/SharedMemory.py (rev 0)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/SharedMemory.py 2016-02-25 14:22:46 UTC (rev 660)
@@ -0,0 +1,435 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+##
+# @file SharedMemory.py
+# @brief SharedMemory class
+# @date $Date$
+# @author Nobuhiko Miyamoto
+#
+
+import sys
+import mmap, os
+import platform, ctypes
+from omniORB import cdrMarshal
+from omniORB import cdrUnmarshal
+import CORBA
+import OpenRTM_aist
+import OpenRTM__POA
+
+
+
+
+##
+# @if jp
+#
+# @class SharedMemory
+#
+# @brief SharedMemory クラス
+#
+# 共有メモリ操作クラス
+# CORBAによる通信により、mmapの初期化、終了などがリモートに操作できる
+#
+#
+# @else
+# @class SharedMemory
+#
+# @brief SharedMemory class
+#
+#
+#
+# @endif
+#
+class SharedMemory(OpenRTM__POA.SharedMemory):
+ default_size = 8
+ default_memory_size = 2*1024*1024
+
+
+ ##
+ # @if jp
+ # @brief コンストラクタ
+ #
+ # コンストラクタ
+ #
+ # @param self
+ #
+ # @else
+ # @brief Constructor
+ #
+ # Constructor
+ #
+ # @param self
+ #
+ # @endif
+ #
+ def __init__(self):
+ self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("SharedMemory")
+ self._shmem = None
+ self._smInterface = None
+ self._shm_address = ""
+ self._memory_size = SharedMemory.default_memory_size
+ if platform.system() == "Windows":
+ pass
+ else:
+ try:
+ self.rt = ctypes.CDLL('librt.so')
+ except:
+ self.rt = ctypes.CDLL('librt.so.1')
+ self.rt.shm_open.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_int]
+ self.rt.shm_open.restype = ctypes.c_int
+ self.rt.ftruncate.argtypes = [ctypes.c_int, ctypes.c_int]
+ self.rt.ftruncate.restype = ctypes.c_int
+ self.rt.close.argtypes = [ctypes.c_int]
+ self.rt.close.restype = ctypes.c_int
+ self.rt.shm_unlink.argtypes = [ctypes.c_char_p]
+ self.rt.shm_unlink.restype = ctypes.c_int
+
+ self.fd = -1
+ return
+
+
+
+ ##
+ # @if jp
+ # @brief デストラクタ
+ #
+ # デストラクタ
+ #
+ # @param self
+ #
+ # @else
+ # @brief Destructor
+ #
+ # Destructor
+ #
+ # @param self
+ # @endif
+ #
+ def __del__(self):
+ self._rtcout.RTC_PARANOID("~SharedMemory()")
+ return
+
+
+ ##
+ # @if jp
+ # @brief 文字列で指定したデータサイズを数値に変換する
+ # 1M → 1048576
+ # 1k → 1024
+ # 100 → 100
+ #
+ #
+ # @param self
+ # @param size_str データサイズ(文字列)
+ # @return データサイズ(数値)
+ #
+ # @else
+ # @brief
+ #
+ # @param self
+ # @param size_str
+ # @return
+ #
+ # @endif
+ #
+ # int string_to_MemorySize(string size_str);
+ def string_to_MemorySize(self, size_str):
+ memory_size = SharedMemory.default_memory_size
+ if size_str:
+ if size_str[-1] == "M":
+ memory_size = 1024 * 1024 * int(size_str[0:-1])
+ elif size_str[-1] == "k":
+ memory_size = 1024 * int(size_str[0:-1])
+ else:
+ memory_size = int(size_str[0:-1])
+ return memory_size
+
+
+
+ ##
+ # @if jp
+ # @brief 共有メモリの初期化
+ # windowsではページングファイル上に領域を確保する
+ # Linuxでは/dev/shm以下にファイルを作成する
+ # 作成したファイルの内容を仮想アドレスにマッピングする
+ #
+ #
+ #
+ # @param self
+ # @param memory_size 共有メモリのサイズ
+ # @parama shm_address 空間名
+ #
+ # @else
+ # @brief
+ #
+ # @param memory_size
+ # @parama shm_address
+ #
+ # @endif
+ #
+ # void create_memory(int memory_size, string shm_address);
+ def create_memory(self, memory_size, shm_address):
+
+ if self._shmem is None:
+ self._rtcout.RTC_TRACE("create():memory_size="+str(memory_size)+",shm_address="+str(shm_address))
+ self._memory_size = memory_size
+ self._shm_address = shm_address
+
+ if platform.system() == "Windows":
+ self._shmem = mmap.mmap(0, self._memory_size, self._shm_address, mmap.ACCESS_WRITE)
+ else:
+ O_RDWR = 2
+ O_CREAT = 64
+
+ S_IRUSR = 256
+ S_IWUSR = 128
+ S_IRGRP = 32
+ S_IWGRP = 16
+ S_IROTH = 4
+
+ self.fd = self.rt.shm_open(self._shm_address,O_RDWR | O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)
+ if self.fd < 0:
+ return self.UNKNOWN_ERROR
+ self.rt.ftruncate(self.fd, self._memory_size)
+ self._shmem = mmap.mmap(self.fd, self._memory_size, mmap.MAP_SHARED)
+ self.rt.close( self.fd )
+
+
+ if self._smInterface:
+ self._smInterface.open_memory(self._memory_size, self._shm_address)
+
+
+
+ ##
+ # @if jp
+ # @brief 共有メモリのマッピングを行う
+ #
+ #
+ #
+ # @param self
+ # @param memory_size 共有メモリのサイズ
+ # @parama shm_address 空間名
+ #
+ # @else
+ # @brief
+ #
+ # @param memory_size
+ # @parama shm_address
+ #
+ # @endif
+ #
+ # void open_memory(int memory_size, string shm_address);
+ def open_memory(self, memory_size, shm_address):
+ self._rtcout.RTC_TRACE("open():memory_size="+str(memory_size)+",shm_address="+str(shm_address))
+ self._memory_size = memory_size
+ self._shm_address = shm_address
+ if self._shmem is None:
+ if platform.system() == "Windows":
+ self._shmem = mmap.mmap(0, self._memory_size, self._shm_address, mmap.ACCESS_READ)
+ else:
+ O_RDWR = 2
+ self.fd = self.rt.shm_open(self._shm_address,O_RDWR,0)
+ if self.fd < 0:
+ return self.UNKNOWN_ERROR
+ self.rt.ftruncate(self.fd, self._memory_size)
+ self._shmem = mmap.mmap(self.fd, self._memory_size, mmap.MAP_SHARED)
+ self.rt.close( self.fd )
+
+
+
+ ##
+ # @if jp
+ # @brief マッピングした共有メモリをアンマップする
+ #
+ #
+ #
+ # @param self
+ # @param unlink Linuxで/dev/shm以下に作成したファイルを削除する場合にTrueにする
+ #
+ # @else
+ # @brief
+ #
+ # @param self
+ # @param unlink
+ #
+ # @endif
+ #
+ # void close_memory(int memory_size, string shm_address);
+ def close_memory(self, unlink):
+ self.close(unlink)
+
+ ##
+ # @if jp
+ # @brief マッピングした共有メモリをアンマップする
+ #
+ #
+ #
+ # @param self
+ # @param unlink Linuxで/dev/shm以下に作成したファイルを削除する場合にTrueにする
+ #
+ # @else
+ # @brief
+ #
+ # @param self
+ # @param unlink
+ #
+ #
+ # @endif
+ #
+ # void close(int memory_size, string shm_address);
+ def close(self, unlink=False):
+ self._rtcout.RTC_TRACE("open()")
+ if self._shmem:
+ self._shmem.close()
+ if platform.system() == "Windows":
+ pass
+ else:
+ if unlink:
+ self.rt.shm_unlink(self._shm_address)
+ self._shmem = None
+
+ if self._smInterface:
+ self._smInterface.close_memory(False)
+
+
+
+ ##
+ # @if jp
+ # @brief データを書き込む
+ # 先頭8byteにデータサイズを書き込み、その後ろにデータを書き込む
+ # 設定したデータサイズが共有メモリのサイズを上回った場合、共有メモリの初期化を行う
+ #
+ #
+ #
+ # @param self
+ # @param data 書き込むデータ
+ #
+ # @else
+ # @brief
+ #
+ # @param self
+ # @param data
+ #
+ #
+ # @endif
+ #
+ # void write(cdrMemoryStream& data);
+ def write(self, data):
+ self._rtcout.RTC_TRACE("write()")
+
+ if self._shmem:
+ data_size = len(data)
+
+
+ if data_size + SharedMemory.default_size > self._memory_size:
+ self._memory_size = data_size + SharedMemory.default_size
+
+ if self._smInterface:
+ self._smInterface.close_memory(False)
+
+
+ self.close_memory(True)
+ self.create_memory(self._memory_size, self._shm_address)
+
+
+
+ data_size_cdr = cdrMarshal(CORBA.TC_ulong, data_size)
+
+ self._shmem.seek(os.SEEK_SET)
+ self._shmem.write(data_size_cdr)
+ self._shmem.write(data)
+
+
+ ##
+ # @if jp
+ # @brief データを読み込む
+ #
+ #
+ #
+ # @param self
+ # @return データ
+ #
+ # @else
+ # @brief
+ #
+ # @param self
+ # @return
+ #
+ # @endif
+ #
+ # void read(::OpenRTM::CdrData_out data);
+ def read(self):
+ self._rtcout.RTC_TRACE("read()")
+ if self._shmem:
+
+ self._shmem.seek(os.SEEK_SET)
+
+ data_size_cdr = self._shmem.read(SharedMemory.default_size)
+ data_size = cdrUnmarshal(CORBA.TC_ulong, data_size_cdr)
+
+
+
+ shm_data = self._shmem.read(data_size)
+
+ return shm_data
+ return ""
+
+
+
+ ##
+ # @if jp
+ # @brief 通信先のCORBAインターフェースを登録する
+ # 登録する事により共有メモリの初期化したときに、通信先でもマッピングをやり直すことができる
+ #
+ #
+ #
+ # @param self
+ # @param sm SharedMemoryのオブジェクトリファレンス
+ #
+ # @else
+ # @brief
+ #
+ #
+ # @endif
+ #
+ # void close(int memory_size, string shm_address);
+ def setInterface(self, sm):
+ self._smInterface = sm
+
+
+ ##
+ # @if jp
+ # @brief データの送信を知らせる
+ #
+ #
+ #
+ # @param self
+ #
+ # @else
+ # @brief
+ #
+ # @param self
+ #
+ # @endif
+ #
+ # PortStatus put();
+ def put(self):
+ return OpenRTM.UNKNOWN_ERROR
+
+ ##
+ # @if jp
+ # @brief データの送信を要求する
+ #
+ #
+ #
+ # @param self
+ #
+ # @else
+ # @brief
+ #
+ # @param self
+ #
+ # @endif
+ #
+ # PortStatus get();
+ def get(self):
+ return OpenRTM.UNKNOWN_ERROR
\ No newline at end of file
Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/__init__.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/__init__.py 2016-02-25 06:02:51 UTC (rev 659)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/__init__.py 2016-02-25 14:22:46 UTC (rev 660)
@@ -103,6 +103,7 @@
from InPortDirectProvider import *
from OutPortDirectConsumer import *
from OutPortDirectProvider import *
+from SharedMemory import *
from InPortSHMConsumer import *
from InPortSHMProvider import *
from OutPortSHMConsumer import *
More information about the openrtm-commit
mailing list