こんにちは。3月にセプテーニ・オリジナルに入社した河内です。 新しい環境でフレッシュな気持ちで業務に取り組んでおります。
さて scala.concurrent.blocking()
は全く新しくない話ですが、あまり知られていないのかな、と感じるので書いておきます。
scala.concurrent.blocking()
をブロッキング処理をマークするのに使います。
実行中のスレッドに関連付けられた *1 BlockContext
によっていい感じに取り扱われます。
デフォルトの BlockContext
は渡された引数を単に評価するだけです*2。
BlockContext.withBlockContext()
を使うと、指定した BlockContext
を使ってブロッキング処理を扱うことができます。
次の例では blocking 前に log を出す BlockContext を作って利用しています。withBlockContext()
で指定することで blocking()
で囲まれた処理を実行する前に log が出力されます。
val logContext = new BlockContext { override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { println("Start blocking") thunk } } BlockContext.withBlockContext(logContext) { blocking { println("Blocking code") } }
withBlockContext()
を使って BlockContext
を自分で指定することは少ないと思います。おそらく最もよく出てくる場面は ExecutionContext.Implicits.global
(以下 global
) の上で Future
を走らせるときではないでしょうか。
global
は java.util.concurrent.ForkJoinPool
を用いて実装されています。ForkJoinPool
は限られたスレッドを使ってCPUを効率的に利用する仕組みですが、ブロッキング処理が混ざるとCPUを効率的に利用できません。そこで登場するのが ForkJoinPool.ManagedBlocker
です。この中でブロッキング処理を実行すると、専用のスレッドが起動されるため、元々の限られたスレッドは別の計算を行うために引き続き利用できます。
blocking()
を使わずに global
上でブロッキング処理を行うと、CPUを効率的に利用できません。
global
上で blocking()
を実行すると ForkJoinPool.ManagedBlocker
を利用して内部の処理を行います。
これにより global
上でも CPU を効率的に利用できる環境が保てます。
デフォルトではそのまま実行され、global
などの環境では専用スレッドを起こしてくれるので、ブロッキング処理は blocking()
で囲みましょう。。。
で、話が終わればいいのですが、 Scala 2.11 以前では気をつけなければならない事項があります。
まず、できるだけ blocking()
の中で blocking()
を呼ばないこと。
global
では blocking()
を呼び出したときにスレッド (Thread1) が作られ、その中で再度 blocking()
を呼び出すと別のスレッド (Thread2) が作られます。
Thread2 上での処理が完了するまで Thread1 は遊んだ状態になります。
なるべく下の層で blocking()
を使うのが良いでしょう。
次に、blocking()
を並列にたくさん呼びすぎないこと。
ブロッキング処理用に作られるスレッド数には事実上制限がないため、一時に多くの blocking()
を呼び出すと作ることができるスレッドの限度を超え、 OutOfMemory エラーが発生します*3。
Scala 2.12 では上記二点が改善されています。
ネストした blocking()
呼び出しにより余計にスレッドが作られてしまうことはありません。
スレッド数は 256 が最大に設定されており、 scala.concurrent.context.maxExtraThreads
システムプロパティで変更できます*4。
接続先ごとに ExecutionContext
を利用するのがベストプラクティスだと思いますが、小さい単位の関数を書いていると、実行時に使われる ExecutionContext
(BlockContext
) が限定できないという状況があると思います。
ブロッキングしなければならないとき*5には blocking()
を使うと、global
を使った場合でも全スレッドをブロックしてしまう事象が置きづらく、良いと思います。
*1:ThreadLocal や Thread.currentThread が参照されます。詳しくは BlockContext.current を参照。
*2:BlockContext.DefaultBlockContext、Scala 2.12 では BlockContext.defaultBlockContext を参照
*3:参考: blockingとOOM