あけましておめでとうございます。
初めまして、2017年4月に中途入社した張沢と申します。
今回はPlay FrameworkでServer-Sent Events(SSE)を使用してServer pushを行う実装方法について書きます。 WebSocketの情報は検索すると色々見つかりますが、SSEの記事やサンプルコードはあまり見かけないため…。
Play Framework 2.5からstream関連のAPI実装がAkka Streamに変わりました。そのため、Server pushのAPIでもAkka Streamを使用する必要があります。この記事ではSSEでのAkka Streamの使用方法について紹介します。
Server-Sent Eventsとは
Client(ブラウザ)へHTTP server pushを実現するための技術の1つで、以下のような特徴があります。
- HTTPを利用したストリーム通信
- 通信はServerからClient(ブラウザ)への一方向
- UTF-8エンコードの文字列のみ送信可能
- IEやEdgeではサポートされていませんが、polyfillが存在します
Play FrameworkにおけるSSEの実装方法
Play Framework 2.6でSSEを実装する場合、Controllerで下記のように書きます。
/app/controllers/FooController.scala
package controllers import javax.inject.{Inject, Singleton} import akka.stream.scaladsl.Source import play.api.http.ContentTypes import play.api.libs.EventSource import play.api.mvc._ @Singleton class FooController @Inject() (cc: ControllerComponents) extends AbstractController(cc) { def sse = Action { val source: Source[String, _] = _ Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM) } }
/conf/routes
GET /sse controllers.FooController.sse
SSEは文字列しか返せない仕様のため、最終的にStringを返すAkka StreamのSourceを指定する必要があります。自分で定義した型をSourceとして流す場合、source.map()
でStringへ変換してください。
source.map(Json.toJson(_).toString)
のようにしてJSONの文字列に変換し、ブラウザ側(JavaScript)でJSON.parse()
してデータをやりとりする方法もよいと思います。例えば以下のように。
ブラウザ側のJavaScript
const eventSource = new EventSource('/sse'); eventSource.onmessage = event => { // サーバーから送信されたJSON文字列をparseする const json = JSON.parse(event.data); // do something. }
なるほど、簡単そうですね?
SSEのサンプルコード
さて、Play FrameworkでSSEを行うサンプルコードを探してみると、Play 2.5で非推奨となったEnumerator
を使用したサンプルやSource.tick()
を使用した一定時間ごとに時刻を送信するだけのサンプルしか見つからず、困り果てた方もいるのではないでしょうか?
例えば、他のActionで発生したリクエストやイベントなどのデータをSSEで各ブラウザにpushするサンプルが欲しくなるかと思います。
ということで、ごくごく簡単なチャットアプリケーションをPlay Framework 2.6で作りました。メッセージを投稿するとSSEにより同じページを開いている他のブラウザへ投稿したメッセージを送信します。
以下のコマンドでこのアプリケーションを起動することができます。起動後、ブラウザでlocalhost:9000
にアクセスしてください。
$ git clone https://github.com/septeni-original/play-scala-sse-example.git
$ cd play-scala-sse-example
$ sbt run
このチャットアプリケーションには4つの実装サンプルがあり、それぞれ以下のSourceを使用しています。
Source.actorRef()
Source.actorPublisher()
Source.fromGraph()
MergeHub
&BroadcastHub
それぞれのSourceについて説明していきます。
Source.actorRef()
おそらく一番簡易的に使用できるSourceです。Source[_, ActorRef]
が生成され、このactorRefにブラウザへ送信したいメッセージを送ります。
生成されたactorRefにはmapMaterializedValue()
またはwatchTermination()
でアクセスできます。基本的にはActorの停止を監視すると思いますので、watchTermination()
を使うことになります。
下記の例ではSource.actorRef()
で生成されたactorRefを保持し、メッセージが投稿されるたびに保持しているactorRefへメッセージを送信して、SSE経由で各ブラウザへ送っています。
class ActorRefController @Inject() (system: ActorSystem, cc: ControllerComponents, addToken: CSRFAddToken) (implicit executionContext: ExecutionContext) extends AbstractController(cc) { // `Source.actorRef()` の actorRef を管理するためのActor private[this] val manager = system.actorOf(ActorRefManager.props) def index = addToken(Action { implicit request => Ok(views.html.actorRef(CSRF.getToken.get)) }) def receiveMessage = Action(parse.json[Message]) { request => // ブラウザから送信されてきたメッセージを `manager` 経由でこのページを開いてるすべてのブラウザへ送信する manager ! SendMessage(request.body.toString) Ok } def sse = Action { val source = Source .actorRef[String](32, OverflowStrategy.dropHead) .watchTermination() { case (actorRef, terminate) => // actorRefをmanagerに登録し、actorが停止した際には登録を解除する manager ! Register(actorRef) terminate.onComplete(_ => manager ! UnRegister(actorRef)) actorRef } Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM) } } class ActorRefManager extends Actor { private[this] val actors = mutable.Set.empty[ActorRef] def receive = { case Register(actorRef) => actors += actorRef case UnRegister(actorRef) => actors -= actorRef case SendMessage(message) => actors.foreach(_ ! message) } } object ActorRefManager { def props: Props = Props[ActorRefManager] case class SendMessage(message: String) case class Register(actorRef: ActorRef) case class UnRegister(actorRef: ActorRef) }
バッファーが溢れた際の動作をOverflowStrategyで指定できますが、このactorRefはAkkaが内部で生成したActorPublisher
のため、背圧制御を行う(指定する)ことはできません。
Source.actorPublisher()
背圧制御を行うためにはActorPublisher
を自分で定義します。
Implementing Reactive Streams Publisher or Subscriber
class Publisher extends ActorPublisher[String] { import akka.stream.actor.ActorPublisherMessage._ val MaxBufferSize = 256 private[this] var buf = Vector.empty[String] def receive = { case SendMessage(message) if buf.size == MaxBufferSize => sender() ! MessageDenied(self, message) case SendMessage(message) => if (buf.isEmpty && totalDemand > 0) onNext(message) else { buf :+= message deliverBuf() } case Request(_) => // Subscriberからデータを要求されたらバッファーに溜めてたデータを送信する deliverBuf() case Cancel => // `Cancel`メッセージを受け取った場合はActorを停止する context.stop(self) } @annotation.tailrec final def deliverBuf(): Unit = if (totalDemand > 0) { /* * totalDemand is a Long and could be larger than * what buf.splitAt can accept */ if (totalDemand <= Int.MaxValue) { val (use, keep) = buf.splitAt(totalDemand.toInt) buf = keep use foreach onNext } else { val (use, keep) = buf.splitAt(Int.MaxValue) buf = keep use foreach onNext deliverBuf() } } } object Publisher { def props: Props = Props[Publisher] }
ActorPublisher
ではtotalDemand
でSubscriberが要求している(受け取れる)データの個数が取得できますので、その数を超えないようにonNext()
でデータを送ります。(totalDemand
がLongを返すことに注意してください)
今回のケースでは、データの送信タイミングは以下の2通りです。
- Subscriberから
Request
メッセージを受け取った(データを要求された)場合 - チャットのメッセージが送信されてきた時に、Subscriberがまだデータを受け取れる場合
それ以外の場合は自身が持っているバッファーにデータを溜めておきます。
Source.actorPublisher()
で定義したActorPublisherからSourceを生成できます。
class PubSubController @Inject() (system: ActorSystem, cc: ControllerComponents, addToken: CSRFAddToken) (implicit executionContext: ExecutionContext) extends AbstractController(cc) { // ActorPublisherを管理するためのActor private[this] val manager = system.actorOf(PublishersManager.props, PublishersManager.name) def index = addToken(Action { implicit request => Ok(views.html.pubSub(CSRF.getToken.get)) }) def receiveMessage = Action(parse.json[Message]) { request => // ブラウザから送信されてきたメッセージを `manager` 経由でこのページを開いてるすべてのブラウザへ送信する manager ! SendMessage(request.body.toString) Ok } def sse = Action { val source = Source .actorPublisher[String](Publisher.props) .watchTermination() { case (publisher, terminate) => // ActorPublisherをmanagerに登録し、actorが停止した際には登録を解除する manager ! Register(publisher) terminate.onComplete(_ => manager ! UnRegister(publisher)) publisher } Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM) } } class PublishersManager extends Actor with ActorLogging { private[this] val publishers = mutable.Set.empty[ActorRef] def receive = { case Register(publisher) => publishers += publisher case UnRegister(publisher) => publishers -= publisher case event: SendMessage => publishers.foreach(_ ! event) case denied: MessageDenied => log.error(denied.toString) } } object PublishersManager { val name: String = PublishersManager.getClass.getSimpleName val props: Props = Props[PublishersManager] case class SendMessage(message: String) case class Register(publisher: ActorRef) case class UnRegister(publisher: ActorRef) case class MessageDenied(publisher: ActorRef, message: String) }
ActorPublisher
内で背圧制御を行っているところ以外は、ほぼSource.actorRef()
のサンプルコードと同じ処理になっています。
ちなみにActorPublisher
はAkka-Stream 2.5.xでdeprecatedになっています。
Source.fromGraph()
(GraphStage)
deprecatedになったActorPublisher
の代わりとして、GraphStage
を使えばよさそうなことが書かれていますので、実際に使ってみます。
Custom stream processing • Akka Documentation
It is possible to acquire an ActorRef that can be addressed from the outside of the stage, similarly how
AsyncCallback
allows injecting asynchronous events into a stage logic. This reference can be obtained by callinggetStageActorRef(receive)
passing in a function that takes a Pair of the sender ActorRef and the received message.
- they are not location transparent, they cannot be accessed via remoting.
- they cannot be returned as materialized values.
- they cannot be accessed from the constructor of the
GraphStageLogic
, but they can be accessed from thepreStart()
method.
Stage外からイベントを受け取るためのactorRefが用意されているようです。getStageActorRef(receive)
でアクセスできますが、preStart()
内でアクセスする必要があったり、位置透過性がなくリモートからアクセスできないなど注意すべき点があるようです。
class MessageStage(manager: ActorRef) extends GraphStage[SourceShape[String]] { // Define the (sole) output port of this stage val out: Outlet[String] = Outlet("MessageStage") // Define the shape of this stage, which is SourceShape with the port we defined above val shape: SourceShape[String] = SourceShape(out) private[this] val messages = mutable.Queue.empty[String] private[this] var downstreamWaiting = false // This is where the actual (possibly stateful) logic will live override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { implicit def self: ActorRef = stageActor.ref override def preStart(): Unit = { // `preStart()` 内で ActorRef を取得し、managerに登録する val thisStageActor = getStageActor(receive).ref manager ! Register(thisStageActor) } setHandler(out, new OutHandler { override def onPull(): Unit = { if (messages.isEmpty) { // 送信するメッセージが来るまで`push()`しない downstreamWaiting = true } else { push(out, messages.dequeue()) } } }) private def receive(receive: (ActorRef, Any)): Unit = receive match { case (_, SendMessage(message)) => messages.enqueue(message) // 下流の`onPull()`に対してまだメッセージを送信していなければ送信する if (downstreamWaiting) { downstreamWaiting = false push(out, messages.dequeue()) } } } }
portとshape
GraphStageのボイラープレートのようなもので、portとshapeを定義する必要があります。
portは、入力側のin: Inlet
や出力側のout: Outlet
を定義するのですが、今回はSourceなので出力側のout
のみ定義します。Flowのように入力と出力がある場合はin
とout
、Sinkのように入力しかない場合はin
のみ定義します。port定義時に名前をつける必要がありますが、基本的には自身のclass名を指定します。Flowのようにin
とout
の2つのportがある場合は、それぞれClassName.in
, ClassName.out
のように分けて名前をつけます。今回はout
のみなのでOutlet("MessageStage")
としています。
shapeは基底クラスGraph
のshapeをoverrideしています。各GraphStageごとに対応しているshapeを設定します。
GraphStage[SourceShape[_]]
SourceShape(out)
GraphStage[FlowShape[_, _]]
FlowShape.of(in, out)
GraphStage[SinkShape[_]]
SinkShape(in)
preStart()
とpostStop()
前述の通り、preStart()
でactorRefを取得しています。何かしらの前処理を行う場合はここで行うのが良いでしょう。逆に後処理が必要な場合は、postStop()
をoverrideしてその中で行います。
handler
各portに対してhandlerを設定します。out
には下流からデータのリクエストがあった際の処理をOutHandler
で記述します。やることは簡単でonPull()
内でpush()
を実行して下流にデータを送信するだけです。
ここで送信するべきデータがない場合に何をすればいいのか悩むかと思います。結論から言うと、次に送信するデータの用意ができるまでonPull()
では何もしなくてOKです。
公式ドキュメントのサンプルコード通り、onPull()
で送信すべきメッセージがまだない場合、downstreamWaiting = true
にして下流を待たせている状態であることを記憶しておきます。そして次に送信すべきメッセージが来たとき、下流を待たせている場合はpush()
でメッセージを送信します。つまり1回のpullに対して1回pushすればよいわけです。また、push()
を実行するまで次のpullが来ることはありません。
Source.fromGraph()
で定義したGraphStageからSourceを生成できます。
class GraphStageController @Inject() (system: ActorSystem, cc: ControllerComponents, addToken: CSRFAddToken) extends AbstractController(cc) { // GraphStage の actorRef を管理するためのActor private[this] val manager = system.actorOf(GraphStageManager.props) def index = addToken(Action { implicit request => Ok(views.html.graphStage(CSRF.getToken.get)) }) def receiveMessage = Action(parse.json[Message]) { request => // ブラウザから送信されてきたメッセージを `manager` 経由でこのページを開いてるすべてのブラウザへ送信する manager ! SendMessage(request.body.toString) Ok } def sse = Action { val source = Source.fromGraph(new MessageStage(manager)) Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM) } } class GraphStageManager extends Actor { private[this] val actors = mutable.Set.empty[ActorRef] def receive = { case Register(actorRef) => // GraphStageのActorが停止した際に登録を解除するためにActorを監視する context.watch(actorRef) actors += actorRef case Terminated(actorRef) => // GraphStageのActorが停止したら監視を停止して登録を解除する context.unwatch(actorRef) actors -= actorRef case message: SendMessage => actors.foreach(_ ! message) } } object GraphStageManager { def props: Props = Props[GraphStageManager] case class SendMessage(message: String) case class Register(actorRef: ActorRef) }
MergeHub & BroadcastHub
動的なin/outにはMergeHubとBroadcastHubが使えます。
Dynamic fan-in and fan-out with MergeHub, BroadcastHub and PartitionHub
Combining dynamic stages to build a simple Publish-Subscribe service
公式ドキュメントのサンプルコードをそのままコピペすると以下のようになります。
class HubController @Inject() (cc: ControllerComponents, addToken: CSRFAddToken) (implicit val mat: Materializer) extends AbstractController(cc) { private[this] val (sink, source) = MergeHub.source[String](perProducerBufferSize = 16) .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both) .run() def index = addToken(Action { implicit request => Ok(views.html.hub(CSRF.getToken.get)) }) def receiveMessage = Action(parse.json[Message]) { request => Source.single(request.body.toString).runWith(sink) Ok } def sse = Action { Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM) } }
BroadcastHub.sink()
には何度でもSourceを流すことができるので、メッセージが投稿されるたびにSource.single()
で投稿されたメッセージをブラウザへ送信しています。
MergeHubとBroadcastHubも内部的にはGraphStageで実装されていますので、興味があればソースを眺めてみるのも良いでしょう。
おわりに
Akka-Streamの詳細にはあまり立ち入らずSourceの使用方法にフォーカスした記事でしたが、いかがだったでしょうか?
この記事がPlay FrameworkでSSEを実装する際の一助になれば幸いです。
参考リンク
EventSource - play.api.libs.EventSource
https://www.playframework.com/documentation/2.6.x/api/scala/index.html#play.api.libs.EventSource$Tutorials - 2.6.x #Comet / Server-Sent-Events (SSE)
https://www.playframework.com/documentation/2.6.x/Tutorials#Comet-/-Server-Sent-Events-(SSE)Streams • Akka Documentation
http://doc.akka.io/docs/akka/current/scala/stream/index.htmlakka-stream を始めるときに覚えておきたいこと // Speaker Deck
https://speakerdeck.com/tkawachi/akka-stream-woshi-merutokinijue-eteokitaikotoHTML Standard 9.2 Server-sent events
https://html.spec.whatwg.org/multipage/server-sent-events.htmlServer-Sent Events の利用 - Server-sent events | MDN
https://developer.mozilla.org/ja/docs/Server-sent_events/Using_server-sent_eventsサーバサイドpush技術としてのWebsocketとServer-sent eventsの特徴比較 https://qiita.com/suin/items/e33af700ceb678d40a67