LEGO Mindstorm NXT RTコンポーネント

LEGO Mindstorm NXT (以下NXT) は3つのモーターと4種類のセンサー、これらを制御するためのインテリジェントブロックNXTから構成される、ロボットを作ることができる LEGOブロックです。 NXT は USB または Bluetooth で PC と接続して、インテリジェントブロックをプログラミングしたり、直接制御したりすることができます。 また、自作のプログラムをインテリジェントブロックにロードしてスタンドアロンで動作させることもできます。

ここでは、インテリジェントブロックを PC上で RTコンポーネント化し、他のコンポーネントからモーターを動かしたり、センサーデータを読み出したりできるようにします。 NXT を RTコンポーネント化することで、既存の RTコンポーネント化された入力デバイスで NXT を制御したり、センサーデータを読み出したりすることが簡単にできるようになります。

Mindstorm NXT 設定

NXT を RTC化するにあたり、まずは NXT と PC の設定を行います。 PC と NXT は USB または Bluetooth で接続することができるますが、せっかく電池で動く NXT を紐付きでは使いたくないのでBluetoothで接続します。

ブロックの組み立て

ここでは、写真に示すような構成を例にとり、NXT を RTC化します。

TribotBase.png

これは、Tribot と呼ばれる構成の移動ベースの部分に、目玉のような超音波センサーを前面に取り付けただけの簡単な構成です。 移動ベースの組み立て方法は、Mindstorm NXT の箱に入っている「Start Here」と書かれた小箱の中の小冊子に詳しく記載されているので参照してください。 この小冊子で解説されている移動ベースに、いくつかの LEGOブロックを追加して、超音波センサーを取り付ければ完成です。

Bluetoothデバイスのインストール

NXT のインテリジェントブロックには最初から Bluetooth が内蔵されています。 一方、PC に Bluetoothデバイスが内臓されていない場合は、写真のような Bluetoothデバイスを PC に取り付けることで、NXT と通信することができるようになります。

まずは、これらのデバイスを取り付けて、必要ならデバイスドライバをインストールするなどして使えるようにします。 Windows-XP などではたいていの市販の Bluetooth デバイスなら、ドライバを改めてインストールすることなくデフォルトのドライバで動作するようです。

BluetoothDevices.png

Bluetooth が適切にインストールされていれば、コントロールパネルに Bluetooth のアイコンが現れます。 これをクリックすると図のような Bluetooth設定ダイアログが現れます。

BthDialogOption.png

「オプション」を開き、
  • 「発見機能を有効にする」
  • 「Bluetooth アイコンを通知領域に表示する」 をチェックします。

次は、NXT と PC を接続するので、このダイアログはとりあえずそのままにしておきます。

PC と NXT の接続

PC と NXT を Bluetoothで接続する手順はおおよそ以下のとおりです。

  1. NXT の電源を入れる
  2. NXT を Bluetooth 検索モードにし検索する
  3. 自分の PC を選択
  4. チャネル選択
  5. PC の Bluetooth 設定ダイアログから接続ウィザードを起動
  6. パスキーを設定する
  7. PC と NXT 両者で接続ボタンを押す
  8. 接続後 NXT を再起動する

NXT の起動

NXT のインテリジェントブロックの中央のオレンジ色のボタンを押して電源を入れます。 電源が入ると「ピロリロリ♪」と音が鳴ってブロックが起動します。(音量の設定を0にしている場合には音は出ない。) 電源を入れると、図のような画面「My files」モードになります。

NXTBoot.png

このような画面にならない場合は、オレンジ色のボタンのしたの四角いボタンを何度か押すことで、「My files」モードにすることができます。

Bluetooth デバイスの検索

「My files」モードの状態で、オレンジ色ボタンの左右にある、灰色の三角ボタンを押し「Bluetooth」モードにカーソルを合わせ、オレンジボタンを押します。

NXTBluetooth.png

さらに、左右の三角ボタンを押し、「Search」モードにカーソルを合わせます。

NXTBthSearch.png

この状態で、オレンジ色のボタンを押し、実際に接続先を検索します。 検索状態の画面を下に示します。

NXTBthSearching.png

デバイスの接続

PC の Bluetooth が有効で NXT から PC が見えれば、図のように PC の名前が表れるはずです。 ここで PC の名前とは Windows における「コンピューター名」です。

NXTBthPCfound.png

近くに Bluetooth搭載 PC があれば、何台かの PC が見えるかもしれません。 三角ボタンを押して自分の PC にカーソルを合わせ、オレンジ色の確認ボタンを押します。

次に Bluetooth のチャネル選択画面になるので、そのままオレンジ色の確認ボタンを押します。 Connecting と表示された後、パスキー入力画面になるので、そのままオレンジボタンを押します。

NXTBthPasskey.png

PC側で図のようなバルーンが表示されるのでこのバルーンをクリックします。

PCballoon.png

パスキーの入力を求められるので、先ほど NXT に表示されていたパスキーを入力します。 接続が完了すると、図のようなダイアログが現れます。 他のデバイスを認識しないように「発見機能を無効にする」をチェックし「完了」を押して終了します。

PCConnectComp.png

接続の確認

Bluetooth設定ダイアログの「デバイス」タブで見ると、図のように NXT が接続されていることが確認できます。

PCDevlistNXT.png

NXT Python のインストール

NXT Python のインストール

NXT RTC を作成する前に、Python から Bluetooth を使用するためのモジュール PyBlues と、NXT を制御するためのモジュール NXT Python をインストールします。

PyBlues のインストール

上記リンクからWindows用 のインストーラをダウンロードします。 ダウンロードした実行ファイルを実行すればインストールは完了です。

NXT Python のインストール

上記リンクから zip ファイルをダウンロードします。 NXT Python は setup.py スクリプトを使用してインストールします。
  • .pyに関連付けがされている場合
     setup.py install
  • 関連づけされていない場合:
     c:\Python24\python setup.py install

 Microsoft Windows XP [Version 5.1.2600]
 (C) Copyright 1985-2001 Microsoft Corp.
 
 C:\tmp\nxt_python-0.7>setup.py install
 running install
 running build
 running build_py
 ...
 copying build\scripts-2.4\nxt_filer -> c:\python24\Scripts
 copying build\scripts-2.4\nxt_push -> c:\python24\Scripts
 copying build\scripts-2.4\nxt_test -> c:\python24\Scripts
 
 C:\tmp\nxt_python-0.7>

NXT Python のテスト

NXT を ON にしてPCに接続されている状態で、example 以下のサンプルを実行し、NXT Python から NXT が制御できるかどうか確認します。

example ディレクトリーには以下のサンプルがあります。

  • latency.py: センサー読出しのレイテンシを計測する。
  • mary.py: NXT のサウンド機能で「メリーさんの羊」を演奏する
  • message_test.py: インテリジェントブロックの液晶画面にメッセージを表示する
  • spin.py: PortB, PortC のモーターを動かしスピンさせる
  • test_sensors.py: 全センサーの値を表示する

これらのテストを実行する際には、それぞれのサンプルが使用するモーター、センサーなどが接続された状態で実行しなければなりません。

NXT Python RTC 化

以上で、PC から Python を使用して NXT を制御する準備が整いました。

RtcTemplate などを使用して、NXT のコンポーネントの雛形を生成してコーディングに取り掛かりたいところです。 NXT Python はきれいにまとまっている Python モジュールですが、NXT Python のコードを直接 RTC の雛形に書き込むのはちょっと待ってください。

サンプルを見てもわかるように、NXT Python はロケータ、モーター、センサー等の複数のモジュールから構成されていて、モーターやセンサーへの細かな機能も制御できるため、アクセス方法も若干煩雑です。

ここは、NXT Python の複数のモジュールをまとめてひとつのインターフェースからアクセスするためのクラスを作ることにします。 これはソフトウエアパターンで言うところの Façade パターンになります。

Facade パターンあるいは Facade パターン(ファサード・パターン)とは、GoF(Gang of Four; 4人のギャングたち)によって定義された、コンピュータソフトウェアのデザインパターンの1つです。 Facade とは「窓口」を意味します。関連するクラス群を使用するための手続きを、窓口となる一つのクラスに集約することにより、プログラムの冗長性を無くすことを目的とします。(出典: フリー百科事典『ウィキペディア(Wikipedia)』Facade パターン)

こうした便利クラスを作成すると以下の利点があります。

  • 作成したクラスを RTC 以外にも利用できる
  • デバッグが楽になる
    • Façade クラス単体でデバッグした方がデバッグしやすい
    • 直接 RTC 化すると、この例で言えば、NXT Python の使い方の問題なのか、RTC の使い方の問題なのか切り分けが難しい
  • 変更がしやすくなる
    • Façade クラスを変更しても、インターフェースを変更しなければ RTC側のコードは変える必要がない
    • たとえば、get_xxx()、set_xxx()といった関数の入出力にリミッタをつけたい場合、Façade クラス側のみの変更で済む
  • NXT 以外のデバイスへの変更が容易になる
    • たとえば、NXT の次のバージョンのブロックが出た場合、インターフェースさえ同じであれば、対応が容易になる。

これは、RTC を作成する際全般に言えることです。 対象とするロボットやデバイスなどを、うまくクラス化することで、デバイスの変更や、RTC 自体の変更・バージョンアップにも柔軟に対応でき、変更に強いソフトウエアを作ることができます。

決して、onExecute() などで

ioctl(xxxx, xxxx); // UNIX デバイスへの直アクセス inb(xxx); // I/O への直アクセス outb(xxx); // I/O への直アクセス

のようなプリミティブな関数呼び出しをしてはいけません。 理想的には、

 onExecute(ec_id)
 { // 擬似コード
      if (m_inport.isNew())
     {
        retval = m_myrobot.set_actuator(m_inport.read());
        if (retval == fatal_error) return RTC::RTC_ERROR;
     }
     if (myrobot.get_sensor(sensor_data) == true)
     {
         m_outport.wrtie(sensor_data);
     } 
     else
     {// リードエラーの場合の処理
         if (fatal_error) return RTC::RTC_ERROR;
     }
     return RTC::RTC_OK;
 }

のように、「InPortから値を読んで処理」する関数の呼び出しと、「何らかの処理により値を取得し OutPort から出力」する関数の呼び出しのみからなる、「抽象レベルの高い」コードにするべきです。

NXT Python Facade クラス

NXT Python Facade クラス

NXT Python 自体は NXTインテリジェントブロックのほとんどすべての機能を使用することができますが、それらの機能すべてを直接使用するのは煩雑になるため、利用したい機能のみをインターフェースする Facade クラスを作ります。

NXT の主な機能を列挙します。

  • 入力
    • モーター速度値を与える
    • サウンドの音程値を与える
    • 液晶表示に出力するメッセージを与える
  • 出力
    • モーターエンコーダ値を読む
    • センサー値(サウンド、超音波、タッチ、光)の値を読む
    • システム情報を読む
  • その他
    • NXT の検索
    • NXT への接続
    • NXT のファイルの読み書き
    • ファームウエアの読み書き

これらすべてを一度にサポートするクラスを作るのでは Façade クラスを作る意味がなくなってしまいます。 必要な機能が出てきたらそのつど Façade クラスを更新すればよいのです。 したがって、ここで作成する Façade クラスでは、今回作成するロボットに合わせて、以下の機能のみをサポートすることにします。

  • 入力
    • モーター速度値を与える: setMotors()
  • 出力
    • モーターエンコーダ値を読む: getMotors()
    • センサー値 (サウンド、超音波、タッチ、光) の値を読む: getSensors()

関数名はおおよそのイメージです。 この考え方で作成したクラス (NXTBrick.py) を以下に示します。

 #!/usr/bin/env python
 # @file NXTBrick.py
 # -*- coding:shift_jis -*-
 import nxt.locator
 from nxt.sensor import *
 from nxt.motor import *
 class NXTBrick:
     def __init__(self, bsock=None):
         コンストラクタ
         NXT ブロックへのコネクションを行い、モーターや、センサーオブジェクトを
         作成する。モーターのエンコーダのリセットを行う。
         """
         if bsock:
             self.sock = bsock
         else:
             self.sock = nxt.locator.find_one_brick().connect()
         self.motors = [Motor(self.sock, PORT_A),
                        Motor(self.sock, PORT_B),
                        Motor(self.sock, PORT_C)]
             
         self.sensors = [TouchSensor(self.sock, PORT_1),
                         SoundSensor(self.sock, PORT_2),
                         LightSensor(self.sock, PORT_3),
                         UltrasonicSensor(self.sock, PORT_4)]
         self.resetPosition()
 
     def close(self):
         """
         NXT ブロックとの接続を終了する
         """
         self.sock.close()
 
     def resetPosition(self, relative = 0):
         """
         NXT のモーターのエンコーダをリセットする
         """
         for m in self.motors:
             m.reset_position(relative)
 
     def setMotors(self, vels):
         """
         配列を受け取り、モーターのパワーとしてセットする。
         vels の数とモーターの数が一致しない場合、両者の要素数のうち小さい方でループを回す。
         """
         for i, v in enumerate(vels[:min(len(vels),len(self.motors))]):
             self.motors[i].power = max(min(v,127),-127)
             self.motors[i].mode = MODE_MOTOR_ON | MODE_REGULATED
             self.motors[i].regulation_mode = REGULATION_MOTOR_SYNC
             self.motors[i].run_state = RUN_STATE_RUNNING
             self.motors[i].tacho_limit = 0
             self.motors[i].set_output_state()
 
     def getMotors(self):
         """
         モーターの位置(角度)を取得する
         
         """
         state = []
         for m in self.motors:
             state.append(m.get_output_state())
         return state
 
     def getSensors(self):
         """
         センサの値を取得する。得られたデータは配列で返される。
         """
         state = []
         for s in self.sensors:
             state.append(s.get_sample())
         return state
 
 
 """
 テストプログラム
 モーターに適当な出力を与え、角度を読み表示する。
 センサから値を読み込み表示する。
 """
 if __name__ == "__main__":
     import time
     nxt = NXTBrick()
     print "connected"
     
     # モーターのテスト
     for i in range(100):
         nxt.setMotors([80,-80,80])
         print "Motor: "
         mstat = nxt.getMotors()
         for i, m in enumerate(mstat):
             print "(" , i, "): ", m
         time.sleep(0.1)
     nxt.setMotors([0,0,0])
 
     # センサーのテスト
     for i in range(100):
         sensors = ["Touch", "Sound", "Light", "USonic"]
         sval = nxt.getSensors()
         for s in sensors:
             print s + ": " + sval
             print ""
             time.speel(0.1)

最後の if name == "main":から始まる部分はテストプログラムです。このモジュールを単体で動かしたときに実行されます。 まずは、このモジュールが完全に動くまでテストをすることが重要です。

以上で、NXT の Façade クラスができました。 非常に簡単なクラスですが、モーターに速度を与え、ポジションを読み、センサーの値を読むことのできるクラスができました。 はじめから何でもできるようにしようとすると、結局何をするためのクラスなのかよくわからないクラスができてしまいます。 バージョンアップはいつでもできるので、まずは最低限の機能でもちゃんと動くクラスを作ることが重要です。

NXT RTC の実装

NXT RTC の実装

では、上で実装した NXTBrick クラスを組み込むための RTC を作成します。 作成する RTC の仕様は以下のとおりです。

  • InPort
    • モーター速度 (TimedFloatSeq)
  • Outport
    • モーター位置 (TimedFloatSeq)
    • センサーデータ (TimedFloatSeq)

NXTRTC の雛形を生成

NXTRTC の雛形を生成

RtcTemplateで雛形を作成します。 雛型を作成する方法には、コマンドライン版の rtc-template を使う方法と、Eclipse版の RtcTemplate を使う方法があります。

rtc-template を実行するために、以下のようなバッチファイルを作成します。

 python "C:\Program Files\OpenRTM-aist\0.4\utils\rtc-template\rtc-template.py" -bpython^
  --module-name=NXTRTC --module-desc="NXT sample component"^
  --module-version=0.1 --module-vendor=AIST --module-category=example^
  --module-comp-type=DataFlowComponent --module-act-type=SPORADIC^
  --module-max-inst=10^
  --inport=vel:TimedFloatSeq^
  --outport=pos:TimedFloatSeq --outport=sens:TimedFloatSeq^
  --config="map:string:A,B"

Eclipse版 RtcTemplate では下記のようになります。
  • Programing language selection: Python
  • Module definition
    • Module name: NXTRTC
    • Module decription: NXT sample component
    • Module version: 0.1
    • Module vender: AIST
    • Module category: example
    • Component type: DataFlowComponent
    • Component's activity type: SPORADIC
    • Number of maximum instance: 10
  • InPort definition
    • Ports: Name:vel Type:TimedFloatSeq
  • OutPort definition
    • Ports: Name:pos, Type:TimedFloatSeq
    • Ports: Name:sens, Type:TimedFloatSeq
  • ConfigurationSet definition
    • Cfg Sets: Name:map, Type:string, Default Value: A,B

rtc-template(gen.bat)の実行

 > gen.bat
  python "C:\Program Files\OpenRTM-aist\0.4\utils\rtc-template\rtc-template.py"
  -bpython --module-name=NXTRTC --module-desc="NXT sample component" 
  --module-version=0.1 --module-vendor=AIST --module-category=example 
  --module-comp-type=DataFlowComponent --module-act-type=SPORADIC 
  --module-max-inst=10 --inport=vel:TimedFloatSeq 
  --outport=pos:TimedFloatSeq --outport=sens:TimedFloatSeq
  --config="map:string:A,B"
 
   File "NXTRTC.py" was generated.
   File "README.NXTRTC" was generated.
   File "NXTRTC.yaml" was generated.

以上のように、NXTRTC.py などのひな型ファイルができました。

サンプルコードの説明

サンプルコードの説明

ここから、NXTRTC.py に先ほど作成した NXTBrick.py の機能を組み込んでいきます。

  • NXTBrick.py のインポート NXTBrick クラスを使用するために NXBrick.py をインポートします。 import の記述方法は、下記のように拡張子(.py)を除いたファイル名を指定します。
     import NXTBrick
  • onInitialize(self) の実装 onInitialize()で、NXTBrick クラスのインスタンス化を行います。 NXTBrick クラスのインスタンス化過程にてエラーが発生した場合、RTC_ERROR が戻り 終了状態へ遷移します。
            # create NXTBrick object
            try:
                self._nxtbrick = NXTBrick.NXTBrick()
            except:
                print "NXTBrick create failed."""
                return RTC.RTC_ERROR
  • onActivated(self, ec_id),onDeactivated(self, ec_id) の実装 onActivated(),onDeactivated()では、NXTBrickクラスのresetPosition() メソッドをコールします。
            self._nxtbrick.resetPosition() 
  • onExecute(self, ec_id) の実装 onExecute() では、下記の処理を行ってます。
    • データ InPort からの速度の読み込み。
    • Configuration 機能を用い NXT のどのポートへ速度出力を行うかを決定。
    • NXTBrick の setMotors() メソッドにてモーターへ速度を出力。
    • NXTBrick の getSensors() にて NTX の超音波センサー値を取得し、データがあれば OutPort から出力。
    • NXTBrick の getMotors() にて NTX の回転角度[deg]を取得し、データがあれば OutPort から出力。

上記をまとめたサンプルコードを以下に示す。

 #!/usr/bin/env python
 # -*- coding:shift_jis -*-
 # -*- Python -*-
 
 import sys
 import time
 sys.path.append(".")
 
 # Import RTM module
 import OpenRTM
 import RTC
  
 # import NXTBrick class
 import NXTBrick
  
 # This module's spesification
 # <rtc-template block="module_spec">
 nxtrtc_spec = ["implementation_id", "NXTRTC", 
          "type_name",         "NXTRTC", 
          "description",       "NXT sample component", 
          "version",           "0.1", 
          "vendor",            "AIST", 
          "category",          "example", 
          "activity_type",     "DataFlowComponent", 
          "max_instance",      "10", 
          "language",          "Python", 
          "lang_type",         "SCRIPT",
          "conf.default.map", "A,B",
          ""]
 
 # </rtc-template>
 
 class NXTRTC(OpenRTM.DataFlowComponentBase):
     def __init__(self, manager):
         OpenRTM.DataFlowComponentBase.__init__(self, manager)
 
         # DataPorts initialization
         # <rtc-template block="data_ports">
         self._d_vel = RTC.TimedFloatSeq(RTC.Time(0,0),[])
         self._velIn = OpenRTM.InPort("vel", self._d_vel, OpenRTM.RingBuffer(8))
         self.registerInPort("vel",self._velIn)
         self._d_pos = RTC.TimedFloatSeq(RTC.Time(0,0),[])
         self._posOut = OpenRTM.OutPort("pos", self._d_pos, OpenRTM.RingBuffer(8))
         self.registerOutPort("pos",self._posOut)
         self._d_sens = RTC.TimedFloatSeq(RTC.Time(0,0),[])
         self._sensOut = OpenRTM.OutPort("sens", self._d_sens, OpenRTM.RingBuffer(8))
         self.registerOutPort("sens",self._sensOut)
 
         # initialize of configuration-data.
         # <rtc-template block="configurations">
         self._map = [['A', 'B']]
         self._nxtbrick = None
         self._mapping = {'A':0,'B':1,'C':2}
          
     def onInitialize(self):
         # Bind variables and configuration variable
         # <rtc-template block="bind_config">
         self.bindParameter("map", self._map, "A,B")
 
         # create NXTBrick object
         try:
             print "Connecting to NXT brick ...."
             self._nxtbrick = NXTBrick.NXTBrick()
             print "Connection established."
         except:
             print "NXTBrick connection failed."
             return RTC.RTC_ERROR
 
         return RTC.RTC_OK
 
     def onFinalize(self):
         self._nxtbrick.close()
 
     def onActivated(self, ec_id):
         # reset NXTBrick's position.
         self._nxtbrick.resetPosition()
 
         return RTC.RTC_OK
 
     def onDeactivated(self, ec_id):
         # reset NXTBrick's position.
         self._nxtbrick.resetPosition()
 
         return RTC.RTC_OK
 
     def onExecute(self, ec_id):
         cnt = 0
         # check new data.
         if self._velIn.isNew():
             # read velocity data from inport.
             self._d_vel = self._velIn.read()
             vel_ = [0,0,0]
             vel_[self._mapping[self._map[0][0]]] = self._d_vel.data[0]
             vel_[self._mapping[self._map[0][1]]] = self._d_vel.data[1]
             # set velocity
             self._nxtbrick.setMotors(vel_)
 
         # get sensor data.
         sensor_   = self._nxtbrick.getSensors()
         if sensor_:
             self._d_sens.data = sensor_
             self._sensOut.write()
 
         # get position data.
         position_ = self._nxtbrick.getMotors()
         if position_:
             self._d_pos.data =                  [position_[self._mapping[self._map[0][0]]][9],                       position_[self._mapping[self._map[0][1]]][9]]
         # write position data to outport.
         self._posOut.write()
 
         return RTC.RTC_OK
 
   def MyModuleInit(manager):
     profile = OpenRTM.Properties(defaults_str=nxtrtc_spec)
     manager.registerFactory(profile,
                             NXTRTC,
                             OpenRTM.Delete)
 
     # Create a component
     comp = manager.createComponent("NXTRTC")
 
  def main():
     mgr = OpenRTM.Manager.init(len(sys.argv), sys.argv)
     #mgr = OpenRTM.Manager.init(sys.argv)
     mgr.setModuleInitProc(MyModuleInit)
     mgr.activateManager()
     mgr.runManager()
 
 if __name__ == "__main__":
     main()

サンプルコードの説明(コールバックオブジェクトの使用例)

このサンプルでは上記のサンプルにコールバック OnWrite を追加し、InPort のバッファへデータが書き込まれた際にモーターへの速度出力を行うように拡張します。

  • コールバッククラス
    下記のようにコールバッククラスを記述します。

 # @class CallBackClass
 # @brief callback class
 #
 # when data is written in the buffer of InPort,
 # it is called.
 class CallBackClass:
     def __init__(self, nxtbrick_, map_):
         self._nxtbrick = nxtbrick_
         self._map = map_
         self._mapping = {'A':0,'B':1,'C':2}
 
     def __call__(self, pData):
         vel_ = [0,0,0]
         vel_[self._mapping[self._map[0][0]]] = pData.data[0]
         vel_[self._mapping[self._map[0][1]]] = pData.data[1]
         # set velocity
         self._nxtbrick.setMotors(vel_)

このクラスでは、NTXBrick クラスのオブジェクトとコンフィギュレーション変数 map を引数にとります。

  • コールバッククラスの登録
    下記のように、setOnWrite() メソッドにてコールバックオブジェクトを登録します。

   # set callback class
   self._velIn.setOnWrite(CallBackClass(self._ntxbrick,self._map))

setOnWrite() により、InPort へデータが書き込まれた際に、CallBackClassのcall() メソッドが呼ばれるようになります。

上記をまとめたサンプルコードを以下に示す。

 #!/usr/bin/env python
 # -*- coding:shift_jis -*-
 # -*- Python -*-
 
 import sys
 import time
 sys.path.append(".")
 
 # Import RTM module
 import OpenRTM
 import RTC
  
 # import NXTBrick class
 import NXTBrick
 
  # This module's spesification
 # <rtc-template block="module_spec">
 nxtrtc_spec = ["implementation_id", "NXTRTC", 
          "type_name",         "NXTRTC", 
          "description",       "NXT sample component", 
          "version",           "0.1", 
          "vendor",            "AIST", 
          "category",          "example", 
          "activity_type",     "DataFlowComponent", 
          "max_instance",      "10", 
          "language",          "Python", 
          "lang_type",         "SCRIPT",
          "conf.default.map", "A,B",
          ""]
 
 # </rtc-template>
 
 # @class CallBackClass
 # @brief callback class
 #
 # when data is written in the buffer of InPort,
 # it is called.
 class CallBackClass:
     def __init__(self, nxtbrick_, map_):
         self._nxtbrick = nxtbrick_
         self._map = map_
         self._mapping = {'A':0,'B':1,'C':2}
 
     def __call__(self, pData):
         vel_ = [0,0,0]
         vel_[self._mapping[self._map[0][0]]] = pData.data[0]
         vel_[self._mapping[self._map[0][1]]] = pData.data[1]
         # set velocity
         self._nxtbrick.setMotors(vel_)
 
  class NXTRTC(OpenRTM.DataFlowComponentBase):
     def __init__(self, manager):
         OpenRTM.DataFlowComponentBase.__init__(self, manager)
 
         # DataPorts initialization
         # <rtc-template block="data_ports">
         self._d_vel = RTC.TimedFloatSeq(RTC.Time(0,0),[])
         self._velIn = OpenRTM.InPort("vel", self._d_vel, OpenRTM.RingBuffer(8))
         self.registerInPort("vel",self._velIn)
         self._d_pos = RTC.TimedFloatSeq(RTC.Time(0,0),[])
         self._posOut = OpenRTM.OutPort("pos", self._d_pos, OpenRTM.RingBuffer(8))
         self.registerOutPort("pos",self._posOut)
         self._d_sens = RTC.TimedFloatSeq(RTC.Time(0,0),[])
         self._sensOut = OpenRTM.OutPort("sens", self._d_sens, OpenRTM.RingBuffer(8))
         self.registerOutPort("sens",self._sensOut)
 
         # initialize of configuration-data.
         # <rtc-template block="configurations">
         self._map = [['A', 'B']]
         self._nxtbrick = None
         self._mapping = {'A':0,'B':1,'C':2}
          
     def onInitialize(self):
         # Bind variables and configuration variable
         # <rtc-template block="bind_config">
         self.bindParameter("map", self._map, "A,B")
 
         # create NXTBrick object
         try:
             print "Connecting to NXT brick ...."
             self._nxtbrick = NXTBrick.NXTBrick()
             print "Connection established."
         except:
             print "NXTBrick connection failed."
             return RTC.RTC_ERROR
 
         # set callback class
         self._velIn.setOnWrite(CallBackClass(self._ntxbrick,self._map))
 
         return RTC.RTC_OK
 
     def onFinalize(self):
         self._nxtbrick.close()
 
     def onActivated(self, ec_id):
         # reset NXTBrick's position.
         self._nxtbrick.resetPosition()
 
         return RTC.RTC_OK
  
     def onDeactivated(self, ec_id):
         # reset NXTBrick's position.
         self._nxtbrick.resetPosition()
 
         return RTC.RTC_OK
  
     def onExecute(self, ec_id):
         # get sensor data.
         sensor_   = self._nxtbrick.getSensors()
         if sensor_:
             self._d_sens.data = [sensor_[3]]
             # write sensor data to outport.
             self._sensOut.write()
 
         # get position data.
         position_ = self._nxtbrick.getMotors()
         if position_:
             self._d_pos.data = [position_[self._mapping[self._map[0][0]]][9],position_[self._mapping[self._map[0][1]]][9]]
             # write position data to outport.
             self._posOut.write()
 
         return RTC.RTC_OK
  
 def MyModuleInit(manager):
     profile = OpenRTM.Properties(defaults_str=nxtrtc_spec)
     manager.registerFactory(profile,
                             NXTRTC,
                             OpenRTM.Delete)
 
     # Create a component
     comp = manager.createComponent("NXTRTC")
  
 def main():
     mgr = OpenRTM.Manager.init(len(sys.argv), sys.argv)
     #mgr = OpenRTM.Manager.init(sys.argv)
     mgr.setModuleInitProc(MyModuleInit)
     mgr.activateManager()
     mgr.runManager()
 
 if __name__ == "__main__":
     main()

はじめの例では、モーターへの出力、センサー値の読み込み、モーターエンコーダの読み込みをすべて同一ループで同期的に行っていましたが、コールバックを使用することで、データが来たらすぐにモーターへ値を出力できるようになります。

NXT RTC 動作確認

NXT RTC 動作確認

では、作成した NXTRTC を実際に動作させてみます。

ネームサーバーの起動

ネームサーバーを起動します。 Windows用の OpenRTM-aist(C++版) をインストールしている場合は、スタートメニューの「OpenRTM-aist」>「C++」>「tools」>「Start Naming Service」から起動できます。

rtc.conf の作成

rtc.conf を作成します。

 corba.nameservers: localhost
 naming.formats: %n.rtc
 manager.shutdown_auto: NO

corna.nameservers の項目は使用したいネームサーバーのアドレスに合わせて設定します。 ここでは、ローカルのネームサーバーを使用しますので、localhostとします。

起動したいコンポーネントが入っているそれぞれのディレクトリーにコピーしておきます。 今回作成した NXTRTC のほかに
  • TkJoystickComp
  • TkMotorPosComp
  • TkSliderMonitorComp を起動することにします。

RTSystemEditor の起動

RTSystemEditor を起動します。 RTSystemEditor を起動後、ネームサーバー(ここでは localhost)に接続します。 また、SystemDiagram エディタを開くために、ツールバーの SystemDiagram アイコンをクリックします。

コンポーネントの起動

以下のコンポーネントを起動します。
  • NXTRTC
  • TkJoystickComp
  • TkMotorPosComp
  • TkSliderMonitorComp 起動すると、RTSystemEditorのNameServcie ビュー上にコンポーネントが表示されます。

TkJoystickComp

TkJoyStickComp は GUI上でジョイスティックを模擬するためのコンポーネントです。 図に示すような GUI があらわれ、中央の丸をドラッグすることでジョイスティックのように、X-Yの値を OutPort(上) から出力するコンポーネントです。 TkJoyStickComp にはもうひとつ OutPort(下) が付いていて、こちらは作動駆動型移動ロボットを操作するのに便利な左右の車輪の速度を出力するためのデータポートです。 NXTRTC の入力ポートに直接接続すれば、Tribot を制御することができます。

TkJoystick.png

TkMotorComp

TkMotorComp は GUI上で車輪に見立てた丸いアイコンが、InPort へ入力されたデータ(角度)に従って回転する、車輪の回転の様子をモニタリングするためのコンポーネントです。 NXTRTC の車輪の角度を出力する OutPort に接続することで、NXT の車輪の回転の様子を見ることができます。 NXT の車輪を手で回しても、その様子をモニタすることができます。

TkMotor.png

TkSliderMonitorComp

TkSliderMonitorComp は、InPort へ入力されたデータの値をGUIのスライダで表示するためのコンポーネントです。 NXTRTC のセンサー出力ポートに接続することで、NXT のセンサーデータのモニタリングをすることができます。

コンポーネントの接続

すべてのコンポーネントが起動したら、コンポーネントを接続します。 NameService ビューから SystemDiagram エディタへ各コンポーネントをドラッグアンドドロップします。 接続したいポートからポートへドラッグアンドドロップすることでポートを接続することができます。 下に、幾つかのコンポーネントを接続した例を示します。

NXTに超音波センサーが取り付けられていない場合、NXTRTC の Activate 後にエラー状態(赤色)になります。

RtcLink.png

これらのコンポーネントを使って、NXTRTC が正しく動いているかどうかを確認しましょう。