中途三年目、堀越です。
突然ではありますがわたくし、並行プログラミングについて学習しております。その活動の一部として絶版になっている「Java並行プログラミング」読んでいる最中であります。
普段 Scala を書くことが多いわたしにとっては6章がとても親近感のある内容でしたので、前・後編に分けて紹介したいと思います(一回でうまくまとめきれなかった)。
並行処理におけるタスクの定義
タスクとはそれぞれが独立した活動であり、他のタスクの状態や結果に依存しない仕事です。
また、こういったタスクは並行処理に向いているということでした。
アプリケーションの仕事を複数のタスクに分割するとどうなるか?
- プログラムが単純になる。
- エラーリカバリしやすくなる。
- 並行処理しやすくなる。
アプリケーションの振る舞いとして求められること
- 正常な負荷状態での良いスループットとレイテンシ。
- 過負荷になったときの穏やかな処理低下。
このあたりは要求次第だったりするのかなとは思うのですが、プログラムの基本機能だと思うのでしっかりと押さえておきたいところです。別に並行プログラミングに限った話ではないと思いました。
逐次的な実装
簡単なWebサーバーの実装例が紹介されていました。Scalaで書き直したものを掲載します。
object SingleThreadWebServer { def main(args: Array[String]): Unit = { val socket = new ServerSocket(80) while (true) { socket.accept() println("Hello SingleThreadWebServer!!!") } } }
このようなプログラムには下記のような特徴があるということでした。
- シンプルで理論的には正しいが、一度にひとつのリクエストしか扱えないので性能が悪い。
- ひとつのリクエストでI/Oが入り混じっている場合、接続に問題があるとブロックし後続のタスクは処理されない。
- スループットもレイテンシも悪い。
薄々、勘付いていた方もおられるかとは思いますがやはり性能的に問題があります。
タスクのためにスレッドを明示的に作る
応答性を良くするために各タスクの実行の度にスレッドを作成するアプローチの紹介です。同じく Scala でリライトしたものを掲載します。
object ThreadPerTaskWebServer { def main(args: Array[String]): Unit = { val socket = new ServerSocket(80) while (true) { socket.accept() val task = new Runnable { override def run(): Unit = println("Hello ThreadPerTaskWebServer!!!") } new Thread(task).start() } } }
1タスクに対し1スレッド作成するメリット
レイテンシの向上
タスクの処理をメインスレッドが担当しないので、メインループはすぐに次の接続の受付を開始できるようになります。新しい接続を前のリクエストが完了する前に受け付けるので、応答性の向上が期待できるということでした。
スループットの向上
タスクを並列に処理できるので、複数のリクエストに対し同時にサービスできるようになります。これによって、プロセッサが複数あればスループットが良くなります。また、タスクがI/Oの完了やロックの取得、リソースの空きなどを待ってブロックするときもスループットが改善されるということでした。
1タスクに対し1スレッド作成する際の注意点
- タスクを扱うコードはスレッドセーフである必要がある。
- リクエストが到着する頻度〜速度がサーバーの処理能力を超えていないこと。
上記の注意点を守ることで良いレイテンシとスループットを実現できるのかなという印象ですが、ちょっと微妙ですね。
制限の無いスレッドの欠点
- スレッド作成のオーバーヘッドが発生。
- 実行スレッドが可利用なプロセッサより多いとスレッドは停滞する。
- アイドル状態のスレッドが増えるとCPUの食い合いが起こり、サーバー能力の低下に繋がる。
- 作れるスレッドには上限があるので、それを超えると OutOfMemoryError となる。
コンピューティングリソースは有限なのでなので計画的な利用をしないと当然破綻してしまいますよね。そのあたりも踏まえて戦略的に実行計画を組む必要があるのでやはりこのアプローチもあまり良いものとは言えません。
Executor フレームワークを利用する
java.util.concurrent パッケージの Executor フレームワークを使うとスレッドを管理できるようになります。Executor には柔軟な使い方ができる既製のスレッドプールの実装があるので後ほど紹介します。
Executor を使うことでプロデューサー・コンシューマー方式での設計を楽に実現できるということでした。
Executor を使ったWebサーバ
Executor を使ったWebサーバーのサンプルプログラムです。
import java.net.ServerSocket import java.util.concurrent.{Executor, Executors} object TaskExecutionWebServer { private final lazy val NThreads: Int = 100 private final lazy val exec: Executor = Executors.newFixedThreadPool(NThreads) def main(args: Array[String]): Unit = { val socket = new ServerSocket(80) while (true) { socket.accept() def task = new Runnable { override def run(): Unit = { println("Hello TaskExecutionWebServer!!!") } } exec.execute(task) } } }
これは、容量100スレッドの固定サイズのスレッドプールを使った標準的な実装です。このにはリクエスト処理のタスクを依頼することと、その実行が Executor を使うことで分離されているという特徴があります。
タスクの依頼と実行を分離する価値
タスクの依頼と実行を分離することで、一定の種類のタスクの実行ポリシーを簡単に指定できる、その後の変更も難工事無しでできるといったメリットがあるようです。
実行ポリシー
実行ポリシーについて具体的に見ていきます。実行ポリシーとはタスク実行の "what, wehere, when, how" の定義のことです。
- タスクをどのスレッドで実行するのか?
- タスクをどんな順序で実行するのか(FIFOか?LIFOか?プライオリティ順か?)
- 並列に実行できるタスクはいくつか?
- 待ち行列(キュー)に並ぶタスクの最大数はいくつか?
- 過負荷のためにタスクを拒絶するときは、どんな基準で犠牲者を選ぶのか?アプリケーションへの通知はどうやるのか?
- タスクを実行する前と実行した後でどんなアクションを行うべきか?
これらを指定することが実行ポリシーとなります。
スレッドプール
スレッドプールとは
その名の通りワーカースレッドの均質なプールを管理する役割があります。スレッドプールを使うことで新たにスレッドを作成するのではなく既存のスレッドを再利用するので応答性が良くなるようです。
既製のスレッドプールを使う
スレッドプールの既製の構成を使うには Executor クラスの static ファクトリメソッドのどれかを使うことができます。
メソッド | 説明 |
---|---|
.newFixedThreadPool | タスクの依頼と同時に固定サイズのスレッドを最大プールサイズまで作成する。 |
.newCachedThreadPool | プールサイズが処理要求を超えていたらスレッドを減らし、要求が増えたらあらあにスレッドを加える。サイズ制限はない。 |
.newSingleThreadPool | 一つのワーカースレッドを作ってタスクを処理する。 タスクはタスクキューの順序(FIFO, LIFO, プライオリティ順)に従って逐次的に処理される。 |
.newScheduledThreadPool | タスクの遅延と周期的な実行をサポートする固定サイズのスレッドプール。 |
ユースケースに応じて応じてお好みのスレッドプールを利用できると良さそうです。
まとめ
- タスクとはそれぞれが独立した活動で、他のタスクのス状態や結果に依存しない仕事のこと。
- 逐次的な処理を行うプログラムは性能が悪い。
- 制限の無いスレッドの作成は結果的にアプリケーションの性能劣化を招く。
- Executor を利用することでスレッドを管理できる。
- ユースケースに応じて既製のスレッドプールを使おう。
最後に
ExecutionContext#fromExecutor(...) といったエントリがあるように、普段何気なく利用している Future や ExecutionContext の裏側では Executor が頑張ってくれているのだなということがわかりました。理解を深めて雰囲気で触るのは卒業したいと思います。
後半では、Executor を拡張した ExecutorService や Future(Java のやつ)、Callable などについて触れていきたいと思います。