FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

scala.concurrent.blocking()

こんにちは。3月にセプテーニ・オリジナルに入社した河内です。 新しい環境でフレッシュな気持ちで業務に取り組んでおります。

さて scala.concurrent.blocking() は全く新しくない話ですが、あまり知られていないのかな、と感じるので書いておきます。 scala.concurrent.blocking()ブロッキング処理をマークするのに使います。

実行中のスレッドに関連付けられた *1 BlockContext によっていい感じに取り扱われます。 デフォルトの BlockContext は渡された引数を単に評価するだけです*2

BlockContext.withBlockContext() を使うと、指定した BlockContext を使ってブロッキング処理を扱うことができます。 次の例では blocking 前に log を出す BlockContext を作って利用しています。withBlockContext() で指定することで blocking() で囲まれた処理を実行する前に log が出力されます。

    val logContext = new BlockContext {
      override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
        println("Start blocking")
        thunk
      }
    }

    BlockContext.withBlockContext(logContext) {
      blocking {
        println("Blocking code")
      }
    }

withBlockContext() を使って BlockContext を自分で指定することは少ないと思います。おそらく最もよく出てくる場面は ExecutionContext.Implicits.global (以下 global) の上で Future を走らせるときではないでしょうか。

globaljava.util.concurrent.ForkJoinPool を用いて実装されています。ForkJoinPool は限られたスレッドを使ってCPUを効率的に利用する仕組みですが、ブロッキング処理が混ざるとCPUを効率的に利用できません。そこで登場するのが ForkJoinPool.ManagedBlocker です。この中でブロッキング処理を実行すると、専用のスレッドが起動されるため、元々の限られたスレッドは別の計算を行うために引き続き利用できます。

blocking() を使わずに global 上でブロッキング処理を行うと、CPUを効率的に利用できません。 global 上で blocking() を実行すると ForkJoinPool.ManagedBlocker を利用して内部の処理を行います。 これにより global 上でも CPU を効率的に利用できる環境が保てます。

デフォルトではそのまま実行され、global などの環境では専用スレッドを起こしてくれるので、ブロッキング処理は blocking() で囲みましょう。。。

で、話が終わればいいのですが、 Scala 2.11 以前では気をつけなければならない事項があります。

まず、できるだけ blocking() の中で blocking() を呼ばないこと。 global では blocking() を呼び出したときにスレッド (Thread1) が作られ、その中で再度 blocking() を呼び出すと別のスレッド (Thread2) が作られます。 Thread2 上での処理が完了するまで Thread1 は遊んだ状態になります。 なるべく下の層で blocking() を使うのが良いでしょう。

次に、blocking() を並列にたくさん呼びすぎないこと。 ブロッキング処理用に作られるスレッド数には事実上制限がないため、一時に多くの blocking() を呼び出すと作ることができるスレッドの限度を超え、 OutOfMemory エラーが発生します*3

Scala 2.12 では上記二点が改善されています。 ネストした blocking() 呼び出しにより余計にスレッドが作られてしまうことはありません。 スレッド数は 256 が最大に設定されており、 scala.concurrent.context.maxExtraThreads システムプロパティで変更できます*4

接続先ごとに ExecutionContext を利用するのがベストプラクティスだと思いますが、小さい単位の関数を書いていると、実行時に使われる ExecutionContext (BlockContext) が限定できないという状況があると思います。 ブロッキングしなければならないとき*5には blocking() を使うと、global を使った場合でも全スレッドをブロックしてしまう事象が置きづらく、良いと思います。

*1:ThreadLocal や Thread.currentThread が参照されます。詳しくは BlockContext.current を参照。

*2:BlockContext.DefaultBlockContext、Scala 2.12 では BlockContext.defaultBlockContext を参照

*3:参考: blockingとOOM

*4:参考: Futures in Scala 2.12 (part 7)

*5:避けられる限りブロッキングしないようにしましょう