LEGO Mindstorm NXT (以下NXT) は3つのモーターと4種類のセンサー、これらを制御するためのインテリジェントブロックNXTから構成される、ロボットを作ることができる LEGOブロックです。 NXT は USB または Bluetooth で PC と接続して、インテリジェントブロックをプログラミングしたり、直接制御したりすることができます。 また、自作のプログラムをインテリジェントブロックにロードしてスタンドアロンで動作させることもできます。
ここでは、インテリジェントブロックを PC上で RTコンポーネント化し、他のコンポーネントからモーターを動かしたり、センサーデータを読み出したりできるようにします。 NXT を RTコンポーネント化することで、既存の RTコンポーネント化された入力デバイスで NXT を制御したり、センサーデータを読み出したりすることが簡単にできるようになります。
NXT を RTC化するにあたり、まずは NXT と PC の設定を行います。 PC と NXT は USB または Bluetooth で接続することができるますが、せっかく電池で動く NXT を紐付きでは使いたくないのでBluetoothで接続します。
ここでは、写真に示すような構成を例にとり、NXT を RTC化します。
これは、Tribot と呼ばれる構成の移動ベースの部分に、目玉のような超音波センサーを前面に取り付けただけの簡単な構成です。 移動ベースの組み立て方法は、Mindstorm NXT の箱に入っている「Start Here」と書かれた小箱の中の小冊子に詳しく記載されているので参照してください。 この小冊子で解説されている移動ベースに、いくつかの LEGOブロックを追加して、超音波センサーを取り付ければ完成です。
NXT のインテリジェントブロックには最初から Bluetooth が内蔵されています。 一方、PC に Bluetoothデバイスが内臓されていない場合は、写真のような Bluetoothデバイスを PC に取り付けることで、NXT と通信することができるようになります。
まずは、これらのデバイスを取り付けて、必要ならデバイスドライバをインストールするなどして使えるようにします。 Windows-XP などではたいていの市販の Bluetooth デバイスなら、ドライバを改めてインストールすることなくデフォルトのドライバで動作するようです。
Bluetooth が適切にインストールされていれば、コントロールパネルに Bluetooth のアイコンが現れます。 これをクリックすると図のような Bluetooth設定ダイアログが現れます。
次は、NXT と PC を接続するので、このダイアログはとりあえずそのままにしておきます。
PC と NXT を Bluetoothで接続する手順はおおよそ以下のとおりです。
NXT のインテリジェントブロックの中央のオレンジ色のボタンを押して電源を入れます。 電源が入ると「ピロリロリ♪」と音が鳴ってブロックが起動します。(音量の設定を0にしている場合には音は出ない。) 電源を入れると、図のような画面「My files」モードになります。
このような画面にならない場合は、オレンジ色のボタンのしたの四角いボタンを何度か押すことで、「My files」モードにすることができます。
「My files」モードの状態で、オレンジ色ボタンの左右にある、灰色の三角ボタンを押し「Bluetooth」モードにカーソルを合わせ、オレンジボタンを押します。
さらに、左右の三角ボタンを押し、「Search」モードにカーソルを合わせます。
この状態で、オレンジ色のボタンを押し、実際に接続先を検索します。 検索状態の画面を下に示します。
PC の Bluetooth が有効で NXT から PC が見えれば、図のように PC の名前が表れるはずです。 ここで PC の名前とは Windows における「コンピューター名」です。
近くに Bluetooth搭載 PC があれば、何台かの PC が見えるかもしれません。 三角ボタンを押して自分の PC にカーソルを合わせ、オレンジ色の確認ボタンを押します。
次に Bluetooth のチャネル選択画面になるので、そのままオレンジ色の確認ボタンを押します。 Connecting と表示された後、パスキー入力画面になるので、そのままオレンジボタンを押します。
PC側で図のようなバルーンが表示されるのでこのバルーンをクリックします。
パスキーの入力を求められるので、先ほど NXT に表示されていたパスキーを入力します。 接続が完了すると、図のようなダイアログが現れます。 他のデバイスを認識しないように「発見機能を無効にする」をチェックし「完了」を押して終了します。
Bluetooth設定ダイアログの「デバイス」タブで見ると、図のように NXT が接続されていることが確認できます。
上記リンクからWindows用 のインストーラをダウンロードします。 ダウンロードした実行ファイルを実行すればインストールは完了です。
setup.py install
c:\Python24\python setup.py install
Microsoft Windows XP [Version 5.1.2600] (C) Copyright 1985-2001 Microsoft Corp. C:\tmp\nxt_python-0.7>setup.py install running install running build running build_py ... copying build\scripts-2.4\nxt_filer -> c:\python24\Scripts copying build\scripts-2.4\nxt_push -> c:\python24\Scripts copying build\scripts-2.4\nxt_test -> c:\python24\Scripts C:\tmp\nxt_python-0.7>
NXT を ON にしてPCに接続されている状態で、example 以下のサンプルを実行し、NXT Python から NXT が制御できるかどうか確認します。
example ディレクトリーには以下のサンプルがあります。
これらのテストを実行する際には、それぞれのサンプルが使用するモーター、センサーなどが接続された状態で実行しなければなりません。
以上で、PC から Python を使用して NXT を制御する準備が整いました。
RtcTemplate などを使用して、NXT のコンポーネントの雛形を生成してコーディングに取り掛かりたいところです。 NXT Python はきれいにまとまっている Python モジュールですが、NXT Python のコードを直接 RTC の雛形に書き込むのはちょっと待ってください。
サンプルを見てもわかるように、NXT Python はロケータ、モーター、センサー等の複数のモジュールから構成されていて、モーターやセンサーへの細かな機能も制御できるため、アクセス方法も若干煩雑です。
ここは、NXT Python の複数のモジュールをまとめてひとつのインターフェースからアクセスするためのクラスを作ることにします。 これはソフトウエアパターンで言うところの Façade パターンになります。
Facade パターンあるいは Facade パターン(ファサード・パターン)とは、GoF(Gang of Four; 4人のギャングたち)によって定義された、コンピュータソフトウェアのデザインパターンの1つです。 Facade とは「窓口」を意味します。関連するクラス群を使用するための手続きを、窓口となる一つのクラスに集約することにより、プログラムの冗長性を無くすことを目的とします。(出典: フリー百科事典『ウィキペディア(Wikipedia)』Facade パターン)
こうした便利クラスを作成すると以下の利点があります。
これは、RTC を作成する際全般に言えることです。 対象とするロボットやデバイスなどを、うまくクラス化することで、デバイスの変更や、RTC 自体の変更・バージョンアップにも柔軟に対応でき、変更に強いソフトウエアを作ることができます。
決して、onExecute() などで
ioctl(xxxx, xxxx); // UNIX デバイスへの直アクセス inb(xxx); // I/O への直アクセス outb(xxx); // I/O への直アクセス
のようなプリミティブな関数呼び出しをしてはいけません。 理想的には、
onExecute(ec_id) { // 擬似コード if (m_inport.isNew()) { retval = m_myrobot.set_actuator(m_inport.read()); if (retval == fatal_error) return RTC::RTC_ERROR; } if (myrobot.get_sensor(sensor_data) == true) { m_outport.wrtie(sensor_data); } else {// リードエラーの場合の処理 if (fatal_error) return RTC::RTC_ERROR; } return RTC::RTC_OK; }
NXT Python 自体は NXTインテリジェントブロックのほとんどすべての機能を使用することができますが、それらの機能すべてを直接使用するのは煩雑になるため、利用したい機能のみをインターフェースする Facade クラスを作ります。
NXT の主な機能を列挙します。
これらすべてを一度にサポートするクラスを作るのでは Façade クラスを作る意味がなくなってしまいます。 必要な機能が出てきたらそのつど Façade クラスを更新すればよいのです。 したがって、ここで作成する Façade クラスでは、今回作成するロボットに合わせて、以下の機能のみをサポートすることにします。
関数名はおおよそのイメージです。 この考え方で作成したクラス (NXTBrick.py) を以下に示します。
#!/usr/bin/env python # @file NXTBrick.py # -*- coding:shift_jis -*- import nxt.locator from nxt.sensor import * from nxt.motor import * class NXTBrick: def __init__(self, bsock=None): コンストラクタ NXT ブロックへのコネクションを行い、モーターや、センサーオブジェクトを 作成する。モーターのエンコーダのリセットを行う。 """ if bsock: self.sock = bsock else: self.sock = nxt.locator.find_one_brick().connect() self.motors = [Motor(self.sock, PORT_A), Motor(self.sock, PORT_B), Motor(self.sock, PORT_C)] self.sensors = [TouchSensor(self.sock, PORT_1), SoundSensor(self.sock, PORT_2), LightSensor(self.sock, PORT_3), UltrasonicSensor(self.sock, PORT_4)] self.resetPosition() def close(self): """ NXT ブロックとの接続を終了する """ self.sock.close() def resetPosition(self, relative = 0): """ NXT のモーターのエンコーダをリセットする """ for m in self.motors: m.reset_position(relative) def setMotors(self, vels): """ 配列を受け取り、モーターのパワーとしてセットする。 vels の数とモーターの数が一致しない場合、両者の要素数のうち小さい方でループを回す。 """ for i, v in enumerate(vels[:min(len(vels),len(self.motors))]): self.motors[i].power = max(min(v,127),-127) self.motors[i].mode = MODE_MOTOR_ON | MODE_REGULATED self.motors[i].regulation_mode = REGULATION_MOTOR_SYNC self.motors[i].run_state = RUN_STATE_RUNNING self.motors[i].tacho_limit = 0 self.motors[i].set_output_state() def getMotors(self): """ モーターの位置(角度)を取得する """ state = [] for m in self.motors: state.append(m.get_output_state()) return state def getSensors(self): """ センサの値を取得する。得られたデータは配列で返される。 """ state = [] for s in self.sensors: state.append(s.get_sample()) return state """ テストプログラム モーターに適当な出力を与え、角度を読み表示する。 センサから値を読み込み表示する。 """ if __name__ == "__main__": import time nxt = NXTBrick() print "connected" # モーターのテスト for i in range(100): nxt.setMotors([80,-80,80]) print "Motor: " mstat = nxt.getMotors() for i, m in enumerate(mstat): print "(" , i, "): ", m time.sleep(0.1) nxt.setMotors([0,0,0]) # センサーのテスト for i in range(100): sensors = ["Touch", "Sound", "Light", "USonic"] sval = nxt.getSensors() for s in sensors: print s + ": " + sval print "" time.speel(0.1)
最後の if name == "main":から始まる部分はテストプログラムです。このモジュールを単体で動かしたときに実行されます。 まずは、このモジュールが完全に動くまでテストをすることが重要です。
以上で、NXT の Façade クラスができました。 非常に簡単なクラスですが、モーターに速度を与え、ポジションを読み、センサーの値を読むことのできるクラスができました。 はじめから何でもできるようにしようとすると、結局何をするためのクラスなのかよくわからないクラスができてしまいます。 バージョンアップはいつでもできるので、まずは最低限の機能でもちゃんと動くクラスを作ることが重要です。
では、上で実装した NXTBrick クラスを組み込むための RTC を作成します。 作成する RTC の仕様は以下のとおりです。
RtcTemplateで雛形を作成します。 雛型を作成する方法には、コマンドライン版の rtc-template を使う方法と、Eclipse版の RtcTemplate を使う方法があります。
rtc-template を実行するために、以下のようなバッチファイルを作成します。
python "C:\Program Files\OpenRTM-aist\0.4\utils\rtc-template\rtc-template.py" -bpython^ --module-name=NXTRTC --module-desc="NXT sample component"^ --module-version=0.1 --module-vendor=AIST --module-category=example^ --module-comp-type=DataFlowComponent --module-act-type=SPORADIC^ --module-max-inst=10^ --inport=vel:TimedFloatSeq^ --outport=pos:TimedFloatSeq --outport=sens:TimedFloatSeq^ --config="map:string:A,B"
rtc-template(gen.bat)の実行
> gen.bat python "C:\Program Files\OpenRTM-aist\0.4\utils\rtc-template\rtc-template.py" -bpython --module-name=NXTRTC --module-desc="NXT sample component" --module-version=0.1 --module-vendor=AIST --module-category=example --module-comp-type=DataFlowComponent --module-act-type=SPORADIC --module-max-inst=10 --inport=vel:TimedFloatSeq --outport=pos:TimedFloatSeq --outport=sens:TimedFloatSeq --config="map:string:A,B" File "NXTRTC.py" was generated. File "README.NXTRTC" was generated. File "NXTRTC.yaml" was generated.
以上のように、NXTRTC.py などのひな型ファイルができました。
ここから、NXTRTC.py に先ほど作成した NXTBrick.py の機能を組み込んでいきます。
import NXTBrick
# create NXTBrick object try: self._nxtbrick = NXTBrick.NXTBrick() except: print "NXTBrick create failed.""" return RTC.RTC_ERROR
self._nxtbrick.resetPosition()
上記をまとめたサンプルコードを以下に示す。
#!/usr/bin/env python # -*- coding:shift_jis -*- # -*- Python -*- import sys import time sys.path.append(".") # Import RTM module import OpenRTM import RTC # import NXTBrick class import NXTBrick # This module's spesification # <rtc-template block="module_spec"> nxtrtc_spec = ["implementation_id", "NXTRTC", "type_name", "NXTRTC", "description", "NXT sample component", "version", "0.1", "vendor", "AIST", "category", "example", "activity_type", "DataFlowComponent", "max_instance", "10", "language", "Python", "lang_type", "SCRIPT", "conf.default.map", "A,B", ""] # </rtc-template> class NXTRTC(OpenRTM.DataFlowComponentBase): def __init__(self, manager): OpenRTM.DataFlowComponentBase.__init__(self, manager) # DataPorts initialization # <rtc-template block="data_ports"> self._d_vel = RTC.TimedFloatSeq(RTC.Time(0,0),[]) self._velIn = OpenRTM.InPort("vel", self._d_vel, OpenRTM.RingBuffer(8)) self.registerInPort("vel",self._velIn) self._d_pos = RTC.TimedFloatSeq(RTC.Time(0,0),[]) self._posOut = OpenRTM.OutPort("pos", self._d_pos, OpenRTM.RingBuffer(8)) self.registerOutPort("pos",self._posOut) self._d_sens = RTC.TimedFloatSeq(RTC.Time(0,0),[]) self._sensOut = OpenRTM.OutPort("sens", self._d_sens, OpenRTM.RingBuffer(8)) self.registerOutPort("sens",self._sensOut) # initialize of configuration-data. # <rtc-template block="configurations"> self._map = [['A', 'B']] self._nxtbrick = None self._mapping = {'A':0,'B':1,'C':2} def onInitialize(self): # Bind variables and configuration variable # <rtc-template block="bind_config"> self.bindParameter("map", self._map, "A,B") # create NXTBrick object try: print "Connecting to NXT brick ...." self._nxtbrick = NXTBrick.NXTBrick() print "Connection established." except: print "NXTBrick connection failed." return RTC.RTC_ERROR return RTC.RTC_OK def onFinalize(self): self._nxtbrick.close() def onActivated(self, ec_id): # reset NXTBrick's position. self._nxtbrick.resetPosition() return RTC.RTC_OK def onDeactivated(self, ec_id): # reset NXTBrick's position. self._nxtbrick.resetPosition() return RTC.RTC_OK def onExecute(self, ec_id): cnt = 0 # check new data. if self._velIn.isNew(): # read velocity data from inport. self._d_vel = self._velIn.read() vel_ = [0,0,0] vel_[self._mapping[self._map[0][0]]] = self._d_vel.data[0] vel_[self._mapping[self._map[0][1]]] = self._d_vel.data[1] # set velocity self._nxtbrick.setMotors(vel_) # get sensor data. sensor_ = self._nxtbrick.getSensors() if sensor_: self._d_sens.data = sensor_ self._sensOut.write() # get position data. position_ = self._nxtbrick.getMotors() if position_: self._d_pos.data = [position_[self._mapping[self._map[0][0]]][9], position_[self._mapping[self._map[0][1]]][9]] # write position data to outport. self._posOut.write() return RTC.RTC_OK def MyModuleInit(manager): profile = OpenRTM.Properties(defaults_str=nxtrtc_spec) manager.registerFactory(profile, NXTRTC, OpenRTM.Delete) # Create a component comp = manager.createComponent("NXTRTC") def main(): mgr = OpenRTM.Manager.init(len(sys.argv), sys.argv) #mgr = OpenRTM.Manager.init(sys.argv) mgr.setModuleInitProc(MyModuleInit) mgr.activateManager() mgr.runManager() if __name__ == "__main__": main()
このサンプルでは上記のサンプルにコールバック OnWrite を追加し、InPort のバッファへデータが書き込まれた際にモーターへの速度出力を行うように拡張します。
# @class CallBackClass # @brief callback class # # when data is written in the buffer of InPort, # it is called. class CallBackClass: def __init__(self, nxtbrick_, map_): self._nxtbrick = nxtbrick_ self._map = map_ self._mapping = {'A':0,'B':1,'C':2} def __call__(self, pData): vel_ = [0,0,0] vel_[self._mapping[self._map[0][0]]] = pData.data[0] vel_[self._mapping[self._map[0][1]]] = pData.data[1] # set velocity self._nxtbrick.setMotors(vel_)
このクラスでは、NTXBrick クラスのオブジェクトとコンフィギュレーション変数 map を引数にとります。
# set callback class self._velIn.setOnWrite(CallBackClass(self._ntxbrick,self._map))
setOnWrite() により、InPort へデータが書き込まれた際に、CallBackClassのcall() メソッドが呼ばれるようになります。
上記をまとめたサンプルコードを以下に示す。
#!/usr/bin/env python # -*- coding:shift_jis -*- # -*- Python -*- import sys import time sys.path.append(".") # Import RTM module import OpenRTM import RTC # import NXTBrick class import NXTBrick # This module's spesification # <rtc-template block="module_spec"> nxtrtc_spec = ["implementation_id", "NXTRTC", "type_name", "NXTRTC", "description", "NXT sample component", "version", "0.1", "vendor", "AIST", "category", "example", "activity_type", "DataFlowComponent", "max_instance", "10", "language", "Python", "lang_type", "SCRIPT", "conf.default.map", "A,B", ""] # </rtc-template> # @class CallBackClass # @brief callback class # # when data is written in the buffer of InPort, # it is called. class CallBackClass: def __init__(self, nxtbrick_, map_): self._nxtbrick = nxtbrick_ self._map = map_ self._mapping = {'A':0,'B':1,'C':2} def __call__(self, pData): vel_ = [0,0,0] vel_[self._mapping[self._map[0][0]]] = pData.data[0] vel_[self._mapping[self._map[0][1]]] = pData.data[1] # set velocity self._nxtbrick.setMotors(vel_) class NXTRTC(OpenRTM.DataFlowComponentBase): def __init__(self, manager): OpenRTM.DataFlowComponentBase.__init__(self, manager) # DataPorts initialization # <rtc-template block="data_ports"> self._d_vel = RTC.TimedFloatSeq(RTC.Time(0,0),[]) self._velIn = OpenRTM.InPort("vel", self._d_vel, OpenRTM.RingBuffer(8)) self.registerInPort("vel",self._velIn) self._d_pos = RTC.TimedFloatSeq(RTC.Time(0,0),[]) self._posOut = OpenRTM.OutPort("pos", self._d_pos, OpenRTM.RingBuffer(8)) self.registerOutPort("pos",self._posOut) self._d_sens = RTC.TimedFloatSeq(RTC.Time(0,0),[]) self._sensOut = OpenRTM.OutPort("sens", self._d_sens, OpenRTM.RingBuffer(8)) self.registerOutPort("sens",self._sensOut) # initialize of configuration-data. # <rtc-template block="configurations"> self._map = [['A', 'B']] self._nxtbrick = None self._mapping = {'A':0,'B':1,'C':2} def onInitialize(self): # Bind variables and configuration variable # <rtc-template block="bind_config"> self.bindParameter("map", self._map, "A,B") # create NXTBrick object try: print "Connecting to NXT brick ...." self._nxtbrick = NXTBrick.NXTBrick() print "Connection established." except: print "NXTBrick connection failed." return RTC.RTC_ERROR # set callback class self._velIn.setOnWrite(CallBackClass(self._ntxbrick,self._map)) return RTC.RTC_OK def onFinalize(self): self._nxtbrick.close() def onActivated(self, ec_id): # reset NXTBrick's position. self._nxtbrick.resetPosition() return RTC.RTC_OK def onDeactivated(self, ec_id): # reset NXTBrick's position. self._nxtbrick.resetPosition() return RTC.RTC_OK def onExecute(self, ec_id): # get sensor data. sensor_ = self._nxtbrick.getSensors() if sensor_: self._d_sens.data = [sensor_[3]] # write sensor data to outport. self._sensOut.write() # get position data. position_ = self._nxtbrick.getMotors() if position_: self._d_pos.data = [position_[self._mapping[self._map[0][0]]][9],position_[self._mapping[self._map[0][1]]][9]] # write position data to outport. self._posOut.write() return RTC.RTC_OK def MyModuleInit(manager): profile = OpenRTM.Properties(defaults_str=nxtrtc_spec) manager.registerFactory(profile, NXTRTC, OpenRTM.Delete) # Create a component comp = manager.createComponent("NXTRTC") def main(): mgr = OpenRTM.Manager.init(len(sys.argv), sys.argv) #mgr = OpenRTM.Manager.init(sys.argv) mgr.setModuleInitProc(MyModuleInit) mgr.activateManager() mgr.runManager() if __name__ == "__main__": main()
はじめの例では、モーターへの出力、センサー値の読み込み、モーターエンコーダの読み込みをすべて同一ループで同期的に行っていましたが、コールバックを使用することで、データが来たらすぐにモーターへ値を出力できるようになります。
では、作成した NXTRTC を実際に動作させてみます。
ネームサーバーを起動します。 Windows用の OpenRTM-aist(C++版) をインストールしている場合は、スタートメニューの「OpenRTM-aist」>「C++」>「tools」>「Start Naming Service」から起動できます。
rtc.conf を作成します。
corba.nameservers: localhost naming.formats: %n.rtc manager.shutdown_auto: NO
corna.nameservers の項目は使用したいネームサーバーのアドレスに合わせて設定します。 ここでは、ローカルのネームサーバーを使用しますので、localhostとします。
起動したいコンポーネントが入っているそれぞれのディレクトリーにコピーしておきます。 今回作成した NXTRTC のほかにRTSystemEditor を起動します。 RTSystemEditor を起動後、ネームサーバー(ここでは localhost)に接続します。 また、SystemDiagram エディタを開くために、ツールバーの SystemDiagram アイコンをクリックします。
TkJoyStickComp は GUI上でジョイスティックを模擬するためのコンポーネントです。 図に示すような GUI があらわれ、中央の丸をドラッグすることでジョイスティックのように、X-Yの値を OutPort(上) から出力するコンポーネントです。 TkJoyStickComp にはもうひとつ OutPort(下) が付いていて、こちらは作動駆動型移動ロボットを操作するのに便利な左右の車輪の速度を出力するためのデータポートです。 NXTRTC の入力ポートに直接接続すれば、Tribot を制御することができます。
TkMotorComp は GUI上で車輪に見立てた丸いアイコンが、InPort へ入力されたデータ(角度)に従って回転する、車輪の回転の様子をモニタリングするためのコンポーネントです。 NXTRTC の車輪の角度を出力する OutPort に接続することで、NXT の車輪の回転の様子を見ることができます。 NXT の車輪を手で回しても、その様子をモニタすることができます。
TkSliderMonitorComp は、InPort へ入力されたデータの値をGUIのスライダで表示するためのコンポーネントです。 NXTRTC のセンサー出力ポートに接続することで、NXT のセンサーデータのモニタリングをすることができます。
すべてのコンポーネントが起動したら、コンポーネントを接続します。 NameService ビューから SystemDiagram エディタへ各コンポーネントをドラッグアンドドロップします。 接続したいポートからポートへドラッグアンドドロップすることでポートを接続することができます。 下に、幾つかのコンポーネントを接続した例を示します。
NXTに超音波センサーが取り付けられていない場合、NXTRTC の Activate 後にエラー状態(赤色)になります。
これらのコンポーネントを使って、NXTRTC が正しく動いているかどうかを確認しましょう。