def init(self, prop):
if not prop.propertyNames():
return
filename = prop.getProperty("testif.filename", "test.dat")
def polling():
self._running = True
lastid = 0
while self._running:
if self._connector:
if os.path.exists(filename):
with open(filename, 'rb') as fin:
try:
id = struct.unpack("L", fin.read(struct.calcsize("L")))[0]
if id != lastid:
lastid = id
size = struct.unpack("L", fin.read(struct.calcsize("L")))[0]
if size > 0:
data = fin.read(size)
self._connector.write(data)
except BaseException:
pass
self._thread = threading.Thread(target=polling)
self._thread.start()
def get(self):
with open(self._filename_in, 'wb') as fout:
self._dataid += 1
try:
print(self._dataid)
fout.write(struct.pack('L', self._dataid))
except BaseException:
return self.PORT_ERROR
for i in range(100):
if os.path.exists(self._filename_out):
with open(self._filename_out, 'rb') as fin:
try:
id = struct.unpack("L", fin.read(struct.calcsize("L")))[0]
if id == self._dataid:
self._dataid = id
size = struct.unpack("L", fin.read(struct.calcsize("L")))[0]
if size > 0:
data = fin.read(size)
return self.PORT_OK, data
except BaseException:
pass
return self.PORT_ERROR, ""
def init(self, prop):
if not prop.propertyNames():
return
filename_in = prop.getProperty("testif.filename_in", "test_in.dat")
filename_out = prop.getProperty("testif.filename_out", "test_out.dat")
def polling():
self._running = True
lastid = 0
while self._running:
if self._connector:
if os.path.exists(filename_in):
with open(filename_in, 'rb') as fin:
try:
id = struct.unpack("L", fin.read(struct.calcsize("L")))[0]
if id == lastid:
continue
except BaseException:
continue
with open(filename_out, 'wb') as fout:
try:
fout.write(struct.pack('L', id))
ret, data = self._connector.read()
fout.write(struct.pack('L', len(data)))
if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
fout.write(data)
lastid = id
except BaseException:
pass
このページではPythonで独自インターフェースを作成する手順を説明します。
以下のソースコードのサンプルでは重要でない部分は省略しているため、詳細なソースコードは以下から取得してください。
Push型通信
Push型通信実装のためにTestInPortConsumer(InPortConsumer)、TestInPortProvider(InPortProvider)を実装します。
InPortConsumerの実装
まず以下のようなPythonファイル(TestInPortConsumer.py)を用意します。
今回はここにファイルの読み書きでデータを転送する独自インターフェースを実装します。
まずコネクタの初期化時にinit関数が呼ばれます。 変数propにはRTSystemEditor等で設定したコネクタの接続情報が格納されています。 以下の例ではpropからtestif.filenameのパラメータを取得してファイル名に設定しています。 init関数は複数回呼ばれる可能性があるので、その点は注意する必要があります。
データの書き込み時にはput関数が呼ばれます。 変数dataはbytes型のシリアライズしたデータが格納されています。
この例ではファイルにデータのID、データサイズ、バイト列データを書き込んでいます。 変数dataのgetDataLength関数でデータサイズ、getBuffer関数でバイト列データを取得して、取得したデータを何らかの方法でInPortProviderへ送信します。
その他の関数は基本的に実装の必要はありませんが、InPortProvider側で追加の情報を設定する場合はsubscribeInterface関数でその情報の取得をする必要があります。 unsubscribeInterfaceはコネクタ切断時に呼ばれるので、subscribeInterface関数での処理に関して何らかの終了処理が必要な場合は記述します。
InPortProviderの実装
まず以下のようなPythonファイル(TestInPortProvider.py)を用意します。
ここに処理を追加していきます。 今回の例では、init関数で指定ファイルが変更されているかをポーリングするスレッドを作成しています。
ファイルからデータを取得後に、m_connectorのwrite関数を呼び出してデータをInPortConnectorに渡しています。 InPortConsumerのデータをファイルに書き込んで、InPortProviderでファイルからデータを読み込むというデータ転送を実装できました。
Pull型通信
Pull型通信実装のためにTestInPortConsumer(OutPortConsumer)、TestInPortProvider(OutPortProvider)を実装します。 ただし、Push型通信のみを実装する場合はここは飛ばしてください。
InPortConsumerの実装
まず以下のようなPythonファイル(TestOutPortConsumer.py)を用意します。
今回の例ではinit関数で読み書きするファイル名を指定します。
ここに処理を追加していきます。 Pull型通信ではデータ読み込み時にget関数を呼びますが、以下の例ではファイルAに呼び出しのIDを書き込みます。 次にファイルBからデータサイズとバイト列データを読み込んで値を返しています。
OutPortProviderの実装
まず以下のようなPythonファイル(TestOutPortProvider.py)を用意します。
ここに処理を追加していきます。 以下の例ではinit関数でファイルが変更されたかをポーリングして、変更時に別のファイルにデータを書き込むスレッドを起動しています。
まずm_connectorのread関数を呼んでOutPortConnectorから転送するデータを取得します。 getDataLength関数でデータサイズを取得、getBuffer関数でバイト列データを取得してファイルに書き込んでいます。
これにより、OutPortConsumerでファイルAにデータのIDを書き込み後にOutPortProviderでファイルAからIDを読み込んで前回読み込んだデータのIDと一致しているかを判定します。 新しいデータだと判定したらファイルBにデータを書き込んで、OutPortConsumerでファイルBが新しいデータかを判定してOutPortConnectorにデータを渡します。
独自インターフェースの登録
ここまでに実装した独自インターフェースを使用可能にするためファクトリに登録します。 以下の内容のTestIF.pyを作成してください。
TestInPortProviderInit、TestInPortConsumerInit、TestOutPortProviderInit、TestOutPortConsumerInit関数ではInPortProviderFactory、InPortConsumerFactory、OutPortProviderFactory、OutPortConsumerFactoryのaddFactory関数で実装した独自インターフェースを登録しています。 testifという名前をポート接続時に指定することで使用できます。 TestIFInit関数はPythonモジュールをロードする時に呼び出す関数です。〇〇.pyであれば〇〇Initというように、初期化関数はPythonファイルの名前+Initにしてください。 Push型のみ、もしくはPull型通信のみの実装の場合は、必要なモジュールだけを登録してください。
動作確認
以下のrtc.confを作成してください。 ${TestIF_DIR}にはTestIF.pyのフォルダのパスを指定してください。
作成したrtc.confを指定してConsoleIn、ConsoleOutのサンプルコンポーネントを起動します。これでOpenRTM-aistがTestIF.pyをロードします。
RTSystemEditorでデータポートを接続しようとすると、以下のようにInterface Typeでtestifが選択可能になっています。
Interface Typeにtestifを選択して接続すると、実装したファイル読み書きによるデータ転送ができることが確認できます。
Pull型通信を動作確認する場合について、Pull型通信ではInPortのisNew関数が使えず新規のデータが存在するかは確認できません。 このため、ConsoleOutコンポーネントのisNew関数を実行している部分をコメントアウトする必要があります。
接続時のオプションを設定
今回の例ではtestif.filename、testif.filename_in、testif.filename_outというオプションで読み書きするファイル名を指定できるようにしましたが、これらをデータポートのプロファイルからオプションの情報を取得するように設定を追加できます。ただし、リリース版のOpenRTM-aist 2.0では使えない場合があるので、OpenRTM-aistをソースコードからビルドしてください。