Notes C API & C++11+ & ReactiveX & Qt #9 ~RxでNotes C API関数をラップ(実装編)

前回の続きになります。

ソースコードは、いつものようにこちらをgit cloneして、git checkout v0.0.2してください。

chiburusystems.hatenablog.com

実装コード

前回目標にした、「Notes C API関数をRx的にラップする」実装側です。

rx::observable<GetServerLatency::ReturnValues> GetServerLatency::operator ()(
    const QByteArray &serverName,
    DWORD timeout
    )
{
  return rx::observable<>::create<GetServerLatency::ReturnValues>(
        [this, &serverName, &timeout](rx::subscriber<GetServerLatency::ReturnValues> o) {
    try {
      Status status = NSFGetServerLatency(
            const_cast<char*>(serverName.constData()),
            timeout,
            clientToServer_.pValue(),
            serverToClient_.pValue(),
            version_.pValue()
            );
      if (status.hasError())
        throw status;
      GetServerLatency::ReturnValues values;
      values.version_ = version_.value();
      values.clientToServer_ = clientToServer_.value();
      values.serverToClient_ = serverToClient_.value();
      o.on_next(values);
      o.on_completed();
    }
    catch (...) {
      o.on_error(std::current_exception());
    }
  });
}

GetServerLatency::ReturnValuesについて補足します。これは、GetServerLatencyクラス内で定義されている内部構造体です。

  /**
   * @brief 出力用データ型
   * @struct ReturnValues
   */
  struct ReturnValues
  {
    WORD version_; // サーババージョン
    DWORD clientToServer_; // クライアントからサーバへの待ち時間
    DWORD serverToClient_; // サーバからクライアントへの待ち時間
  };

Notes C API関数のNSFGetServerLatencyが返す4つの値の内、STATUS値を除く3つの値を1個のデータとして扱えるようにした構造体です。ReactiveXでは、原則流す値は1個です。複数の値を流すときは配列や連想配列、構造体やクラス、タプルなどのコンテナにまとめます。このReturnValues構造体もそのためです。

さて、かっこ演算子オーバーロードしたこのメソッドは、このReturnValuesを呼び出し元に返すのが目的ですが、実際には次のように定義されています。

rx::observable<GetServerLatency::ReturnValues>

rx名前空間の別名で、実際にはrxcpprxcpp::operatorsを表しています。nx::observableの実際のシグネチャrxcpp::observableになります。ここでは、「ReturnValues型のデータを扱うコンテナのようなもの」程度に思っていてください。

次にメソッド内を見てみると、いきなりreturn文が書かれていますが、引数を省略すると、次のようになります。

return rx::observable<>::create<GetServerLatency::ReturnValues>(/*引数*/);

実際にはたった1文しかない実装になります。observable::createメソッドが、「引数を元にobservable(監視可能)なものを作って返す」ということですね。rx::observable::create<ReturnValues>は、ReturnValues型のデータを流す「水源」を作るメソッドです。「水源」から流れた川を流れるのは「水」ですから、observable::create<ReturnValues>から流れてくるのはReturnValuesです。

まだちんぷんかんぷんかもしれませんが、今はまだなんとなくでいいです。

では、ReturnValuesをどうやって流すのか、それを決めるのがcreateに渡される「引数」です。引数にはロジック(関数やラムダ式)を渡します。いろいろ簡略化すると、以下のようなことになります。

create((subscriber o) {
  try {
    o.on_next(/* ReturnValues型の値 */);
    o.on_completed();
  } catch (...) {
    o.on_error(/* 例外 */);
  }
});

ロジックの引数にはrx::subscriber<ReturnValues>型なる値が渡されます。subscriberを直訳すると、「加入者」とか「愛読者」とか出てきますが、Rxの世界ではsubscribe(購読する)という言葉を使うので、個人的には「購読者」という理解が近いと思っています。購読者にon_nextメソッドを使ってReturnValues型の値を流すと、購読者は値を読めるということになります。

整理してみましょう。冒頭のソースコードに表されているのは、以下のようなことになります。

  1. NSFGetServerLatencyを実行する。
  2. ステータス値が正常でなければ、subscriber.on_errorを呼び出す。
  3. ステータス値が正常であれば、出力値をReturnValuesにまとめ、subscriber.on_nextで流す。

以上のロジックを持つ・・・

  1. rx::observableオブジェクトを作成して返す。

ロジックを目の当たりにしてしまうので、Rxや関数型プログラミングに慣れていない人はよく混乱するんですが、observable::createがしていることは、あくまでobservableオブジェクトを返すことだけです。ロジックは一切実行されていません。

nx::GetServerLatency()(pServer);

と実行しても、NSFGetServerLatency関数は実行されません。前述のコードは、もともとこういう意味になります。

rx::observable<ReturnValues> ob = nx::GetServerLatency()(pServer);

変数obが作成されただけです。NSFGetServerLatencyを含んだロジックはどこにいったのか?そうです。このob変数にしまわれています。では、どうしたらこのob変数にしまわれてしまったロジックを実行できるのか?それが、先程も出てきたsubscribe(購読する)メソッドです。先程のob変数では、次のようにします。

ob.subscribe(/* ReturnValuesを受け取りロジック */);

メソッドsubscribeは、observableオブジェクトが持っているロジックを活性化して、データを流す役割があり、流れてきたデータは、引数に指定された受け取りロジックに渡します。ソースコードncl/main.cppではこのように書いています。

  nx::GetServerLatency getServerLatency(true, true, true);
  getServerLatency(pServer, 0).subscribe(
        [&pServer](nx::GetServerLatency::ReturnValues values) {
    // 標準出力
    QTextStream out(stdout, QIODevice::WriteOnly);
    out << QObject::tr("Build version of '%1'").arg(pServer)
        << ": "
        << values.version_
        << endl
        << QObject::tr("Latency time for client to server")
        << ": "
        << QString("%1 ms").arg(values.clientToServer_)
        << endl
        << QObject::tr("Latency time for server to client")
        << ": "
        << QString("%1 ms").arg(values.serverToClient_)
        << endl;
  }
  , [](std::exception_ptr ep) {
    try {std::rethrow_exception(ep);}
    catch (nx::Status status) {
      // 標準エラー出力
      QTextStream out(stderr, QIODevice::WriteOnly);
      out << QObject::tr("NSFGetServerLatency status")
          << ": "
          << status.error()
          << endl;
    }
  });

抜き出して簡略化すると、こうなります。

nx::GetServerLatency()(pServer).subscribe(/* 正常ロジック */, /* 異常ロジック */);

pServerでサーバ名を渡した時点では、NSFGetServerLatencyは実行しておらず、そこで作られたobservable<ReturnValues>型のオブジェクトで、subscribeしたときに実行されます。活性化されたロジックは、NSFGetServerLatency関数を実行し、正常に実行できれば、ReturnValues構造体を作り、subscriber(購読者)のon_nextメソッドに渡します。subscriberは、いわばsubscribeメソッドが放った「エージェント」の様な存在で、on_nextに渡された値は、結局のところsubscribeメソッドの引数「正常ロジック」に渡されます。

ついていけていませんか?心配ありません。今までのパラダイムでプログラミングしていれば、こんなパラダイムをすぐ理解できる方がまれじゃないでしょうか。今は、「subscribeして初めてロジック全体が実行される」ということだけ覚えていただければよいです。

回りくどいですか?ひどく同感です。しかし、こうすることで実際には多くの恩恵が潜んでいるらしいです。私自身は、エラー処理が楽になるとか、ロジックの変更がしやすいとか、単体テストがしやすいとか、同期/非同期の差がなくなるとか、まだまだその程度の実感です。

ReactiveXは、理屈をいくら重ねても理解しにくいかもしれませんが、実装例を見て「こうしたらこうなる」「こうしてみたらこう変わる」というのを体感していく方がよいでしょう。ReactiveXが実装されている言語はJavaJavaScript、.NET、Rubyなど多くの言語で展開されています。自身が得意な言語で、新しいパラダイムに触れてみてはいかがでしょうか。

ReactiveX

P.S. あいにくLotusScriptのReactiveXは、私が個人的に実装した例以外には見当たらないようですが。(^^;)どこかでお披露目できればと思っています。