この記事は Scala Advent Calendar 2018の25日目です。
メリークリスマス、中途三年目の堀越です。
以前から関心のあった Monix ですがようやく重い腰を上げ、
アドカレドリブンで学習しました。
話すこと
Monix Task をざっと学習したのですがその中で、エラーハンドリングやリトライ処理について注力してお話します。
とはいうものの、Monix Task の主な使い方については麻植さんのスライドが公開されていたり、昨年度のアドカレにも記事があったりといった状況でございます。
www.slideshare.net
他の記事と内容が被る箇所もあると思いますが、ご了承くださいませ。
教材
公式ドキュメントのバージョン 3x系
を教材としました。
2x系
を前提に読んでいると間違った解釈になる危険がありますので、
そのあたりはご留意ください。
リリースの記事に書いてないけど、3.0.0-RC2から `Task.apply` が非同期評価から同期評価に変わっておる…😇他にもなんかありそう…https://t.co/1v46D6rWsN
— Taisuke OE (@OE_uia) 2018年11月9日
落ち着いて確認してからしゃべらないと、間違った知識を広めてしまいそうだな。無理してアンカンファレンスに合わせない方が良さそう。
Monix Task おさらい
Task は遅延評価や非同期処理を制御するのに効果的なデータ型です。
副作用の抑制、非決定性、コールバック地獄の回避に役立ちますとのことです。
Task の宣言と Task.runAsync による非同期処理実行
非同期処理 を行う Task を宣言します。宣言した時点では処理は行われなません。
runAsync
を実行することで非同期処理を実行し、コールバックで計算結果に対する処理を定義します。
scala> :paste // Entering paste mode (ctrl-D to finish) import monix.execution.Scheduler.Implicits.global import monix.execution.CancelableFuture import monix.eval.Task import scala.util.{Success, Failure} ... scala> val task = Task { println("proccess"); "result" } task: monix.eval.Task[String] = Task.Eval$958847958 scala> task.runAsync { result => | result match { | case Right(v) => println(v) | case Left(e) => println(s"error: ${e.getMessage}") | } | } proccess // 非同期処理 result // 評価結果 res1: monix.execution.Cancelable = monix.execution.Cancelable.empty
Task.runToFuture で評価済み結果を受け取る
非同期処理の評価済み結果を受け取る場合は、 runToFuture
を使います。
scala> val future = task.runToFuture proccess future: monix.execution.CancelableFuture[String] = monix.execution.CancelableFuture$Pure@496dcf80 scala> future.foreach(println) result
CancelableFuture は Future のサブクラスにあたるので、.map
で別の処理に繋げたり、 Await.result
でブロッキングな実装ができますが、APIドキュメントにはアンチパターンとして取り上げられていますので注意しましょう。
Cancelable.cancel で非同期処理のキャンセル
runAsync
、 runToFuture
の結果は Cancelable
というデータ型になります。
Cancelable
は非同期処理をキャンセル、アクティブなデータソースが保持したリソースを解放を実行する trait です。
package monix.execution trait Cancelable { def cancel(): Unit }
使い方は下記のような感じ。
scala> val task = Task { println("delay task") }.delayExecution(5.seconds) task: monix.eval.Task[Unit] = Task.FlatMap$1655570345 scala> task.runAsync(Callback.empty) res33: monix.execution.Cancelable = monix.execution.Cancelable$CancelableTask@1937aa17 scala> .cancel // "delay task" は出力されない scala> task.runToFuture res35: monix.execution.CancelableFuture[Unit] = Async(Future(<not completed>),monix.eval.internal.TaskConnection$Impl$$anon$1@51cfd978) scala> .cancel // "delay task" は出力されない
Task.raiseError で例外を明示的に扱う
Task に明示的に例外を指定する際は、 raiseError
を使います。
scala> val rError = Task.raiseError(new RuntimeException("error")) rError: monix.eval.Task[Nothing] = Task.Error(java.lang.RuntimeException: error) scala> rError.runAsync(println) Left(java.lang.RuntimeException: error) res40: monix.execution.Cancelable = monix.execution.Cancelable.empty
Task.executeOn で Scheduler を指定する
executeOn
を使用して Scheduler を指定することができます。
Scheduler についてはまだキャッチアップしきれていないので詳細は割愛しますが、 ExecutionContext
の拡張したもの?というように捉えています。
なお、デフォルトでは monix.execution.Scheduler.Implicits.global
に定義されているものが適用されます。
Scheduler の作成は下記のような感じです。
scala> import monix.execution.Scheduler import monix.execution.Scheduler scala> lazy val myScheduler = Scheduler.io(name="my-scheduler") myScheduler: monix.execution.schedulers.SchedulerService = <lazy>
Scheduler の指定を行います。
scala> val task = Task(println(s"スレッド名:${Thread.currentThread.getName} にて実行されました。")) task: monix.eval.Task[Unit] = Task.Eval$921447116 scala> val myTask = task.executeOn(myScheduler lazy val myScheduler: monix.execution.schedulers.SchedulerService scala> val myTask = task.executeOn(myScheduler) myTask: monix.eval.Task[Unit] = Task.Async$1328381637 scala> for { | _ <- task | _ <- myTask | } yield (()) res0: monix.eval.Task[Unit] = Task.FlatMap$1326879076 scala> .runToFuture スレッド名:run-main-0 にて実行されました。 // デフォルトのスレッド(monix.execution.Scheduler.Implicits.global スレッド名:my-scheduler-51 にて実行されました。 // 独自に作成したスレッド res1: monix.execution.CancelableFuture[Unit] = Async(Future(Success(())),monix.eval.internal.TaskConnection$Impl$$anon$1@6d215d88)
Task.memoize で評価した値を再利用
Task.memorize は初回に評価した結果をメモし、次回以降の複数回の呼び出しに対し評価結果を再利用します。
scala> val task = Task { println("process"); "result" } task: monix.eval.Task[String] = Task.Eval$1390606472 scala> val memorized = task.memoize memorized: monix.eval.Task[String] = Task.Async$966291178 scala> memorized.runToFuture.foreach(println) process result scala> memorized.runToFuture.foreach(println) result
Task.evalOnce
を利用した場合も同様の挙動になります。
scala> val task = Task.evalOnce{ println("process"); "result" } task: monix.eval.Task[String] = Task.Eval$1065253276 scala> task.runToFuture.foreach(println) process result scala> task.runToFuture.foreach(println) result
・・・
主要な機能のおさらいはざっとこんな感じでしょうか...。
エラーハンドリング
本題へ入っていきます。
Task.runAsync のコールバック
runAsync
のシグネチャは下記の通りです。
final def runAsync(cb: Either[Throwable, A] => Unit)(implicit s: Scheduler): Cancelable
Task が非同期処理を実行してる最中に発生した例外は、Left[Throwable]
を受け取って処理できます。
scala> val errorTask = Task(throw new IllegalStateException("error")) errorTask: monix.eval.Task[Nothing] = Task.Eval$433111269 scala> errorTask.runAsync { | case Right(_) => () | case Left(e) => println(e) | } java.lang.IllegalStateException: error res38: monix.execution.Cancelable = monix.execution.Cancelable.empty
runAsync
のコールバック引数にて例外が発生した場合、その例外をキャッチできないため注意が必要です。
scala> errorTask.runAsync(r => throw new IllegalStateException) java.lang.IllegalStateException at .$anonfun$res39$1(<console>:21) at monix.execution.Callback$$anon$3.onError(Callback.scala:146) at monix.eval.internal.TaskRunLoop$.startLight(TaskRunLoop.scala:261) at monix.eval.Task.runAsyncOptF(Task.scala:798) at monix.eval.Task.runAsyncOpt(Task.scala:700) at monix.eval.Task.runAsync(Task.scala:652) ... 36 elided
doOnFinish を使う
doOnFinish
はTask に定義した非同期処理の完了に伴って発火する処理を適応します。Scala#Future の andThen
に似ていると感じました。
doOnFinish
の結果を runAsync
, runToFuture
つなげることはできませんので注意が必要です。
scala> errorTask.doOnFinish(r => Task(println("例外発生したよ"))) res50: monix.eval.Task[Nothing] = Task.FlatMap$194172761 scala> errorTask.runAsync(r => println(r)) 例外発生したよ Left(java.lang.IllegalStateException: error) res51: monix.execution.Cancelable = monix.execution.Cancelable.empty scala> errorTask.runToFuture 例外発生したよ res52: monix.execution.CancelableFuture[Nothing] = monix.execution.CancelableFuture$Pure@5c0e1846
タイムアウトを発生させる
非同期処理に時間がかかる場合にタイムアウトさせることはよくありますが、Task.timeout
を利用することができます。
import scala.concurrent.duration._ import scala.concurrent.TimeoutException scala> val delayTask = Task("delay delay delay!!!").delayExecution(10.seconds) delayTask: monix.eval.Task[String] = Task.FlatMap$1924402182 scala> val maybeTimeout = delayTask.timeout(3.seconds) maybeTimeout: monix.eval.Task[String] = Task.FlatMap$2106463742 scala> maybeTimeout.runAsync(r => println(r)) res55: monix.execution.Cancelable = monix.execution.Cancelable$CancelableTask@35e45bfe Left(java.util.concurrent.TimeoutException: Task timed-out after 3 seconds of inactivity)
Task.timeoutTo
を使うと、タイムアウト発生時に別のフォールバックタスクへの差し替えが可能になります。
scala> val maybeUnit = delayTask.timeoutTo( | 3.seconds, | Task(println("タイムアウトエラーになりました、処理を終了します。")) | ) maybeUnit: monix.eval.Task[Any] = Task.FlatMap$742751156 scala> maybeUnit.runToFuture res56: monix.execution.CancelableFuture[Any] = Async(Future(<not completed>),monix.eval.internal.TaskConnection$Impl$$anon$1@5f9d8617) scala> タイムアウトエラーになりました、処理を終了します。
エラーのリカバリング
Task.onErrorHandleWith
をは例外をパターンマッチングしてフォールバックタスクへの差し替えを行う操作です。
下記のようにしてエラーのリカバリングを実現します。
import scala.concurrent.duration._ import scala.concurrent.TimeoutException val timeoutTask = Task("maybe timeout") | .delayExecution(10.seconds) | .timeout(3.seconds) timeoutTask: monix.eval.Task[String] = Task.FlatMap$1455727723 scala> timeoutTask.onErrorHandleWith { | case _: TimeoutException => Task.now("ホイミ!!!") | case other => Task.raiseError(other) | } res59: monix.eval.Task[String] = Task.FlatMap$1663681506 scala> .runAsync(r => println(r)) res62: monix.execution.Cancelable = monix.execution.Cancelable$CancelableTask@8746de7 scala> Right(ホイミ!!!)
Task.onErrorRecoverWith
を使うと上記の case other => Task.raiseError(other)
のパターンを省略できます。
import scala.concurrent.duration._ import scala.concurrent.TimeoutException val timeoutTask = Task("maybe timeout") | .delayExecution(10.seconds) | .timeout(3.seconds) timeoutTask: monix.eval.Task[String] = Task.FlatMap$1455727723 cala> timeoutTask.onErrorRecoverWith { | case _: TimeoutException => Task.now("ホイミ!!!") | } res63: monix.eval.Task[String] = Task.FlatMap$48995740 scala> .runToFuture.foreach(println) scala> ホイミ!!!
上記のショートカット操作となる Task.onErrorHandle
, Task. onErrorRecover
がありますのでご参考までに。
リトライ処理
非同期プログラムの実装で避けて通れないのがリトライ処理です。
Monix にはリトライ処理を考慮した機能が備わっているなと感じました。
エラー時のリトライ回数を指定する
Task.onErrorRestart
はエラー発生時の最大リトライ回数を指定することができます。
リトライ処理をデフォルトの機能として提供してもらえているのはありがたいです。
scala> val errorTask = Task.raiseError(new IllegalStateException("error")) errorTask: monix.eval.Task[Nothing] = Task.Error(java.lang.IllegalStateException: error) scala> errorTask.doOnFinish { | case Some(e) => Task(println(s"${e.getMessage}")) | case _ => Task.unit | } res13: monix.eval.Task[Nothing] = Task.FlatMap$1357591566 scala> .onErrorRestart(maxRetries = 4) res14: monix.eval.Task[Nothing] = Task.FlatMap$453432794 scala> .runToFuture error error error error error res15: monix.execution.CancelableFuture[Nothing] = monix.execution.CancelableFuture$Pure@4ad3e1f@4638df6a
パターンマッチしてリトライする
Task.onErrorRestartIf
を使えばパターンマッチしてリトライをトリガーする条件を指定できます。
import scala.util.Random scala> val task = Task(Random.nextInt).flatMap { | case n if n % 2 == 0 => Task.now(n) | case _ => Task.raiseError(new IllegalStateException("error")) | } task: monix.eval.Task[Int] = Task.FlatMap$228744859 scala> val maybeRandomInt = task.onErrorRestartIf { | case _: IllegalStateException => true | case _ => false | } maybeRandomInt: monix.eval.Task[Int] = Task.FlatMap$1111296429
Exponential Backoff の実装
Exponential Backoff はリトライの間隔を徐々に広げていくというものです。
Task.onErrorHandleWith
, Task.delayExecution
, Task.raiseError
と再帰処理を組み合わせることで Exponential Backoff を独自実装できます。
def retryBackoff[A](source: Task[A], maxRetries: Int, firstDelay: FiniteDuration): Task[A] = { source.onErrorHandleWith { case ex: Exception => if (maxRetries > 0) retryBackoff(source, maxRetries-1, firstDelay*2) .delayExecution(firstDelay) else Task.raiseError(ex) } }
使い方としては引数に、実行したい非同期処理、リトライの最大回数、リトライ間隔の初期値を指定するだけです。
この実装は、ドキュメントにサンプル実装として紹介されているものですが使い回しが効くしかなりいい感じじゃないですか?
終わりに
内容を振り返ってみるとタイトルに記載しているエラーハンドリング、リトライ処理ではなくおさらいのボリュームがかなり膨らんでしまいました。
既に Monix をゴリゴリ触っているよっていう人にはちょっと退屈な内容だったかもしれませんが、これから Monix を触るよという人の参考になれば幸いです。
では、良いお年を。