あけましておめでとうございます。id:i000i0 です。
去年の9月に入社して以来、Scalaを使って広告の運用ツールを開発しています。
Scalaで開発していると強力な標準ライブラリのお世話になりますが、その内部実装がどうなっているかについて気になることがあります。
そこで今回は、非同期処理に使う scala.concurrent.Future
の実装についてコードを追ってみたいと思います。
ちなみにScalaのバージョンは現在の最新版である2.11.7です。
Futureオブジェクトの生成
Futureオブジェクトの生成から処理を追っていくとここにたどり着きます。 ここでは以下の様な処理を行っています。
- 「非同期に実行したい処理」を渡した
PromiseCompletingRunnable
オブジェクトを生成 - executor上 で 1. で生成したオブジェクトを実行
- 1.で生成したオブジェクトの promise.future を返す(これが scala.concurrent.Future型)
まずは 2. の executor について見てみます。
ExecutionContext
executor の型は ExecutionContext
です。
traitになっており、複数の実装から利用するものを選べるようになっています。
例として、 java.util.concurrent.ExecutorService
から生成する場合の ExecutionContext.fromExecutorService
を見てみます。
execute
メソッドは単に executor.execute
を呼び出しています。
要するに渡された ExecutorService
上で非同期に実行しているだけですね。
この他にもExecutionContext
の実装は幾つかありますが、そこを追うのは本筋とは外れるので省略します。
ちなみにFutureオブジェクト生成時に ExecutionContext
が見つからなかった場合には以下の様なお馴染みの(?)エラーメッセージが出力されますが..
scala> import scala.concurrent.Future import scala.concurrent.Future scala> Future() <console>:12: error: Cannot find an implicit ExecutionContext. You might pass an (implicit ec: ExecutionContext) parameter to your method or import scala.concurrent.ExecutionContext.Implicits.global. Future() ^
この scala.concurrent.ExecutionContext.Implicits.global
は以下のようになっています。
lazy val なので、一度呼び出して初期化すると、以降の呼び出しでは同じインスタンスが返されます。
よってこのcontextをimportした処理では同じcontextが使い回されるということになります。
スレッドプールの設定等は各アプリケーションの開発者が明示的に制御したくなるはずであり、プロダクションコードではこのglobalなcontextを使うようなケースはあまりないと思います。
参考: アプリケーションに合ったExecutionContextを使う
PromiseCompletingRunnable
ExecutionContext
については何となく理解できたような気分になったので、Futureオブジェクトの生成処理内で生成していた PromiseCompletingRunnable
を見てみます。
ExecutionContext上で実行するためにRunnableを継承しています。
内部で DefaultPromise
を生成していますが、こちらについては後述します。
runメソッドで DefaultPromise#complete
に、非同期に実行したい処理を渡しています。
非同期に実行したい処理が例外を投げなければ Success
、NonFatalな例外を投げたら Failure
、Fatalな例外ならそれをスロー、という感じです。
つまり、非同期で実行したかった処理を実際に呼び出しているのはここです。
ただし ExecutionContext
上で実行するので、非同期に実行されることになります。
DefaultPromise
PromiseCompletingRunnable
内で生成していた DefaultPromise
について見てみます。
直前に大量のコメントが書いてありますが、コメントによると DefaultPromise
は3つの状態のいずれかになるとのことです。
- 未完了(完了待ちコールバックのリスト。
List[CallbackRunnable]
で表される) - 完了(非同期処理の結果。
Try[T]
で表される) - 他のPromiseとリンク(
DefaultPromise[T]
で表される)
この状態を保持するオブジェクトの型は AnyRef(Object) というなかなかワイルドな実装になっています。
コンストラクタ処理の先頭でupdateState
なるメソッドでNilに設定していますが、これは状態を未完了(空のコールバックリスト) に設定しているということです。
なお、DefaultPromise
は、scala.concurrent.impl.Promise
を継承しています。
更にこいつは scala.concurrent.Promise
と、 scala.concurrent.Future
を継承しています。
最初に見たFutureオブジェクトの生成処理内で、 promise.future
を返していたと思いますが、実はこれは DefaultPromise
のインスタンスそのものです。
Futureのメソッドの実装
ここまで読むと、何となくFutureの実装が読めそうな気分になってきたので、Futureに実装されているメソッドがどのような実装になっているかを見てみます。
map
https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2FFuture.scala#L233
onComplete
を呼んでいます。
更に、 dispatchOrAddCallback
を呼んでいます。
DefaultPromise
の状態によって条件分岐しています。
状態は先述の通り3種類あります。
Futureの処理が完了している(状態が Try[_]) ならmapで渡した処理を実行し、まだFutureの処理が終わっていないならコールバックのリストにmapで渡した処理を追加、という感じです。
このような実装になっているため、既に処理が完了したFutureに対しても、その処理結果を利用する別の処理を、後から渡すことができます。
flatMap
先に書いたように DefaultPromise
の状態は3つありました。
そのうちの一つに、他の Promise とリンクしているというのがあったと思いますが、この状態には flatMap メソッドを使った時になり得ます。
https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2FFuture.scala#L246
このような実装になっている理由についてはソースコード上にコメントで残されています。 曰く、
- flatMapが再帰的に呼ばれた際に、単純にonCompleteとPromiseのチェーンを繋げていくと、最後のflatMapに渡したFutureの処理が完了した後でないとGCで回収されない
- このスペースリークは内部的に行われ、ユーザーからは直接見えないため発生しやすい
この挙動を防ぐために、 DefaultPromise
ではPromiseのチェーンを作る代わりに、チェーンを辿って大元のPromiseに直接リンクするという実装になっています。
まとめ
細かい部分まで追うとあまりにも長くなってしまうのでかなり省略しましたが、ScalaのFutureの実装について追ってみました。
内部実装を読んでみると、細かいところで効率化のための工夫がなされたりしていて、標準ライブラリの有り難みが増したような気がします。
これからも気になる機能については時間を見つけて内部実装を読んでみたいと思います。