FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

Monix Task のエラーハンドリングやリトライ処理

この記事は Scala Advent Calendar 2018の25日目です。

メリークリスマス、中途三年目の堀越です。

以前から関心のあった Monix ですがようやく重い腰を上げ、
アドカレドリブンで学習しました。

monix.io

話すこと

Monix Task をざっと学習したのですがその中で、エラーハンドリングやリトライ処理について注力してお話します。

とはいうものの、Monix Task の主な使い方については麻植さんのスライドが公開されていたり、昨年度のアドカレにも記事があったりといった状況でございます。

www.slideshare.net

qiita.com

他の記事と内容が被る箇所もあると思いますが、ご了承くださいませ。

教材

公式ドキュメントのバージョン 3x系 を教材としました。

monix.io

monix.io

2x系 を前提に読んでいると間違った解釈になる危険がありますので、
そのあたりはご留意ください。

Monix Task おさらい

Task は遅延評価や非同期処理を制御するのに効果的なデータ型です。
副作用の抑制、非決定性、コールバック地獄の回避に役立ちますとのことです。

Task の宣言と Task.runAsync による非同期処理実行

非同期処理 を行う Task を宣言します。宣言した時点では処理は行われなません。
runAsync を実行することで非同期処理を実行し、コールバックで計算結果に対する処理を定義します。

scala> :paste
// Entering paste mode (ctrl-D to finish)

import monix.execution.Scheduler.Implicits.global
import monix.execution.CancelableFuture
import monix.eval.Task
import scala.util.{Success, Failure}

...

scala> val task = Task { println("proccess"); "result" }
task: monix.eval.Task[String] = Task.Eval$958847958

scala> task.runAsync { result =>
     |   result match {
     |     case Right(v) => println(v)
     |     case Left(e) => println(s"error: ${e.getMessage}")
     |   }
     | }
proccess  // 非同期処理
result  // 評価結果
res1: monix.execution.Cancelable = monix.execution.Cancelable.empty

Task.runToFuture で評価済み結果を受け取る

非同期処理の評価済み結果を受け取る場合は、 runToFuture を使います。

scala> val future = task.runToFuture
proccess
future: monix.execution.CancelableFuture[String] = monix.execution.CancelableFuture$Pure@496dcf80

scala> future.foreach(println)
result

CancelableFuture は Future のサブクラスにあたるので、.map で別の処理に繋げたり、 Await.resultブロッキングな実装ができますが、APIドキュメントにはアンチパターンとして取り上げられていますので注意しましょう。

[https://monix.io/api/3.0/monix/eval/Task.html#runToFuture(implicits:monix.execution.Scheduler):monix.execution.CancelableFuture[A]]

Cancelable.cancel で非同期処理のキャンセル

runAsyncrunToFuture の結果は Cancelable というデータ型になります。
Cancelable は非同期処理をキャンセル、アクティブなデータソースが保持したリソースを解放を実行する trait です。

package monix.execution

trait Cancelable {
  def cancel(): Unit
}

使い方は下記のような感じ。

scala> val task = Task { println("delay task") }.delayExecution(5.seconds)
task: monix.eval.Task[Unit] = Task.FlatMap$1655570345

scala> task.runAsync(Callback.empty)
res33: monix.execution.Cancelable = monix.execution.Cancelable$CancelableTask@1937aa17

scala> .cancel
// "delay task" は出力されない

scala> task.runToFuture
res35: monix.execution.CancelableFuture[Unit] = Async(Future(<not completed>),monix.eval.internal.TaskConnection$Impl$$anon$1@51cfd978)

scala> .cancel
// "delay task" は出力されない

Task.raiseError で例外を明示的に扱う

Task に明示的に例外を指定する際は、 raiseError を使います。

scala> val rError = Task.raiseError(new RuntimeException("error"))
rError: monix.eval.Task[Nothing] = Task.Error(java.lang.RuntimeException: error)

scala> rError.runAsync(println)
Left(java.lang.RuntimeException: error)
res40: monix.execution.Cancelable = monix.execution.Cancelable.empty

Task.executeOn で Scheduler を指定する

executeOn を使用して Scheduler を指定することができます。

Scheduler についてはまだキャッチアップしきれていないので詳細は割愛しますが、 ExecutionContext の拡張したもの?というように捉えています。

なお、デフォルトでは monix.execution.Scheduler.Implicits.global に定義されているものが適用されます。

Scheduler の作成は下記のような感じです。

scala> import monix.execution.Scheduler
import monix.execution.Scheduler

scala> lazy val myScheduler = Scheduler.io(name="my-scheduler")
myScheduler: monix.execution.schedulers.SchedulerService = <lazy>

Scheduler の指定を行います。

scala> val task = Task(println(s"スレッド名:${Thread.currentThread.getName} にて実行されました。"))
task: monix.eval.Task[Unit] = Task.Eval$921447116

scala> val myTask = task.executeOn(myScheduler
   lazy val myScheduler: monix.execution.schedulers.SchedulerService

scala> val myTask = task.executeOn(myScheduler)
myTask: monix.eval.Task[Unit] = Task.Async$1328381637

scala> for {
     | _ <- task
     | _ <- myTask
     | } yield (())
res0: monix.eval.Task[Unit] = Task.FlatMap$1326879076

scala> .runToFuture
スレッド名:run-main-0 にて実行されました。  // デフォルトのスレッド(monix.execution.Scheduler.Implicits.global
スレッド名:my-scheduler-51 にて実行されました。  // 独自に作成したスレッド
res1: monix.execution.CancelableFuture[Unit] = Async(Future(Success(())),monix.eval.internal.TaskConnection$Impl$$anon$1@6d215d88)

Task.memoize で評価した値を再利用

Task.memorize は初回に評価した結果をメモし、次回以降の複数回の呼び出しに対し評価結果を再利用します。

scala> val task = Task { println("process"); "result" }
task: monix.eval.Task[String] = Task.Eval$1390606472

scala> val memorized = task.memoize
memorized: monix.eval.Task[String] = Task.Async$966291178

scala> memorized.runToFuture.foreach(println)
process
result

scala> memorized.runToFuture.foreach(println)
result

Task.evalOnce を利用した場合も同様の挙動になります。

scala> val task = Task.evalOnce{ println("process"); "result" }
task: monix.eval.Task[String] = Task.Eval$1065253276

scala> task.runToFuture.foreach(println)
process
result

scala> task.runToFuture.foreach(println)
result

・・・

主要な機能のおさらいはざっとこんな感じでしょうか...。

エラーハンドリング

本題へ入っていきます。

Task.runAsync のコールバック

runAsyncシグネチャは下記の通りです。

final def runAsync(cb: Either[Throwable, A] => Unit)(implicit s: Scheduler): Cancelable

Task が非同期処理を実行してる最中に発生した例外は、Left[Throwable] を受け取って処理できます。

scala> val errorTask = Task(throw new IllegalStateException("error"))
errorTask: monix.eval.Task[Nothing] = Task.Eval$433111269

scala> errorTask.runAsync {
     |   case Right(_) => ()
     |   case Left(e) => println(e)
     | }
java.lang.IllegalStateException: error
res38: monix.execution.Cancelable = monix.execution.Cancelable.empty

runAsync のコールバック引数にて例外が発生した場合、その例外をキャッチできないため注意が必要です。

scala> errorTask.runAsync(r => throw new IllegalStateException)
java.lang.IllegalStateException
  at .$anonfun$res39$1(<console>:21)
  at monix.execution.Callback$$anon$3.onError(Callback.scala:146)
  at monix.eval.internal.TaskRunLoop$.startLight(TaskRunLoop.scala:261)
  at monix.eval.Task.runAsyncOptF(Task.scala:798)
  at monix.eval.Task.runAsyncOpt(Task.scala:700)
  at monix.eval.Task.runAsync(Task.scala:652)
  ... 36 elided

doOnFinish を使う

doOnFinish はTask に定義した非同期処理の完了に伴って発火する処理を適応します。Scala#Future の andThen に似ていると感じました。

doOnFinish の結果を runAsync, runToFuture つなげることはできませんので注意が必要です。

scala> errorTask.doOnFinish(r => Task(println("例外発生したよ")))
res50: monix.eval.Task[Nothing] = Task.FlatMap$194172761

scala> errorTask.runAsync(r => println(r))
例外発生したよ
Left(java.lang.IllegalStateException: error)
res51: monix.execution.Cancelable = monix.execution.Cancelable.empty

scala> errorTask.runToFuture
例外発生したよ
res52: monix.execution.CancelableFuture[Nothing] = monix.execution.CancelableFuture$Pure@5c0e1846

タイムアウトを発生させる

非同期処理に時間がかかる場合にタイムアウトさせることはよくありますが、Task.timeout を利用することができます。

import scala.concurrent.duration._
import scala.concurrent.TimeoutException

scala> val delayTask = Task("delay delay delay!!!").delayExecution(10.seconds)
delayTask: monix.eval.Task[String] = Task.FlatMap$1924402182

scala> val maybeTimeout = delayTask.timeout(3.seconds)
maybeTimeout: monix.eval.Task[String] = Task.FlatMap$2106463742

scala> maybeTimeout.runAsync(r => println(r))
res55: monix.execution.Cancelable = monix.execution.Cancelable$CancelableTask@35e45bfe

Left(java.util.concurrent.TimeoutException: Task timed-out after 3 seconds of inactivity)

Task.timeoutTo を使うと、タイムアウト発生時に別のフォールバックタスクへの差し替えが可能になります。

scala> val maybeUnit = delayTask.timeoutTo(
     |   3.seconds,
     |   Task(println("タイムアウトエラーになりました、処理を終了します。"))
     | )
maybeUnit: monix.eval.Task[Any] = Task.FlatMap$742751156

scala> maybeUnit.runToFuture
res56: monix.execution.CancelableFuture[Any] = Async(Future(<not completed>),monix.eval.internal.TaskConnection$Impl$$anon$1@5f9d8617)

scala> タイムアウトエラーになりました、処理を終了します。

エラーのリカバリング

Task.onErrorHandleWith をは例外をパターンマッチングしてフォールバックタスクへの差し替えを行う操作です。

下記のようにしてエラーのリカバリングを実現します。

import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val timeoutTask = Task("maybe timeout")
     |   .delayExecution(10.seconds)
     |   .timeout(3.seconds)
timeoutTask: monix.eval.Task[String] = Task.FlatMap$1455727723

scala> timeoutTask.onErrorHandleWith {
     |   case _: TimeoutException => Task.now("ホイミ!!!")
     |   case other => Task.raiseError(other)
     | }
res59: monix.eval.Task[String] = Task.FlatMap$1663681506

scala> .runAsync(r => println(r))
res62: monix.execution.Cancelable = monix.execution.Cancelable$CancelableTask@8746de7

scala> Right(ホイミ!!!)

Task.onErrorRecoverWith を使うと上記の case other => Task.raiseError(other) のパターンを省略できます。

import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val timeoutTask = Task("maybe timeout")
     |   .delayExecution(10.seconds)
     |   .timeout(3.seconds)
timeoutTask: monix.eval.Task[String] = Task.FlatMap$1455727723

cala> timeoutTask.onErrorRecoverWith {
     |   case _: TimeoutException => Task.now("ホイミ!!!")
     | }
res63: monix.eval.Task[String] = Task.FlatMap$48995740

scala> .runToFuture.foreach(println)

scala> ホイミ!!!

上記のショートカット操作となる Task.onErrorHandle, Task. onErrorRecover がありますのでご参考までに。

リトライ処理

非同期プログラムの実装で避けて通れないのがリトライ処理です。
Monix にはリトライ処理を考慮した機能が備わっているなと感じました。

エラー時のリトライ回数を指定する

Task.onErrorRestart はエラー発生時の最大リトライ回数を指定することができます。
リトライ処理をデフォルトの機能として提供してもらえているのはありがたいです。

scala> val errorTask = Task.raiseError(new IllegalStateException("error"))
errorTask: monix.eval.Task[Nothing] = Task.Error(java.lang.IllegalStateException: error)

scala> errorTask.doOnFinish {
     |   case Some(e) => Task(println(s"${e.getMessage}"))
     |   case _ => Task.unit
     | }
res13: monix.eval.Task[Nothing] = Task.FlatMap$1357591566

scala> .onErrorRestart(maxRetries = 4)
res14: monix.eval.Task[Nothing] = Task.FlatMap$453432794

scala> .runToFuture
error
error
error
error
error
res15: monix.execution.CancelableFuture[Nothing] = monix.execution.CancelableFuture$Pure@4ad3e1f@4638df6a

パターンマッチしてリトライする

Task.onErrorRestartIf を使えばパターンマッチしてリトライをトリガーする条件を指定できます。

import scala.util.Random

scala> val task = Task(Random.nextInt).flatMap {
     |   case n if n % 2 == 0 => Task.now(n)
     |   case _ => Task.raiseError(new IllegalStateException("error"))
     | }
task: monix.eval.Task[Int] = Task.FlatMap$228744859

scala> val maybeRandomInt = task.onErrorRestartIf {
     |   case _: IllegalStateException => true
     |   case _ => false
     | }
maybeRandomInt: monix.eval.Task[Int] = Task.FlatMap$1111296429

Exponential Backoff の実装

Exponential Backoff はリトライの間隔を徐々に広げていくというものです。

yoshidashingo.hatenablog.com

Task.onErrorHandleWith, Task.delayExecution, Task.raiseError再帰処理を組み合わせることで Exponential Backoff を独自実装できます。

def retryBackoff[A](source: Task[A], maxRetries: Int, firstDelay: FiniteDuration): Task[A] = {
  source.onErrorHandleWith {
    case ex: Exception =>
      if (maxRetries > 0)
        retryBackoff(source, maxRetries-1, firstDelay*2)
          .delayExecution(firstDelay)
      else
        Task.raiseError(ex)
  }
}

使い方としては引数に、実行したい非同期処理、リトライの最大回数、リトライ間隔の初期値を指定するだけです。

この実装は、ドキュメントにサンプル実装として紹介されているものですが使い回しが効くしかなりいい感じじゃないですか?

終わりに

内容を振り返ってみるとタイトルに記載しているエラーハンドリング、リトライ処理ではなくおさらいのボリュームがかなり膨らんでしまいました。

既に Monix をゴリゴリ触っているよっていう人にはちょっと退屈な内容だったかもしれませんが、これから Monix を触るよという人の参考になれば幸いです。

では、良いお年を。