OpenRTM-aistを用いたリアルタイムシステムの構築

OpenRTM-aistを用いたリアルタイムシステムの構築

産業技術総合研究所
知能システム研究部門
金広文男

はじめに

本稿では,OpenRTM-aist をミドルウェアとして用いて,ロボットの制御を行うリアルタイムシステムを構築する方法について述べる.ロボット,C++言語,CORBA,OpenRTM-aistに関して基本的な知識を持った読者を対象とする.

最初にリアルタイムOSがどのような機能を持っているのか,リアルタイムシステムを構築する上でどのようなことに注意しなければならないかについて述べる.次に単一のRTコンポーネント(以下RTC)をリアルタイム実行する方法について述べ,続いて複数のRTCが連携して機能するリアルタイムシステムの構築方法について述べる.最後に動力学シミュレータOpenHRP を用いてリアルタイムシステムを検証する方法について述べる.

リアルタイムOS

リアルタイムOSとは

実世界と物理的な接触を行ないながら活動するロボットを制御するためには,センサからの情報を読み込んで,外界の状況に応じたロボットの動作を計算するための制御計算を正確に一定周期で行なう必要がある.このような一定周期で実行されるプログラムを周期タスクと呼ぶ.処理の内容によって周期は異なるが,多くの場合数十[Hz]から数[kHz]程度である.周期タスクを正確に実行するための機能を備えたのがリアルタイムOSであり,一般に利用されているパーソナルコンピュータにインストールされているWindowsやLinuxはこのような機能をもたない非リアルタイムOSである.

非商用のリアルタイムOSにはART-Linux や通常のLinuxカーネルにRT-Preemptパッチを適用したもの等がある.これらのOSはLinuxをベースとしており,使えるデバイスやソフトウェア資産が豊富であるという特徴がある.一方で商用のリアルタイムOSにはQNX やVxWorks がある.これらのOSは使えるデバイスが少ないものの,信頼性が高く,サポートが受けられるという利点がある.本稿ではOpenRTM-aistがOSとしてサポートしているLinuxベースのリアルタイムOSを用いてリアルタイムシステムを構築する.

RT-Preemptパッチ

RT-PreemptパッチとはLinuxでリアルタイムタスクの実行を可能とするための修正プログラムのことである.このパッチを通常のLinuxカーネルに適用してカーネルを構築することで(実行環境にもよるが)1[ms]周期で動作するプログラムを数十[us]程度の周期誤差で実行することが可能となる(以下RT-Preemptパッチが適用されたLinuxをPreemptive Linuxと呼ぶ).使用しているLinuxディストリビューションがUbuntu の場合には,以下のコマンドを実行することでこのパッチが適用されたカーネルがインストールされる.

 # apt-get install linux-rt

Preemptive Linuxを用いることで,マルチプロセッサシステムの処理能力をSMPで活用可能であり,標準規格POSIXに準拠した方法でリアルタイムタスクをプログラミング可能である.

POSIXでのリアルタイムタスクのプログラミングの概要を以下に示す.

 param.sched_priority = MY_PRIORITY;
 sched_setscheduler(0, SCHED_FIFO, &param); // (1)
 mlockall(MCL_CURRENT|MCL_FUTURE);
 stack_prefault();
 clock_gettime(CLOCK_MONOTONIC ,&t);
 t.tv_sec++;
 while(1) {
   clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &t, NULL); // (2)
   /* do the stuff */ // (3)
   t.tv_nsec += interval;
   while (t.tv_nsec >= NSEC_PER_SEC) {
     t.tv_nsec -= NSEC_PER_SEC;
     t.tv_sec++;
   }
 }

タスクの優先度を設定し(1),周期実行を行うメインループに入る.ループの先頭で実行を開始すべき時刻を待ち(2),時刻がきてスリープがとけると処理(3)を行う,という流れである.

ART-Linux

ART-LinuxはPreemptive Linuxと同様に,通常のLinuxカーネルを元に開発されたリアルタイムOSであり,石綿陽一氏が中心となって開発を行っている.カーネルのバイナリパッケージおよびソースパッケージはHP からダウンロード可能である.非リアルタイムOSをリアルタイムOS化する際の問題の一つが割り込みハンドラの取り扱いである.非リアルタイムOSではハードウェアからの割り込みが入ると割り込みハンドラが呼び出される.しかしリアルタイムタスクを実行している最中に割り込みハンドラが実行されてCPUを長時間占有してしまうとリアルタイムタスクの実行周期を乱す原因となる.そこでART-Linuxでは割り込みが入った瞬間には本来の割り込みハンドラを起動することはせずにフラグを立てるだけとし,割り込みハンドラは最低優先度のリアルタイムタスクとして実行してフラグが立っている場合には実際の処理を行うようになっている.例外はタイマ割り込みであり,この割り込みを起点として優先度が高いリアルタイムタスクから順に実行される仕組みとなっている.

またART-LinuxではマルチプロセッサシステムをAMPでサポートしている. ART-Linuxではリアルタイムタスクをプログラミングするため,独自のAPI を定義している.主に使用するのは以下の3つのAPIである.

  1. art_enter:非実時間プロセスを実時間タスクに変換
  2. art_exit:実時間タスクを非実時間プロセスに変換
  3. art_wait:次の周期まで実行を停止

ART-LinuxとPreemptive Linuxの違い

ART-LinuxとPreemptive Linuxは共に周期実行されるリアルタイムタスクをプログラミングする機能を提供するが,2点違いがあるので注意が必要である.

1点目はタスクが起床するタイミングの違いである.前節までに述べたようにART-Linuxでは全てのタスクは1[ms]周期のタイマ割り込みを起点として駆動されるのに対し,Preemptive Linuxでは次の起床時刻を絶対時刻で指定する.

高優先度と低優先度の2つのリアルタイムタスクがある場合を考えると,ART-Linuxの場合はタイマ割り込みが入ると高優先度のタスクがまず実行を開始し,実行が終わると即座に低優先度タスクの実行が開始される.これを利用して複数のタスクを優先度で順序づけて連続して実行することが可能である.一方Preemptive Linuxでは起床タイミングはそれぞれのタスクがclock_nanosleep()を使って指定するため,高優先度タスクの実行が終わっても低優先度タスクがすぐに起床するとは限らない.ART-Linuxと同様の挙動を得るためには,リアルタイムタスク間で起床タイミングを共有する必要がある.

art_vs_preempt.png

2点目はマルチプロセッサシステムのサポート方法である.ART-LinuxはAMP,Preemptive LinuxはSMPでマルチプロセッサシステムを利用可能とする.

SMP(Symmetric Multi Processing)は複数のプロセッサ上の実行環境がメモリ空間を共有する方式であり,Windowsや通常のLinuxでも採用されている方式である.この方式ではメモリ空間を共有している為に,プロセッサ間でデータを交換し易いという特長がある.一方AMP(Asymmetric Multi Processing)はそれぞれのプロセッサが独立したメモリ空間を持つ方式であり,1台の計算機上に複数のOSが実行されている形となる.メモリ空間が独立している為に,1つの実行環境に問題が発生してそのプロセサ上のOSが停止した場合でも,他のプロセッサ上の環境は動作を継続できるという特長がある.

smp_vs_amp.png

リアルタイムタスクプログラミングの注意点

注意が必要なのは,リアルタイムOSはユーザがどんなプログラムを書いても自動的に一定周期で動くようにしてくれるような魔法のシステムではない,ということである.周期タスクが想定した一定周期で実行されるには,リアルタイムタスクを実装するユーザ自身が処理が実行周期内に収まるように実装する必要がある.

リアルタイムタスクをプログラミングする上で大前提となるのは「処理時間が不定となることをしない」ということである.ユーザは処理時間の最悪値が実行周期を超えないように処理時間を見積もる必要があり,処理時間が不定の部分があっては処理時間を見積もることはできないためである.処理時間の見積を可能とするため,以下のような注意が必要である.

  • ディスクからの読み出し/書き込みをしない
  • 標準出力/エラー出力への出力は控える
  • ネットワークからの読み込み/書き出しをしない(同一ホスト内であっても)
  • 不要なタスクスイッチを起こさせない

RTCを実時間タスク化するには?

RTCを新たに開発する場合に,ユーザがプログラムするのはonActivated(), onExecute(), onDeactivated()などのコールバック関数であり,これらがどこから呼び出されているのかユーザは意識する必要がないようになっている.これらを呼び出しているのは実行コンテキストと呼ばれる部分である.実行コンテキストはRTC内の状態遷移マシンの状態に応じて適切なコールバック関数を呼び出すスレッドを実行する.

rtcmodel.png

この実行コンテキストには通常非リアルタイムの周期実行コンテキストが使用されており,その実装は以下のようになっている.

 // src/lib/rtm/PeriodicExecutionContext.cpp
 int PeriodicExecutionContext::svc(void) {
   do{
     m_worker.mutex_.lock();
     while (!m_worker.running_){
       m_worker.cond_.wait();
     }
   if (m_worker.running_){ 
     std::for_each(m_comps.begin(), m_comps.end(), invoke_worker()); // (1)
   }
   m_worker.mutex_.unlock();
   if (!m_nowait) { coil::sleep(m_period); } // (2)
   } while (m_svc);
   return 0;
 }

invoke_workerがRTC内の状態遷移マシンの現在の状況に応じたコールバック関数を呼び出し(1),その後1周期分の時間だけsleepによって実行を停止するという流れである.この実行コンテキストは通常のスレッドとして実行されるため,この実行コンテキストによって駆動されるRTCは非リアルタイムである.従ってRTCをリアルタイム実行する為には,この実行コンテキストをリアルタイム化する必要がある.

OpenRTM-aist-1.0-C++にはART-Linux用の実行コンテキストが含まれている.その実装は以下のようになっている.

 // src/ext/artlinux/art_ec/ArtExecutionContext.cpp  
 int ArtExecutionContext::svc(void) {
   if (art_enter(ART_PRIO_MAX, ART_TASK_PERIODIC, m_usec) == -1){ // (1) 
     std::cerr << "fatal error: art_enter" << std::endl;
   }
   do {
     std::for_each(m_comps.begin(), m_comps.end(), invoke_worker()); // (2)
     if (art_wait() == -1){ // (3)
       std::cerr << "fatal error: art_wait " << std::endl;
     }
   } while (m_running);
   if (art_exit() == -1) { // (4)
     std::cerr << "fatal error: art_exit" << std::endl;
   }
   return 0;
 }

最初に非リアルタイムプロセスをリアルタイムプロセスに変換し(1),メインループの部分では,RTCの状態遷移マシンの状態に応じたコールバック関数の呼び出し(2)と次の実行周期を待つ処理(3)を繰り返し,ループを抜けた後ではリアルタイムプロセスを非リアルタイムプロセスに戻す処理(4)を行っている.

OpenRTM-aistでは新たなタイプの実行コンテキストをシェアードライブラリ型式で追加することが可能である.新たな実行コンテキストの追加やどの実行コンテキストを使うかといった設定は設定ファイルであるrtc.confによって指定が可能である.例えばART-Linux用の実行コンテキストを使用するには,以下のように設定する.

 # rtc.conf
 corba.nameservers: localhost:2809
 naming.formats: %n.rtc
 logger.enable: YES
 logger.log_level: NORMAL
 logger.file_name: stdout
 exec_cxt.periodic.rate: 1000
 manager.modules.load_path: /usr/local/lib # (1)
 manager.modules.preload: ArtExecutionContext.so # (2)
 exec_cxt.periodic.type: ArtExecutionContext # (3)

まずシェアードライブラリを検索するパスを指定(1)し,ART-Linux用の実行コンテキストの実装が含まれたシェアードライブラリのダイナミックロードを指定(2)し,最後にダイナミックロードした実行コンテキストの使用を指定(3)している.

複数のRTCを連携させながら実時間実行するには?

複数RTCのrtcdを用いた同一プロセス上での生成

前節で述べた方法で個々のRTCをリアルタイム実行することは可能であるが,複数のRTCが連携して動作するシステムをRTC間のコミュニケーションも含めてリアルタイムで動作させるためには,RTCの生成等を適切に行う必要がある.コミュニケーションをリアルタイムで行う上で最も大きな障害はプロセス間通信である.プロセス間通信を行うと通信が完了するまでに要する時間の最悪値を見積もることができないため,リアルタイム実行を行うことができない.従ってコミュニケーションをリアルタイムで行うためにはこのプロセス間通信を行わないシステム構成が必要となる.プロセス間通信を行わずにRTC間のコミュニケーションを行うには複数のRTCを同一プロセス上に生成すればよい.これはOpenRTM-aistが通信レイヤに用いているCORBAの多くの実装では,送信側と受信側が同一プロセス内にある場合,通常はプロセス間通信で行われる処理が単なる関数呼び出しで行われることを利用したものである.

複数のRTCを同一プロセス上に生成するには,OpenRTM-aistに付属しているrtcdコマンドを使用する.このコマンドを用いてRTCを生成する方法は2つある. 一つ目はrtc.confを利用する方法である.以下にrtc.confの例を示す.

 # rtc.conf
 corba.nameservers: localhost:2809
 naming.formats: %n.rtc
 logger.enable: YES
 logger.log_level: NORMAL
 logger.file_name: stdout
 exec_cxt.periodic.rate: 1000
 manager.modules.load_path: /usr/local/lib # (1)
 manager.modules.preload: componentA.so, componentB.so # (2)
 manager.components.precreate: componentA, componentB # (3)

この例では2種類のRTCのインスタンスをそれぞれ一つ同一プロセス上に生成している.まずRTCのシェアードライブラリを検索するパスを指定(1)し,RTCを定義するシェアードライブラリをダイナミックロードすることを指定(2)し,最後にインスタンスを一つずつ生成することを指定(3)している. rtc.confを用いた方法では,rtcdが起動する際に構築されるシステム構成を設定することができるが,よりダイナミックにシェアードライブラリのロードやRTCの生成を行うにはrtcd内にあるManagerオブジェクトのCORBAインタフェースを用いる.インタフェースは以下のように定義されている.

 // Manager.idl
 interface Manager
 {
   RTC::ReturnCode_t load_module(in string pathname, in string initfunc);
   RTC::RTObject create_component(in string module_name);
   …
 };

load_module()によって新たなRTCのシェアードライブラリをロードし,create_component()でロード済みのRTCのインスタンスを生成することができる.これらのインタフェースを必要に応じて呼び出すことでダイナミックにシステムの構成を変更することが可能である. rtcdを用いて複数のRTCを同一プロセス上に生成した場合(下図右),RTC間のコミュニケーションをリアルタイム化できるという利点があるが,一つのRTCが不正な処理を行った場合,そのRTCが存在するrtcd毎異常終了してしまうため,そのrtcd上で動作している全てのRTCが終了してしまう.一方複数のRTCをそれぞれ独立したプロセスとして実行した場合(下図左)は,コミュニケーションをリアルタイムで行うことはできないものの一つのRTCが異常終了しても他のRTCはそのまま動き続けるという利点があるので場合に応じて使い分けが必要である.

rtc_and_manager.png

複数RTCの単一実行コンテキストによる同期駆動

複数のRTCを連携させる場合には,データの依存関係に対応して複数のRTCを特定の順序で実行したい場合がある.このようなシステムは複数のRTCを同一の実行コンテキストに割り当てることで実現可能である. 通常実行コンテキストはRTCが生成される際に同時に生成されるために,RTCと実行コンテキストが一対一対応しているが,この対応関係は自由に変更可能である.対応関係を変更するには実行コンテキストが持つ以下のようなインタフェースを用いる.

 // RTC.idl
 interface ExecutionContext
 {
     : // 略
    ReturnCode_t add_component(in LightweightRTObject comp);
    ReturnCode_t remove_component(in LightweightRTObject comp);
     : // 略
 }

これらのインタフェースを用いて実行コンテキストとRTCの対応関係を変更することにより,1つの実行コンテキストで複数のRTCを順番に実行したり(下図左),複数の実行コンテキストで1つのRTCを実行したり(下図右)することができる.下図左のような構成は1枚の画像に複数のフィルタ処理を順に適用するような場合に,下図右のような構成は一方の実行コンテキストで短い周期で計算を行いながら,もう一方の実行コンテキストでは比較的長い周期で計算結果を表示する,といった利用が可能である.

rtc_ec_attach.png

RTC間の接続設定

複数のRTCを一つの実行コンテキストで同期的に駆動する場合には,RTC間のデータ伝送も同期的に行われなければならない.このためにはサブスクリプションタイプを適切に設定する必要がある.サブスクリプションタイプとはデータポート間でPush型通信を行う場合の送信方式を指定する物である.サブスクリプションタイプには以下のようなタイプがある.

  • New:送信バッファにデータが入り次第送信スレッドが送信を開始する
  • Periodic:送信バッファにたまったデータを送信スレッドが定期的に送信する
  • Flush:write()を呼び出したスレッドが実際の送信を実行する.write()は送信が完了するまで戻らない

NewおよびPeriodicでは送信スレッドが介在するため,データポートで接続された2つのRTCを一つのRTCで逐次実行した場合に,先に実行されたRTCが計算した結果が,次に実行されたRTCが実行を開始した瞬間にそのRTCに到達しているとは限らない.そのため,このような場合にはFlushを指定する必要がある. RTSystemEditorを用いてデータポート間の接続を行う場合,接続時に表示される次のようなダイアログの⑤の部分でサブスクリプションタイプを設定可能である.

connect_dialog.png

下図のような2つのプロセス(rtcd),3つのRTCからなるシステムを考える.リアルタイム実行を行っているRTCから別プロセス上のRTCへデータの送信を行う場合にはサブスクリプションタイプをFlush以外に設定しなければならない.これはリアルタイムタスク内でネットワーク通信を行わない為である.またサービスポートの利用に関しては,同一プロセス内のサービスをリアルタイム実行しているRTCが利用することは問題ないが,サービスが外部プロセス上にある場合は利用してはならない.これもリアルタイムタスク内部からのネットワーク通信を防ぐためである.逆にサービスを外部プロセス上のRTCに対して提供するのはリアルタイム実行の妨げにはならない.これはサービスに対するリクエストは別スレッドで処理されるためである.

note_subscriptiontype.png

その他のTips

その他OpenRTM-aistをリアルタイムシステムの構築に用いる際には以下のような注意が必要である.

ログレベルに注意

OpenRTM-aistのログは通常ファイルへ出力されるが,ファイル出力はリアルタイム実行の妨げになりかねないので注意が必要である.ロガーはrtc.confの設定で無効にすることが可能であるが,無効にした場合でもファイルへの出力がなされないだけで処理に大きな時間を要することに注意が必要である.ロガーレベルを十分に低い(Error等)レベルに設定しておくことが必要である.

リングバッファのバッファ長の設定に注意

データポート間の接続にはリングバッファが内蔵されている.これは実行周期の異なるRTCの接続に対応するためであるが,同期実行しているRTC間の接続ではこのバッファが意図しないデータの遅れにつながる場合がある.このような遅れを防ぐためにはバッファの長さを1にしておくとよい.

シミュレーションと実験との切り替え

開発した制御システムを実際のハードウェアに適用する前にシミュレータで検証することは,ロボット自身や周辺の環境,人に対して損害を与えないために重要である.制御システムをシミュレータ上で検証する場合には,シミュレーション世界でのロボットの振舞と実際のロボットの振舞が十分に一致していること,またそれらの切り替えが容易にできるようになっていることが重要である.これを実現するうえで考慮しなければならない点は以下の2点である.

  1. シミュレーションと実験では入出力先が異なる.実験時の入出力先は実際のロボットハードウェアであるが,シミュレーション時はシミュレーション世界に存在する仮想的なハードウェアである
  2. 時間の流れ方が異なる.実験時にはロボット上のリアルタイムOSが実世界の時間に基づいて駆動されるタイマを用いてリアルタイムタスクを一定周期で駆動する.一方,シミュレーション時にはシミュレーション世界の時間の進行速度はシミュレータを実行する計算機のスペックやシミュレーション世界の複雑さによって変化する.

1点目に関してはシミュレータに対する入出力と実際のハードウェアに対する入出力の型式を揃えておくことで,実行するRTCを差し替えるだけでシミュレーションと実験が済むようにすることが可能である.

controller_bridge.png

OpenHRPではシミュレーション世界に存在するロボットに対応するRTCを作り出す仕組みとしてopenhrp-controller-bridgeを提供している.起動時の引数や設定ファイルによって,そのRTCがもつセンサやアクチュエータへの入出力ポートの構成を設定可能であるので,この構成と実際のロボットハードウェアに対する入出力を行うRTCのポートの構成を合わせておき,差し替え可能としておけばよい.

extrig_ec.png

2点目に関しては実験時にはリアルタイムで駆動されるRTCをシミュレーション世界の時計に従って駆動することが必要となる.このような駆動方法は外部トリガ駆動実行コンテキストを用いることで実現可能である.外部トリガ駆動実行コンテキストとは外部からトリガが与えられる毎に一度だけRTCの実行を行う実行コンテキストであり,以下のように定義されている.通常の実行コンテキストではRTCの駆動を行うループが実装されるメンバ関数svc()が空となっている代わりに外部からのトリガを受けてRTCを一度だけ駆動するメンバ関数tick()が定義されている.

外部トリガ駆動実行コンテキスト

 // src/lib/rtm/OpenHRPExecutionContext.cpp 
 int OpenHRPExecutionContext::svc(void)
 {
     return 0;
 }
 void OpenHRPExecutionContext::tick() throw (CORBA::SystemException)
 {
     std::for_each(m_comps.begin(), m_comps.end(), invoke_worker());
     return;
 }

Documentation:

Download

latest Releases : 2.0.0-RELESE

2.0.0-RELESE Download page

Number of Projects

Choreonoid

Motion editor/Dynamics simulator

OpenHRP3

Dynamics simulator

OpenRTP

Integrated Development Platform

AIST RTC collection

RT-Components collection by AIST

TORK

Tokyo Opensource Robotics Association

DAQ-Middleware

Middleware for DAQ (Data Aquisition) by KEK