操作
機能 #3406
完了⑦ トピックベースのポート接続機能
開始日:
2015/12/22
期日:
2016/03/25
進捗率:
100%
予定工数:
30.00時間
説明
DDSのようなトピックベースのポート接続機能を実装すること。ただし、この機能はデータポート、サービスポートにも利用できるように実装すること。
ファイル
操作
100%
説明
DDSのようなトピックベースのポート接続機能を実装すること。ただし、この機能はデータポート、サービスポートにも利用できるように実装すること。
ファイル
test_Manager_new.py (7.38 KB) test_Manager_new.py | miyamoto, 2016/01/14 20:45 |
Managerクラスに以下の関数を追加した。
publishPorts関数
指定のRTCが保持するポートをネーミングサービスへバインドする
subscribePorts関数
指定のRTCが保持するポートを同じトピック名のコンテキストの下にバインドしてあるポートオブジェクトと接続する
getPortsOnNameServers関数
与えられたパス以下の指定のkindのポートオブジェクトのリストを取得する
connectDataPorts関数
対象のデータポートと指定したリスト内のデータポートを接続する
connectServicePorts関数
対象のサービスポートと指定したリスト内のサービスポートを接続する
NamingManagerクラスに以下の関数、クラスを追加した。
bindPortObject関数
ポートを指定したパスでネーミングサービスへバインドする
registerPortName関数
対象のポートと名前をポート管理用オブジェクト(Port)に格納し、管理用オブジェクトをリストに追加する
unregisterPortName関数
指定の名前のポートをリストから削除する
Portクラス
ポート管理用オブジェクト
C++版でNamesがNameServerに変更されていたため、こちらでも変更を行った。
またコンストラクタ、unbindAll関数にもポートの管理用リストに関するコードを追加した。
NamingOnCorbaクラスに以下の関数を追加した。
bindPortObject
ポートを指定したパスでネーミングサービスへバインドする
CorbaNamingクラスに以下の関数を追加した。
listByKind関数
与えられたパス以下で指定のkindのオブジェクトを検索する
listBinding関数
与えられたパス以下のオブジェクトを取得する
ManagerクラスのregisterComponent関数内で登録したRTCのポートをpublishPorts関数でネームサービスにバインドして、subscribePorts関数で既にバインド済みのポートと接続する。
ポートがバインドされるパスはトピック名をtopic_test、ポート名をport0とした場合、以下のようになる。
インポートdataports.port_cxt/topic_test.topic_cxt/port0.inport
アウトポートdataports.port_cxt/topic_test.topic_cxt/port0.outport
サービスポートsvcports.port_cxt/topic_test.topic_cxt/port0.svc
トピック名はポートのpublish_topicというプロパティに設定する。
添付のテスト用コードでテストを行った。
テスト用コードは以下の動作を行う。
setUp関数
マネージャ初期化self.manager = OpenRTM_aist.Manager.init(sys.argv)
self.manager.setModuleInitProc(MyModuleInit)
self.manager.activateManager()
ここでTestComp10、TestComp20の2つのRTCを起動する。
TestComp10はアウトポートtopic_outとサービスポートtopic_serviceを保持しており、TestComp20はインポートtopic_inとサービスポートtopic_serviceを保持している。
つまり互いのデータポートとサービスポートが接続されていれば正常に動作しているという事になる。
test_Topic関数
データポートが接続されたかの確認ans = OpenRTM_aist.already_connected(inport, outport)
self.assertTrue(ans)
サービスポートが接続されたかの確認ans = OpenRTM_aist.already_connected(provided, required)
self.assertTrue(ans)