MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

Spark Streaming と Kryo シリアライザーの話

マイクロアドでアプリケーションエンジニアをしている「G」です。日々、バッチを作ったり WEB アプリケーションを作ったりしています。

今回はライトに Spark Streaming でカスタム Kryo (クライオ)シリアライザーが必要になってしまったお話を書いていきたいと思います。

Spark Streaming とは

Spark Streaming とは Apache Spark という分散コンピューティング基盤上でストリーム処理を行うためのソフトウェアです。

マイクロアドでは秒間数万件以上発生するユーザーのアクセスログの処理のために Spark Streaming を利用しています。

詳細は以下の記事をご確認ください。

developers.microad.co.jp

ブロードキャスト変数とシリアライズの罠

Apache Spark では、メインプログラムが実行されるドライバーと呼ばれるノードがスケジューリングなどの管理を行う一方、実際の分散処理は複数あるワーカーが行います。

ワーカー上で共通のデータを参照する場合などに、ドライバーが変数を生成して、各ワーカーに配布(ブロードキャスト)します。このような変数をブロードキャスト変数と呼びます。

ブロードキャストはドライバーでシリアライズして、転送先の各ワーカーでデシリアライズすることで行います。そのため、ブロードキャスト変数はシリアライズ可能(Serializable)である必要があります。

この場合、以下のようなケースで少し困ったことが発生します。

例えば、各ワーカーが入力用ストリームからレコードを取得し、さらに処理に必要な情報を Redis から取得した上で、結果を Kafka クラスタに書き出すとします。

文章では少しわかりにくいと思いますので、以下にわかりやすい図を載せます。

わかりやすい図
わかりやすい図

このとき、「Redis や Kafka を読み書きするためのクラスのインスタンス」をドライバーで生成し、ワーカーにブロードキャストしようとすると、うまくいきません。

IO に関するクラス(例えば Java の InputStream, OutputStream, ファイル・コネクションが関わるクラス)は一般にはシリアライズ可能でないためです。

詰んだわ。

でもだいじょうぶ。

「IO に関するクラスのインスタンスを生成するクラス」であれば、ブロードキャスト可能です。

実装イメージは以下の通りです。

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  private lazy val producer = createProducer()

  def send(topic: String, key: String, value: String): Unit = producer.send(new ProducerRecord(topic, key, value))
}

KafkaSink が「Kafka の Producer クラス(IO に関するクラス)のインスタンスを生成するクラス」ですね。

val sparkConf = new SparkConf().setAppName("test")
val sc = new SparkContext(sparkConf)

val sink = sc.broadcast(new KafkaSink(() => {
  val props = new java.util.Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")

  val producer = new KafkaProducer[String, String](props)

  sys.addShutdownHook {
    producer.close()
  }

  producer
}))

ワーカーでは sink.value.send すれば良いことになります。

言語は Scala なので IO に関するクラスのインスタンスを保持する変数を lazy にできます。ワーカー内で初めて使用されるタイミングで、確実にインスタンスを生成できますね。

※ここまでの本文中で何度も言及し、かつ「わかりやすい図」にも堂々と載せているにもかかわらず Redis のコードが一切ないことにつきましてお詫び申し上げます。

続・ブロードキャスト変数とシリアライズの罠

罠はもう一つあります。

それは Apache Spark で利用されているシリアライズ方式である Kryo に関するものです。

Kryo は Java 標準のシリアライズ方式と比べて、効率とパフォーマンスで優れています。Java 標準で既に Serializable なクラスは Kryo でも特に意識することなくそのままシリアライズ可能です。

そんなふうに考えていた時期が俺にもありました。

シリアライズがうまくいかない具体例の1つとして、Typesafe Config があります。

Kryo がいい感じにやってくれることを信じてストリーム処理を起動すると、以下のような不穏なメッセージが現れます。

2019-01-20 10:00:00.000 [org.apache.spark.internal.Logging$class] WARN Lost task 0.0 in stage 6.0 (TID 9, streaming.example.com, executor 3): java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
Serialization trace:
object (com.typesafe.config.impl.SimpleConfig)
jp$microad$c3po$config$Config$$underlying (jp.microad.c3po.config.Config)
topicConfig (jp.microad.malib.scala.topic.analysis.internal.TopicDaoInitializer)
...

シリアライズが失敗しているので、ストリーム処理も正しく動いていないわけです。雑にコードを追ってみると、以下のような事情がわかります。

シリアライズ時には、別に定義されたカスタムシリアライザーがない場合、対象が持つフィールドを全て自動で取得します。今回 Kryo が Typesafe Config をシリアライズする際も同様です。しかし、その過程で何故か即例外(UnsupportedOperationException)を吐くメソッドを触ってしまい、シリアライズが失敗しているようです。

詰んだわ。

でもだいじょうぶ。

昔々、全く同じ原因でシリアライズがこけて困っている人たちがいまして、彼らは「カスタムシリアライザーを実装する」ことで問題を解決していたからです。

github.com

コードを見てもらうとわかりますが、カスタムシリアライザーとはいっても非常にシンプルな作りです。

これは Typesafe Config の場合

  • Config クラスと同等の情報をもった Map を Config クラス自体が保持している
  • Factory クラスが用意されており上記の Map から簡単に Config クラスが復元できる

という理由により、Map だけをシリアライズすれば良いためです。単純ですね。

元のコードは Java ですが Scala でも難しくなさそうです。

さてお待ちかね、Scala のコードです↓↓↓↓

Scala コード

package com.example.serializer

import com.esotericsoftware.kryo.{Kryo, Serializer, Registration}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.serializers.MapSerializer
import com.typesafe.config.{Config, ConfigFactory}

class ConfigSerializer extends Serializer[Config] {

  override def write(kryo: Kryo, output: Output, obj: Config): Unit =
    kryo.writeObject(output, obj.root.unwrapped, new MapSerializer)

  override def read(kryo: Kryo, input: Input, t: Class[Config]): Config =
    ConfigFactory.parseMap(
      kryo.readObject(input, classOf[java.util.HashMap[String, Object]], new MapSerializer)
    )
}

object ConfigSerializer {

  def register(kryo: Kryo): Registration = {
    // com.typesafe.config.impl.SimpleConfig は Scala では package private なので Java-style でアクセスする
    kryo.register(Class.forName("com.typesafe.config.impl.SimpleConfig"), new ConfigSerializer)
  }
}

ほぼ元の Java コードと同じように書けました。Scala としては多少不自然な書き方になった理由はコメントの通りです。

また Spark Streaming から利用するためには、以下のようにカスタムシリアライザーを登録する必要があります。

package com.example.stream

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import com.example.serializer.ConfigSerializer

class StreamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    ConfigSerializer.register(kryo)
  }
}

これをドライバーが利用する SparkConf オブジェクトに登録します。

val sparkConf = new SparkConf().setAppName("test")
    .set("spark.kryo.registrator", "com.example.stream.StreamKryoRegistrator")

今度こそ本当に自由自在にストリーム処理が作れそうですね。

Happy Streaming!

おわりに

いかがでしたか?

ストリーム処理も色々大変なんだなぁと思って頂ければ幸いです。

それではご縁がありましたらまたお会いしましょう。