Notes C API & C++11+ & ReactiveX & Qt #9 ~RxでNotes C API関数をラップ(実装編)
前回の続きになります。
ソースコードは、いつものようにこちらをgit clone
して、git checkout v0.0.2
してください。
実装コード
前回目標にした、「Notes C API関数をRx的にラップする」実装側です。
rx::observable<GetServerLatency::ReturnValues> GetServerLatency::operator ()( const QByteArray &serverName, DWORD timeout ) { return rx::observable<>::create<GetServerLatency::ReturnValues>( [this, &serverName, &timeout](rx::subscriber<GetServerLatency::ReturnValues> o) { try { Status status = NSFGetServerLatency( const_cast<char*>(serverName.constData()), timeout, clientToServer_.pValue(), serverToClient_.pValue(), version_.pValue() ); if (status.hasError()) throw status; GetServerLatency::ReturnValues values; values.version_ = version_.value(); values.clientToServer_ = clientToServer_.value(); values.serverToClient_ = serverToClient_.value(); o.on_next(values); o.on_completed(); } catch (...) { o.on_error(std::current_exception()); } }); }
GetServerLatency::ReturnValues
について補足します。これは、GetServerLatencyクラス内で定義されている内部構造体です。
/** * @brief 出力用データ型 * @struct ReturnValues */ struct ReturnValues { WORD version_; // サーババージョン DWORD clientToServer_; // クライアントからサーバへの待ち時間 DWORD serverToClient_; // サーバからクライアントへの待ち時間 };
Notes C API関数のNSFGetServerLatency
が返す4つの値の内、STATUS値を除く3つの値を1個のデータとして扱えるようにした構造体です。ReactiveXでは、原則流す値は1個です。複数の値を流すときは配列や連想配列、構造体やクラス、タプルなどのコンテナにまとめます。このReturnValues
構造体もそのためです。
さて、かっこ演算子をオーバーロードしたこのメソッドは、このReturnValues
を呼び出し元に返すのが目的ですが、実際には次のように定義されています。
rx::observable<GetServerLatency::ReturnValues>
rx
は名前空間の別名で、実際にはrxcpp
とrxcpp::operators
を表しています。nx::observable
の実際のシグネチャはrxcpp::observable
になります。ここでは、「ReturnValues
型のデータを扱うコンテナのようなもの」程度に思っていてください。
次にメソッド内を見てみると、いきなりreturn
文が書かれていますが、引数を省略すると、次のようになります。
return rx::observable<>::create<GetServerLatency::ReturnValues>(/*引数*/);
実際にはたった1文しかない実装になります。observable::create
メソッドが、「引数を元にobservable
(監視可能)なものを作って返す」ということですね。rx::observable::create<ReturnValues>
は、ReturnValues
型のデータを流す「水源」を作るメソッドです。「水源」から流れた川を流れるのは「水」ですから、observable::create<ReturnValues>
から流れてくるのはReturnValues
です。
まだちんぷんかんぷんかもしれませんが、今はまだなんとなくでいいです。
では、ReturnValues
をどうやって流すのか、それを決めるのがcreate
に渡される「引数」です。引数にはロジック(関数やラムダ式)を渡します。いろいろ簡略化すると、以下のようなことになります。
create((subscriber o) { try { o.on_next(/* ReturnValues型の値 */); o.on_completed(); } catch (...) { o.on_error(/* 例外 */); } });
ロジックの引数にはrx::subscriber<ReturnValues>
型なる値が渡されます。subscriber
を直訳すると、「加入者」とか「愛読者」とか出てきますが、Rxの世界ではsubscribe
(購読する)という言葉を使うので、個人的には「購読者」という理解が近いと思っています。購読者にon_next
メソッドを使ってReturnValues
型の値を流すと、購読者は値を読めるということになります。
整理してみましょう。冒頭のソースコードに表されているのは、以下のようなことになります。
NSFGetServerLatency
を実行する。- ステータス値が正常でなければ、
subscriber.on_error
を呼び出す。 - ステータス値が正常であれば、出力値を
ReturnValues
にまとめ、subscriber.on_next
で流す。
以上のロジックを持つ・・・
rx::observable
オブジェクトを作成して返す。
ロジックを目の当たりにしてしまうので、Rxや関数型プログラミングに慣れていない人はよく混乱するんですが、observable::create
がしていることは、あくまでobservable
オブジェクトを返すことだけです。ロジックは一切実行されていません。
nx::GetServerLatency()(pServer);
と実行しても、NSFGetServerLatency
関数は実行されません。前述のコードは、もともとこういう意味になります。
rx::observable<ReturnValues> ob = nx::GetServerLatency()(pServer);
変数ob
が作成されただけです。NSFGetServerLatency
を含んだロジックはどこにいったのか?そうです。このob
変数にしまわれています。では、どうしたらこのob
変数にしまわれてしまったロジックを実行できるのか?それが、先程も出てきたsubscribe
(購読する)メソッドです。先程のob
変数では、次のようにします。
ob.subscribe(/* ReturnValuesを受け取りロジック */);
メソッドsubscribe
は、observable
オブジェクトが持っているロジックを活性化して、データを流す役割があり、流れてきたデータは、引数に指定された受け取りロジックに渡します。ソースコードncl/main.cpp
ではこのように書いています。
nx::GetServerLatency getServerLatency(true, true, true); getServerLatency(pServer, 0).subscribe( [&pServer](nx::GetServerLatency::ReturnValues values) { // 標準出力 QTextStream out(stdout, QIODevice::WriteOnly); out << QObject::tr("Build version of '%1'").arg(pServer) << ": " << values.version_ << endl << QObject::tr("Latency time for client to server") << ": " << QString("%1 ms").arg(values.clientToServer_) << endl << QObject::tr("Latency time for server to client") << ": " << QString("%1 ms").arg(values.serverToClient_) << endl; } , [](std::exception_ptr ep) { try {std::rethrow_exception(ep);} catch (nx::Status status) { // 標準エラー出力 QTextStream out(stderr, QIODevice::WriteOnly); out << QObject::tr("NSFGetServerLatency status") << ": " << status.error() << endl; } });
抜き出して簡略化すると、こうなります。
nx::GetServerLatency()(pServer).subscribe(/* 正常ロジック */, /* 異常ロジック */);
pServer
でサーバ名を渡した時点では、NSFGetServerLatency
は実行しておらず、そこで作られたobservable<ReturnValues>
型のオブジェクトで、subscribe
したときに実行されます。活性化されたロジックは、NSFGetServerLatency
関数を実行し、正常に実行できれば、ReturnValues
構造体を作り、subscriber
(購読者)のon_next
メソッドに渡します。subscriber
は、いわばsubscribe
メソッドが放った「エージェント」の様な存在で、on_next
に渡された値は、結局のところsubscribe
メソッドの引数「正常ロジック」に渡されます。
ついていけていませんか?心配ありません。今までのパラダイムでプログラミングしていれば、こんなパラダイムをすぐ理解できる方がまれじゃないでしょうか。今は、「subscribe
して初めてロジック全体が実行される」ということだけ覚えていただければよいです。
回りくどいですか?ひどく同感です。しかし、こうすることで実際には多くの恩恵が潜んでいるらしいです。私自身は、エラー処理が楽になるとか、ロジックの変更がしやすいとか、単体テストがしやすいとか、同期/非同期の差がなくなるとか、まだまだその程度の実感です。
ReactiveXは、理屈をいくら重ねても理解しにくいかもしれませんが、実装例を見て「こうしたらこうなる」「こうしてみたらこう変わる」というのを体感していく方がよいでしょう。ReactiveXが実装されている言語はJava、JavaScript、.NET、Rubyなど多くの言語で展開されています。自身が得意な言語で、新しいパラダイムに触れてみてはいかがでしょうか。
P.S. あいにくLotusScriptのReactiveXは、私が個人的に実装した例以外には見当たらないようですが。(^^;)どこかでお披露目できればと思っています。