お疲れさまです。中途2年目の堀越です。
表題の通り AWS のサービスを活用した Slack 通知アプリケーションを
開発する機会がありましたのでアウトプットです。
尚、今回はイベントソースについて詳細には触れませんのでご了承ください。
こちらについてはまたの機会に。
アプリケーション概要
- SQS に溜めたメッセージを Lambda がポーリング
- 読み込んだメッセージを Slack に POST
- 通知が成功したメッセージを SQS から削除
本ブログにおいてはコアの機能となっている、
メッセージの読み込み、通知が成功した場合にメッセージを削除する機能について解説していきます。
開発の準備
Serverless Framework インストール
タイトルにある通り、Lambdaのデプロイ及び動作確認には、
Serverless Framework を利用しました。
serverless.com
インストールは Quick Start#Pre-requisites から
YAMLファイルをちょこっと書くだけで、
アプリケーション構成を定義できるので大変便利でした。
今回はAWS Lambdaの構成管理に利用しましたが、
他にもGCP, Azureなど様々なプラットフォームに対応しているところも魅力的です。
環境変数 AWS_PROFILE
SQS, Lambda, S3, CloudFormation への更新が必要なので、
対象の credential に適切なポリシーを設定してください。
サービスの作成
インストールした Serverless を利用してコマンドラインからサービスを作成していきます。
今回は Lambda を Scala で実装するので、
テンプレートには aws-scala-sbt
を指定します。
$ sls create --template aws-scala-sbt
結果は下記の通り。
├── build.sbt ├── project │ ├── assembly.sbt │ ├── build.properties │ └── plugins.sbt ├── serverless.yml └── src └── main └── scala └── hello ├── ApiGatewayResponse.scala ├── Handler.scala ├── Request.scala └── Response.scala
build.sbt 修正
libraryDependencies に必要なライブラリを追加していきます。
libraryDependencies ++= Seq( "com.amazonaws" % "aws-lambda-java-events" % "1.3.0", "com.amazonaws" % "aws-lambda-java-core" % "1.1.0", /* 下記を追加 */ "com.eed3si9n" %% "gigahorse-okhttp" % "0.3.0", "io.circe" %% "circe-core" % "0.9.3", "io.circe" %% "circe-generic" % "0.9.3", "io.circe" %% "circe-parser" % "0.9.3", "com.typesafe" % "config" % "1.3.2", "com.amazonaws" % "aws-java-sdk-sqs" % "1.11.386", "com.amazonaws" % "aws-java-sdk-core" % "1.11.386", "com.amazonaws" % "jmespath-java" % "1.11.386" )
用途としては下記の通りです。
- aws-java-sdk-sqs
- SQSからのメッセージ読み込みと削除処理の実装
- aws-java-sdk-core, jmespath-java
- gigahorse-okhttp
- Slack Web Api の Http Client に利用
- circe-core, circe-generic, circe-parser
- メッセージ の Decode 処理
- config
- application.conf 読み込み
Lambda関数を開発していく
Lambda関数を実装していきます。
まずは、src / main / scala
ディレクトリの配下に適当なディレクトリを用意して実装していきます。
今回は sample
ディレクトリでいきます。
Amazon SQS メッセージの受信・削除を行うクラス
aws-java-sdk-sqs
を利用して
メッセージの受信・削除を行うクラスを作成します。
基本的にはキューのURLを指定して、
行いたいオペレーションのメソッドを定義するだけなので実装はシンプルです。
package sample import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest} import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder} import com.typesafe.config.Config import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} class SQSClient(config: Config) { private val queueUrl = config.getString("aws.sqs.queue.url") private val sqsClient: AmazonSQS = AmazonSQSClientBuilder.defaultClient() private val receiveRequest: ReceiveMessageRequest = { val request = new ReceiveMessageRequest request.setMaxNumberOfMessages(10) request.setQueueUrl(queueUrl) request } def receive(implicit ec: ExecutionContext): Future[Seq[Message]] = Future { sqsClient.receiveMessage(receiveRequest).getMessages.asScala } def delete(msg: Message)(implicit ec: ExecutionContext): Future[Unit] = Future{ sqsClient.deleteMessage(queueUrl, msg.getReceiptHandle) } }
各オペレーションのリクエストに対し細かいオプションを設定することができるので、
今回は受信時のリクエストに最大受信数を設定することにしました。
詳しくは ReceiveMessageRequest (AWS SDK for Java - 1.11.401) 参照。
Slack通知を行うクラス
Slack API の chat.postMessage に対してPOSTリクエストするクラスです。
Httpクライアントには Gigahorse を利用しました。
package sample import com.amazonaws.services.sqs.model.Message import com.typesafe.config.Config import gigahorse.support.okhttp.Gigahorse import gigahorse.{HeaderNames, MimeTypes, Request} import scala.concurrent.{ExecutionContext, Future} class SlackClient(config: Config) { private val Url = config.getString("slack.api.post.message.url") private val ApiToken = config.getString("slack.api.token") private val requestWithBody: String => Request = { body: String => Gigahorse .url(Url) .post(body) .addHeaders( HeaderNames.AUTHORIZATION -> s"Bearer $ApiToken", HeaderNames.CONTENT_TYPE -> MimeTypes.JSON ) } def post(message: Message)(implicit ec: ExecutionContext): Future[Unit] = Gigahorse.withHttp(Gigahorse.config) { http => http .run(requestWithBody(message.getBody), Gigahorse.asString) .flatMap { s => SlackAPIResponse .fromJsString(s) .fold(e => Future.failed(e), _ => Future.successful(())) } } }
今回は割愛してますが、 Gigahorse — Extending Gigahorse で紹介されている
OAuth認証処理をラップしたクラスの実装例など非常に参考になりました。
Slack APIレスポンスの処理
Slack APIのレスポンスは通知が失敗した場合でもステータスコード200で結果を返してくるため、
下記のようなモデルにパースして、失敗していた場合は独自例外を返すようしました。
package sample import io.circe.Decoder import io.circe.parser.decode case class SlackAPIResponse(ok: Boolean, error: Option[String]) object SlackAPIResponse { class SlackPostMessageError( message: String, cause: Option[Throwable] = None ) extends Exception(message, cause.orNull) { def this(message: String, cause: Throwable) = this(message, Some(cause)) } object SlackPostMessageError { def apply(stringOpt: Option[String]): SlackPostMessageError = stringOpt .map(msg => new SlackPostMessageError(msg)) .getOrElse(new SlackPostMessageError("There is no errors")) } implicit val decoder: Decoder[SlackAPIResponse] = Decoder.forProduct2("ok", "error")(SlackAPIResponse.apply) def fromJsString( jsonString: String): Either[SlackPostMessageError, SlackAPIResponse] = decode[SlackAPIResponse](jsonString) match { case Right(response @ SlackAPIResponse(true, _)) => Right(response) case Right(_ @SlackAPIResponse(false, msgOpt)) => Left(SlackPostMessageError(msgOpt)) case Left(e) => Left(new SlackPostMessageError(e.getMessage, e)) } }
circe が非常に便利です。 circe.github.io
Handlerの実装
AWS Lambda から呼び出される関数を定義します。
先に解説した SQSClient, SlackClient を駆動してコアとなる機能を実装していきます。
package sample import com.amazonaws.services.lambda.runtime.{Context, RequestHandler} import com.typesafe.config.ConfigFactory import sample.SampleHandler.{SampleRequest, SampleResponse} import scala.beans.BeanProperty import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.control.NonFatal object SampleHandler { class SampleRequest(@BeanProperty var value: String) { def this() = this("") } case class SampleResponse( @BeanProperty value: String = "slack message post done.") } class SampleHandler extends RequestHandler[SampleRequest, SampleResponse] { import scala.concurrent.ExecutionContext.Implicits.global private val config = ConfigFactory.load() private val sqsClient = new SQSClient(config) private val slackClient = new SlackClient(config) def handleRequest(input: SampleRequest, context: Context): SampleResponse = { val eventualUnit = for { messages <- sqsClient.receive eventualMessages = messages.map(m => slackClient.post(m).map(_ => m)) eventualUnits = eventualMessages.map(_.flatMap(sqsClient.delete)) result = eventualUnits.map(_.recover(printStackTraceInCaseOfFailure())) _ <- Future.sequence(result) } yield SampleResponse() Await.result(eventualUnit, 300.second) } private def printStackTraceInCaseOfFailure(): PartialFunction[Throwable, Unit] = { case NonFatal(e) => e.printStackTrace() } }
Await.result
の第二引数( atMost
) に設定しているのは Lambda の最大実行時間です。
Slack通知が失敗したメッセージの削除はスキップしてひとまずスタックトレースを吐くことにしました。
キューに残ったメッセージの後始末は最大受信数を設定してデッドレターキューに投げてしまうのが良さそうですね。
application.conf
src / main / resources
に application.conf
を作成します。
キューのURLとSlackのAPIトークンは環境変数から読み込むことにました。
aws.sqs.queue.url=${QUEUE_URL}
slack.api.token=${SLACK_API_TOKEN}
slack.api.post.message.url="https://slack.com/api/chat.postMessage"
serverless.yml
Lambda関数やSQSの構成を定義していきます。
service: sample-slack-post frameworkVersion: "=1.30.0" provider: name: aws runtime: java8 stage: ${opt:stage, 'dev'} region: ap-northeast-1 iamRoleStatements: - Effect: "Allow" Action: - "sqs:*" Resource: Fn::ImportValue: ${self:service}:${self:provider.stage}:SampleQueueArn package: artifact: target/scala-2.12/hello.jar functions: sample: handler: sample.SampleHandler name: sample-slack-post-function-${self:provider.stage} timeout: 300 memorySize: 256 environment: QUEUE_URL: Ref: sampleQueue SLACK_API_TOKEN: "slack api token required" resources: Resources: sampleQueue: Type: AWS::SQS::Queue Properties: QueueName: sample-slack-post-${self:provider.stage} ReceiveMessageWaitTimeSeconds: 5 VisibilityTimeout: 10 Outputs: SampleQueueArn: Description: The ARN for the Sample Queue Value: "Fn::GetAtt": [ sampleQueue, Arn ] Export: Name: ${self:service}:${self:provider.stage}:SampleQueueArn
SQSに関して、実際の業務では CloudFormationでは構築しましたが、
せっかくなので resources
で構築してみました。
Scalaのコードが参照するキューのURLとSlackのAPIトークンは、
environment
から環境変数に登録しています。
実際の開発では、キューのURLなど環境毎に切り替えが必要な情報は、
開発用、ステージング用にファイルを切り出して対応しました。
serverless.com
デプロイ
カレントディレクトリ上で下記のコマンドを実行していきます。
パッケージング
$ sbt assembly
デプロイ
$ sls deploy -v
AWSコンソール上からも serverless.yml に定義した Lambda と SQS がクラウド上に、
作成されていることが確認できます。
Lambda
SQS
デプロイすると Serverless が CloudFormation に Stack を作成します。
失敗することがありますが、
環境変数 AWS_PROFILE に登録に入力したプロファイルの権限が問題なことが多いのでポリシーの見直しが必要でしょう。
また、正しい権限に修正しても一度作成した CloudFormation の Stack をうまく削除することができずに再実行が失敗することがあるので、
そんなときは迷わず下記のコマンドを実行すれば大体のことは解決できそうでした。
$ sls remove
動かしてみる
メッセージの送信
テストデータとして aws-cli からキューにメッセージを送信します。
メッセージの内容は Slack API のリクエストボディに設定する Json です。
今回は channel
, text
, username
を指定したシンプルな Json を送信します。
$ aws sqs send-message --queue-url <キューのURL> --message-body '{"channel":"@t_horikoshi","text":"test","username":"LambdaBot"}'
Lambdaの実行
serverless.yml の functions.name
に指定した名前を指定して、
Lambdaを実行します。
$ sls invoke -f sample -l
🎉🎉🎉ヒャッホウ!!!🎉🎉🎉
通知を確認できました
開発してみて
Serverless Framework
Lambda の設定に関して、
実際の開発では timeout
や memorySize
に加えて reservedConcurrency
, events.schedule
を serverless.yml
に定義しました。
これらの構成管理やクラウドへの反映が大変容易に感じました。
また依存するSQSのリソース定義が一元管理できる点も魅力的です。
CloudFormation や terraform の知見がある人にとっては、
キャッチアップが容易な気がします。
便利。
AWS SQS
可視性タイムアウトやデッドレターキューの仕組みなど、
よく考えられているなーっと感心しました。
デッドレターキューに送信した不正メッセージの処理については、
AWS SNSなどで分析できるという話でした。
docs.aws.amazon.com
素敵。
AWS Lambda
動きっぱなしのEC2なんかで動く簡素な日次バッチ処理なんかは、
全部Lambdaに移行してしまえばコストカットにつながるのかなという印象でした。
同時実行起動数やイベントソースのサポートが大変充実しているので、
割と色んなユースケースに柔軟に対応できる気がします。
実はSQSのイベントソースがサポートされているので、
開発でも試してみたかったのですが進捗の都合で今回は断念しました...、無念。
機会があれば触ってみたいと思います。
無念。
辛かったところ
Slack API の chat.postMessage ですが
Channel 毎に秒間1リクエストしか受け付けないという RateLimit がありまして、
このあたりの制御がちょっと厄介でした。
Slackに通知する際、リクエストが1回で済むようにチャンネル単位でメッセージを集計したり、
Throttle制御入れたり、Lambdaの同時実行数を絞ったりなどなど...
解説では割愛してますがいろいろやってます。
このあたりはちょっとしんどかったです。
何でもかんでもSlack通知するっていうのはもうやめようぜ。
そんな気持ちになりました。
終わりに
今回紹介したコードは GitHub に公開しておきました。
どなたかの参考になればと。
総じてAWSのクラウドサービスは便利ですね。
いろいろできるようになりたいので今後も仲良くしていきたいです。
本当はイベントソースやデッドレターキューの
お話などをもう少し詳しく解説に交えたかったのですがそれはまたの機会に。
では、また。