FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

Akka Http 2.0とScala.jsを試してみた

初めまして。菅野と申します。 去年10月に入社し、広告運用ツールを開発するチームに所属しています。

私は普段の開発ではPlayとTypeScriptを使用しています。 PlayやTypeScriptにはそれほど不満は感じていないのですが、その他のツールやライブラリについても気になっているものがいろいろとあります。

特にAkka HTTP 2.0とScala.jsが気になっていて、それらを試すためにToDoアプリのようなものを作ってみました。 今回はその過程を紹介したいと思います。

Akka HTTP

Akka HTTPとはAkkaを使用したHTTPサーバライブラリです。
Akka Streamsを使って実装されているため、処理はリアクティブでback-pressureにも対応できるのが特徴です。

libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.0.1"

使用するにはsbtに上記の依存を追加するだけでOKです。
experimentalとあるように、Akka HTTP2.0はまだ実験的バージョンのようです。
Akka ActorやAkka Streamsに依存しているので、それらのAPIを全て使うことができるようになります。(Akka Streamsもexperimental-2.0.1を使用することになります)

早速動かしてみたいと思います。

object MyApp extends App {

  implicit val actorSystem = ActorSystem("my-system")
  implicit val flowMaterializer = ActorMaterializer()

  import Directives._

  val route = get {
    pathEndOrSingleSlash {
      getFromFile("path/to/index.html")
    }
  } ~ path("test") {
    get {
      handleWith((a: HttpRequest) => s"your request is\n\n${a.headers.mkString("\n")}")
    }
  }

  val serverBinding = Http(actorSystem).bindAndHandle(route ,interface = "localhost", port = 8080)
}

これだけでサーバが立ち上がります。
上の例ですと、ルートにアクセスするとhtmlファイルが返され、/testにアクセスすると文字列が返されます。

ルーティングはWebサーバのようにディレクティブを組み合わせて作成します。
用意されているディレクティブの組み合わせで、普通のルーティング以外にも認証、ETagでの制御、ロギング等いろいろなことが出来ます。

Playのようなフルスタックフレームワークとは違う、とてもシンプルなライブラリです。

Scala.js

Scala.jsは名前の通り、Scalaソースコードをjsに出力するAltJSです。
宗教上の理由によりjavascriptを書くことが禁じられている場合にとても威力を発揮します。
また、jsしか書かないけどScalaの勉強をしたいフロントエンドのエンジニアにとっての救世主です。

plugins.sbtに

addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.5")

build.sbtに

enablePlugins(ScalaJSPlugin)
libraryDependencies += "org.scala-js" %%% "scalajs-dom" % "0.8.0"

を追記するだけですぐにScalaコードをjavascriptに変換できるようになります。

上記の例ではscalajs本体ではなくてscalajs-domを依存に追加していますが、
DOM操作は常にすると思いますので初めからscalajs-domだけを依存に追加すれば良いと思います。
%%%は、ScalaのバージョンとScala.jsのバージョンの2つを自動で追加するScala.jsライブラリ用のモジュールIDの記法です。

これで外部ライブラリに依存していないScalaコードなら普通にjsに出力して動かせます。
また、Scala.js対応のScalaライブラリも結構あるのでアプリを作るのに困ることはないと思います。多分。

package example
object MyJsApp extends JSApp {
  def main(): Unit = {
    println("Hello World!!")
  }
}

jsアプリを作るにはJSAppトレイトを継承してmainメソッドを実装します。Scalaのprintlnで出力している箇所はjsのコンソールに出力されます。

Scalaを書いたらsbtでfastOptJSfullOptJSというタスクを実行すればtargetディレクトリ配下にjsファイルが出力されます。
それをブラウザから実行するために下記のようなhtmlファイルを作ります。

<html>
<head>
    <script src="path/to/output.js"></script>
</head>
<body>
    <script>
        example.MyJsApp().main();
    </script>
</body>
</html>

jsファイルを読み込むだけではダメで、mainメソッドを呼んであげる必要があります。JVMではないのでmainメソッドを呼び出す仕様なんてあるわけ無いですが、私はここで少し考えてしまいました。。。

正常に実行されればブラウザの開発ツールのコンソールにHello World!!と表示されます。

画面を動かしたくなったらscalajs-domを使うことによってDOMをいじるためのAPIが提供されるため、
ScalaでUIを好き勝手出来ます。やったね。
インターフェースはjavascriptのものと全く一緒なので安心です。

なんか作る

上記の2つでサーバとクライアントで通信して何かするアプリを作ってみたいと思います。
何を作るかは思い浮かばなかったのでToDoアプリにしようと思います。同時にアクセスしている人とToDoを共有できる誰得仕様で行きましょう。

ToDoアプリ程度でも直接DOMをいじるのは辛いので、React.jsを使うことにします。
幸い、scalajs-reactというScala.jsからReact.jsのAPIを扱うライブラリが既にあるのですんなり導入できます。

最低限の実装だと下記のようになります。

object Client extends JSApp {

  val todos = Seq(ToDo("todo1"), ToDo("todo2"))

  def list = todos.map {
    case ToDo(title) => <.li(title)
  }

  def rootComponent = {
    <.div(
      <.ul(list)
    )
  }

  def main(): Unit = ReactDOM.render(rootComponent, dom.document.getElementById("main"))

}

<.タグ名という記法でHTML(JSX?)を表現しています。初めは独特の記法で戸惑いましたが、慣れると普通のHTMLより楽に思えてきます。
また、型推論有りの静的型付けで、caseで値をマッチさせる事ができるとかはScala.jsならではだと思います。

<html>
<head>
    <script src="https://fb.me/react-with-addons-0.14.3.min.js"></script>
    <script src="https://fb.me/react-dom-0.14.3.min.js"></script>
    <script src="app.js"></script>
</head>
<body>
    <div id="main"></div>
    <script>
        example.Client().main();
    </script>
</body>
</html>

sbtでReact.js等のjsライブラリをまとめて一つのjsファイルに固める事もできるようですが、
今回はReact.js本体はCDNから別に読み込むようにしました。

これでブラウザ上に以下のように表示されます。

  • todo1
  • todo2

ToDoを操作できるようにする

先程の例は単なる静的ページでしか無いので、ちゃんと動くように作ります。
まずはReactのコンポーネントをちゃんと作るところから。

object Client extends JSApp {

  case class TodoAppState(formValue: String, list: Seq[ToDo])

  class TodoAppBackend($: BackendScope[Unit, TodoAppState]) {

    def onChange(e: ReactEventI) =
      $.modState(s => s.copy(formValue = e.target.value))

    def handleSubmit(e: ReactEventI) =
      e.preventDefaultCB >>
        $.modState(s => {
          s.copy(formValue = "", list = s.list :+ ToDo(s.formValue))
        })

    def handleDone(e: ReactEventI) =
      e.preventDefaultCB >>
        $.modState(s => {
          s.copy(list = s.list.map {
            case ToDo(id, false) if id == e.target.name => ToDo(id, true)
            case other => other
          })
        })

    def render(state: TodoAppState) = <.div(
      <.form(^.onSubmit ==> handleSubmit,
        <.input(^.onChange ==> onChange, ^.value := state.formValue),
        <.button("Add")
      ) ,
      state.list.map {
        case ToDo(title, false) => <.li(<.span(<.button(^.onClick ==> handleDone, ^.name := title, "Done"), title))
        case ToDo(title, true) => <.li(<.del(title))
      }
    )
  }

  val TodoApp = ReactComponentB[Unit]("TodoApp")
    .initialState(TodoAppState("", Seq.empty))
    .renderBackend[TodoAppBackend]
    .buildU

  def main(): Unit = ReactDOM.render(TodoApp(), dom.document.getElementById("main"))

}

ぱっと見た感じではコールバックの書き方が独特ですね。要素の属性には^.属性名でアクセスできます。
jsだとstateはオブジェクトですが、Scalaではクラス化する必要があります。ですが、Stateの型がキッチリとするのでかなりメリットが有ると思います。

コンポーネントで必要になるコールバック関数などはBackendとして別のクラスにします。
BackendScope[Unit, TodoAppState]の型パラメータの1個目はpropsの型で、今回は使わなかったのでUnitを指定しています。

<div id="main">
    <div>
        <form>
            <input value="">
            <button>Add</button>
        </form>
        <li>
            <span>
                <button name="まだやってない">Done</button>
                <span>まだやってない</span>
            </span>
        </li>
        <li>
            <del>オワタ</del>
        </li>
    </div>
</div>

実行して画面を操作するとmainのdivタグ内が大体上記のようになります。これはフォームから2件登録して内1件を完了にしたところです。
name属性に値を設定しているのはとりあえず使えそうだったからで、今回はそういったあたりは作りこみません。

そうだwebsocketを使おう

ブラウザ内でポチポチやっててもつまらないので、サーバ側と通信するようにしようと思います。
Akka Streamsを使える状態なので、せっかくだからwebsocketでToDoをやり取りすることにします。

アクセスしている全員にToDoを配信して共有できるように作りたいと思います。

サーバ側

一つのwebsocketクライアントから来たメッセージをすべてのクライアントに送り返す実装をしなくてはなりません。 こういう時にはAkka Streamsの出番です。

度重なる試行錯誤の結果、下記のような実装となりました。

object Server extends App {

  implicit val actorSystem = ActorSystem("my-system")
  implicit val flowMaterializer = ActorMaterializer()

  import Directives._

  val route = get {
    pathEndOrSingleSlash {
      getFromResource("index.html") // クラスパス上からファイルを読む
    }
  } ~
  path("app.js") {
    get {
      getFromResource("app.js") // クラスパス上からファイルを読む
    }
  } ~
  path("stream") {
    get {
      handleWebsocketMessages(todoMessageFlow) // websocketを扱う
    }
  }

  val broadcastActor = actorSystem.actorOf(Props[BroadcastActor])

  def todoMessageFlow = Flow.fromGraph(GraphDSL.create(Source.actorRef[ToDo](bufferSize = 3, OverflowStrategy.fail)) {
    implicit builder =>
      subscribeActor =>
        import GraphDSL.Implicits._

        val codec = Codec[ToDo] // MessagePack用コーデック
        val websocketSource = builder.add(Flow[Message].collect {
          case BinaryMessage.Strict(bin) => codec.decode(BitVector(bin.asByteBuffer)) match {
            case Successful(todo) => todo.value
            case e => ToDo(e.toString)
          }
        })

        val uuid = UUID.randomUUID().toString
        val merge = builder.add(Merge[ToDoMessage](2))

        val connActorSource = builder.materializedValue.map[ToDoMessage](Subscribe(uuid, _))

        val broadcastActorSink = Sink.actorRef(broadcastActor, UnSubscribe(uuid))

        val output = builder.add(
          Flow[ToDo].map {
            case t: ToDo =>
              BinaryMessage(ByteString(codec.encode(t).fold(_.message.getBytes, _.toByteArray)))
          }
        )

        // DSLによりフローグラフを構築
        websocketSource ~> merge ~> broadcastActorSink // websocketから来るデータとbroadcast用のアクターに渡すデータの
        connActorSource ~> merge                       // フローをマージしてbroadcast用のアクターに流す

        subscribeActor ~> output // broadcast用アクターから配信されたデータはwebsocketでクライアントに返す
        FlowShape(websocketSource.in, output.out) // websocketの入出力を繋ぐ
  })

  val serverBinding = Http(actorSystem).bindAndHandle(route ,interface = "localhost", port = 8080)

}

いろいろとAkka HTTP 1.0と違っているので公式ドキュメントなどを調べてフロー部分を作成しました。 DSLによりSourceからSinkへのデータの流れが表現することができています。

今回はクライアントから来たメッセージを一度broadcast用のアクターに渡して、そのアクターが接続中のクライアント全てにメッセージを送信するようにしています。 新しい接続が来た旨のメッセージもアクターに渡すためにフローをマージしてSinkに流しています。

複雑なフローだと作り方がわかりづらいですが、ただ単に文字列加工するだけであれば下記のようなフローをhandleWebsocketMessages()に渡してあげるだけで作れます。

  // handleWebsocketMessages()にはMessageを受け取ってMessageを返すフローを渡してあげます
  def exampleFlow = Flow[Message].map {
    case TextMessage.Strict(text) => TextMessage(s"メッセージ「${text}」を受け取った!")
  }

接続しているクライアント全員にToDoを共有するアクターは下記のようになっています。

class BroadcastActor extends Actor {

  val subscribers = mutable.HashMap.empty[String, ActorRef]

  def receive: Receive = {
    case Subscribe(id, actorRef) => subscribers += ((id, actorRef))
    case UnSubscribe(id) => subscribers -= id
    case todo => subscribers.values.foreach(_ ! todo)
  }

}

sealed trait ToDoMessage
case class Subscribe(id: String, actorRef: ActorRef) extends ToDoMessage
case class UnSubscribe(id: String) extends ToDoMessage
case class ToDo(title: String, done: Boolean = false) extends ToDoMessage

ついでにbuild.sbtに下記の記述を追加して、
クラスパス上にScala.jsが出力したjsファイルを置くようにしてみました。
htmlファイルもresources直下に置いています。

artifactPath in (Compile, fullOptJS) := file("src/main/resources/app.js")

これらのサーバ側の実装後に動作確認をしたところ、とりあえず想定通りに動くのですが、websocketの切断時に

Websocket handler failed while waiting for handler completion with empty.head

というメッセージのエラーが発生します。
akkaのgithubリポジトリにissueがあるので不具合なのかもしれません。

個人的にwebsocketにはいい思い出がないので、プロダクションレベルではあまり使用したくないです。。。

クライアント側

気を取り直して、クライアント側も実装していきます。
Addボタンを押すとサーバにToDoを送信、サーバからToDoメッセージが来たらStateのリストに追加する処理を入れ込みます。

Scala.jsではAkka Streamsは使えないので、monifuというリアクティブなライブラリを橋渡しに使ってみました。
ちなみにAkka ActorのScala.js実装というヤバそうなものも存在するようです。

サーバとクライアント間のメッセージ形式はMessagePackを使いました。
Scala.jsではjsのようにJSONオブジェクトでシリアライズなどが行えるのですが、肝心の型情報が無いためタイプセーフには行きませんでした。 そして、ScalaはネイティブでJSONをサポートしているわけではないので、必ず何かしらのライブラリを使うことになります。
そこで目をつけたのがscodec-msgpackというライブラリで、割と最近Scala.js対応もされたのでサーバとクライアントの両方で使うことが出来ます。 また、JSONより効率の良いMessagePackで作りたかったし。
ということで、scodec-msgpackでMessagePackを使おうと考えました。

最終的なbuild.sbtは以下のようになっています。

name := "akka-http-scalajs"

scalaVersion := "2.11.7"

enablePlugins(ScalaJSPlugin)

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-experimental" % "2.0.1",
  "org.scala-js" %%% "scalajs-dom" % "0.8.0",
  "com.github.japgolly.scalajs-react" %%% "core" % "0.10.3",
  "com.github.pocketberserker" %%% "scodec-msgpack" % "0.4.3",
  "org.monifu" %%% "monifu" % "1.0"
)

artifactPath in (Compile, fullOptJS) := file("src/main/resources/app.js")

そして、サーバとのやり取りを含めた実装は以下のようになりました。

object Client extends JSApp {

  case class TodoAppState(formValue: String, list: Seq[ToDo])

  class TodoAppBackend($: BackendScope[Unit, TodoAppState]) {

    def onChange(e: ReactEventI) =
      $.modState(s => s.copy(formValue = e.target.value))

    def handleSubmit(e: ReactEventI) =
      e.preventDefaultCB >>
        $.modState(s => {
          sendSubject.onNext(ToDo(s.formValue))
          s.copy(formValue = "")
        })

    def handleDone(e: ReactEventI) =
      e.preventDefaultCB >> Callback { // サーバへ送信
        sendSubject.onNext(ToDo(e.target.name, true))
      }

    def render(state: TodoAppState) = <.div(
      <.form(^.onSubmit ==> handleSubmit,
        <.input(^.onChange ==> onChange, ^.value := state.formValue),
        <.button("Add")
      ) ,
      state.list.map {
        case ToDo(title, false) => <.li(<.span(<.button(^.onClick ==> handleDone, ^.name := title, "Done"), title))
        case ToDo(title, true) => <.li(<.del(title))
      }
    )
  }

  val TodoApp = ReactComponentB[Unit]("TodoApp")
    .initialState(TodoAppState("", Seq.empty))
    .renderBackend[TodoAppBackend]
    .componentDidMount(scope => Callback { // コンポーネントのマウント時に受信したメッセージの扱いを登録
      receiveSubject.onSubscribe(new Observer[ToDo] {
        override def onNext(elem: ToDo): Future[Ack] = elem match {
          case ToDo(_, false) =>
            scope.modState(state => state.copy(list = state.list :+ elem)).runNow()
            Continue
          case ToDo(title, true) =>
            scope.modState(state => state.copy(list = state.list.map {
              case ToDo(id, false) if id == title => ToDo(id, true)
              case other => other
            })).runNow()
            Continue
        }
        override def onError(ex: Throwable): Unit = {}
        override def onComplete(): Unit = {}
      })
    })
  .buildU

  val receiveSubject: PublishSubject[ToDo] = PublishSubject() // サーバから来たメッセージのストリーム
  val sendSubject: PublishSubject[ToDo] = PublishSubject() // サーバに送信する用のストリーム

  def main(): Unit = {

    import scodec._
    import scodec.msgpack._
    import scodec.Attempt._
    val codec = Codec[ToDo] // MessagePack周りはサーバ側と一緒の実装で済む

    val root = ReactDOM.render(TodoApp(), dom.document.getElementById("main"))

    val ws = new WebSocket("ws://localhost:8080/stream") // WebSocketオブジェクトの使い方はjsの場合と一緒
    ws.binaryType = "arraybuffer" // バイナリの扱いの指定が必要

    ws.onmessage = { (e: MessageEvent) =>
      val msg = e.data match {
        case buf: ArrayBuffer => codec.decode(BitVector(int8Array2ByteArray(new Int8Array(buf)))).fold(e => ToDo(e.message), _.value)
      }
      receiveSubject.onNext(msg)
    }
    sendSubject.onSubscribe(new Observer[ToDo] {
      override def onNext(elem: ToDo): Future[Ack] = {
        codec.encode(elem) match {
          case Successful(a) => ws.send(byteArray2Int8Array(a.toByteArray).buffer)
          case Failure(e) => println(e.message)
        }
        Continue
      }

      override def onError(ex: Throwable): Unit = {}

      override def onComplete(): Unit = {}
    })

  }

}

ちょっと嵌ったところとしては、コールバックを呼び出し元に返さないでstateの値を更新しようとする場合は最後に.runNow()を付けてやらないと動かないということ。
あと、websocketでバイナリを扱うときにはjsのArrayBufferの知識が必要でした。ただ、Scala.jsから扱う分にはtypedarrayのbyteArray2Int8Arrayint8Array2ByteArray関数を使えばScalaArray[Byte]と相互変換できるのでそれだけ使っていれば問題ないように思います。

実装が終わってjsにコンパイルというところで罠がありました。
[error] (compile:fullOptJS) org.scalajs.core.tools.optimizer.OptimizerCore$OptimizeException: The Scala.js optimizer crashed while optimizing Lscodec_msgpack_codecs_MessagePackCodec$$anonfun$4.apply__Lscodec_Codec: java.lang.StackOverflowError
と。
sbtを起動するときのJVMオプションに適当に-Xss16Mくらいを設定すると無事にコンパイルが終わりました。

クライアントのjsを生成してからサーバを起動後、複数のブラウザ画面でアクセスしてからToDoを操作すると全ての画面で同じ動きをします。
実際に動かすと突っ込みどころ満載の仕様ですがキニシナイ!!

おわり

今回Akka HTTPを触った印象としては、ルーティングの書き方が柔軟でいろいろなことが出来そうだと思いました。
Akka Streamsに関しては複数のストリームを非同期につなぎ合わせられることで高いパフォーマンスを発揮するのでこれから使っていきたいと思います。また、DSLでデータの流れを表現できることが便利だと思いました。

Scala.jsに関しては実行前にコンパイラの検査が入るので、素のjsに比べて実行時エラーが極端に少なくなる印象でした。そして何よりScalaの言語仕様のおかげで割りとサクッと書けます。
また、React.jsのようなライブラリと組み合わせると非常に強力だと思いました。 基本的にHTMLテンプレートみたいなものは弄らなくなるので、コンパイルが出来れば必ず動く環境を手に入れられるかも知れません。
ただ、jsへの出力時間が心配で、通常のclassファイルへのコンパイルが約3秒なのに対してfullOptJSでは12~22秒くらいになりました。 普段はclassファイルへのコンパイルを行ってチェックを行い、必要なときだけjsへ出力すればマシになるのかなと思います。 Scala.jsはかなり面白いのでなんとか使えないか模索したいと思っています。

以上になります。
ここまで読んでいただいてありがとうございました。