LoginSignup
2

More than 3 years have passed since last update.

Scala カスタムExecutionContextの作成

Last updated at Posted at 2020-03-14

カスタムExecuitonContextを作っていきます。

背景

普段Futureを使う時に、なんとなくExecutionContextをDIして、implicitでFutureに渡しています。

これまでは 一つのExecutionContextをFutureに渡してElasticsearchに対して非同期処理を行っていました。
ところが、諸事情で一つのアプリケーションがElastisearchだけではなくDocumentDB(MongoDBのAWSマネージドサービス)にも、ドキュメントを参照・保存するような処理にするような修正をしなければいけなくなりました。

これまで、一つのデフォルトのExecutionContextでElasticsearchに対して非同期処理を行っていたところを、DocumentDBとの接続に対しても利用するようになりました。

共通のExecutionContextを使えれば使いたかったのですが、今回DocumentDBに関して、むやみやたらにコネクションを貼ってしまうと、待機中のスレッドが多すぎて処理しきれなくなります。(デフォルトで最大で500スレッド待機できる)

実際に以下のようなエラーが出てしまいます。

エラー内容

com.mongodb.MongoWaitQueueFullException: Too many threads are already waiting for a connection. Max number of threads (maxWaitQueueSize) of 500 has been exceeded.

なので、DocumentDB側で最大でコネクションを貼れる数(プールできるコネクション数)と、アプリケーション側で接続するスレッドの数をチューニングしてあげる必要があります。

アプリケーション側で並列で動かすスレッド数を、DocumentDB側で設定したコネクションプール数を超えなければ、正常に処理できるはず。

参照: DocumentDBコネクション数設定

そこで、普段何気なく使っているExecutionContextをカスタマイズする必要が出てきました。

ExecutionContextとは

スレッドプールを必要とせず、非同期でプログラムを実行するscalaの標準ライブラリです。

概要は、ExecutionContextライブラリにコメントアウトで、長々と書いてあります。

ExecutionContext.scala
/**
 * An `ExecutionContext` can execute program logic asynchronously,
 * typically but not necessarily on a thread pool.
 *
 * A general purpose `ExecutionContext` must be asynchronous in executing
 * any `Runnable` that is passed into its `execute`-method. A special purpose
 * `ExecutionContext` may be synchronous but must only be passed
 * ............続く

自前のExecutionContextを作る

ExecutionContextライブラリに、カスタムする際の良い方法をちょこっと書いています。

* A custom `ExecutionContext` may be appropriate to execute code
 * which blocks on IO or performs long-running computations.
 * `ExecutionContext.fromExecutorService` and `ExecutionContext.fromExecutor`
 * are good ways to create a custom `ExecutionContext`.

ExecutionContext.fromExecutorのように、自前のExecutionContextを作成するインターフェースが存在するみたいです。

実際に作ってみる

二通りのやり方でカスタムしていきたいと思います。

1. 一つ目

ExecutionContextTest1.scala

class MongoRepository1 {

  implicit val service: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

  def find = {
    Future {
      for (i <- 1 to 50) {
        println(i * 2)
      }
    }
  }
}

fromExecutorServicenの引数に、Executorsインスタンスを渡しています。
newFixedThreadPool()で、プールできるスレッド数を固定値で指定できます。

implicitでFutureに渡してあげることで、カスタマイズしたExecutionContextで非同期処理を実行できます。

2. 二つ目

play framework上で、ExecutionContextをカスタマイズする方法です。
DIで、きれいに実装するためにこのような実装になっています。

ExecutionContextTest.scala

package study.executionContextTest

import java.util.concurrent.{ExecutorService, Executors}

import com.google.inject.name.{Named, Names}
import com.google.inject.{AbstractModule, Inject, Singleton}

import scala.concurrent.{ExecutionContext, Future}

// 自前のExecutionContextを作る
class Execution @Inject()(mongoRepository: MongoRepository) {
  mongoRepository.find
}

// DIするモジュールを作っている
// ここで、自前のExecutionContextを作成するので、いろいろ自前の設定を定義できる
class MongoExecutionContext(threadCount: Int) extends ExecutionContext {

  // JavaのExecutorServiceを使うことで、スレッドプールを作成できる
  // newFixedThreadPoolで固定値で、プールするスレッド数を定義している。
  private val executorService: ExecutorService =
    Executors.newFixedThreadPool(threadCount)

  // Futureは、内部的に引数のブロックをRannableでラップして、ExecutionContextのexecuteメソッドを実行している。
  override def execute(runnable: Runnable): Unit =
    executorService.execute(runnable)

  override def reportFailure(cause: Throwable): Unit = throw cause
}

// 自前のモジュールを作成
// google guiceの仕様
// application.confのmoduleに定義することで、google guiceでDIできるようになる
class MongoExecutionModule extends AbstractModule {
  override def configure(): Unit = {
    bind(classOf[ExecutionContext])
      .annotatedWith(Names.named("MongoExecutionContext"))
      .toInstance(new MongoExecutionContext(50))
  }
}

// このインスタンスは、一個しか作られないようにする。
// 複数のExecutionContextが作成されると、チューニングがバグる
@Singleton
class MongoRepository @Inject()(
  implicit @Named("MongoExecutionContext") ec: ExecutionContext
) {

  // このfindなどで、implicitのexecutionContextを利用するので、
  // ここでデフォルトのExecutionContextを使うのではなく、mongoとの接続に適したExecutionContextを使ってあげる。
  // このように、ミドルウェア毎に利用するExecutionContextを変更することができる。
  // この例では、Futureのapplyに渡された処理が、非同期で処理される。
  // その際に、上記でinjectしているExecutionContextが使われるので、thread数が最大50まで並行処理する。
  def find = {
    Future {
      for (i <- 1 to 50) {
        println(i * 2)
      }
    }
  }
}

詳細はコメントアウトに書いていますが、さっくりした流れは、以下のようになっています。

最終的にはカスタムしたExecutionContextをDIしたい。

  • 1. ExecutionContextを継承したMongoExecutionContextを作成する。
    • 1-1. その中で、スレッドプールを指定したExecutionServiceを作成する
  • 2. AbstractModuleを継承したMongoExecutionModuleを作成する
    • 2-1. @NameでDIできるように、実装する
  • 3. MongoRepositoryで、2で作成したMongoExecutionModuleをDIする

まとめ

先述したように、これまでおまじないのようにExecutionContextを使っていたのですが、自分で作ってみることで理解がグッと深まりました。

参考

実装するにあたり、非常に助けになりました。
- http://tototoshi.hatenablog.com/entry/2015/12/23/154104
- https://qiita.com/sugiyasu-qr/items/d48c04c81ff9a561b5e5
- https://qiita.com/sugiyasu-qr/items/391b2f0523c9974b011d

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2