この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Apache Kafkaとは
Apache Kafkaとは、Linkedinが開発した分散メッセージキューで、
データストリーミング用のプラットフォームです。
メッセージキューとは、システム間でデータのハブととして機能し、
対象データを一時的に保持してくコンポーネントです。
キューをはさむことでシステム間を疎結合にし、通信を非同期化することができます。
AWSでいえば、sqsやkinesisが同様のサービスにあたります。
Apache Kafkaはスケーラビリティや大量データの扱いに長けており、耐障害性もあるスグレモノです。
本稿ではApache Kakfaとnode.js用モジュールのkafka-nodeとkafka-streamsをつかって
node.jsからkafkaへアクセスしてみます。
Amazon Kinesisとの比較
以前はAWSフルマネージド(kinesis)かどうか、という決定的な違いがあったのですが、
2018年11月にAmazon Managed Streaming for Apache Kafka (Amazon MSK)が
リリースされたのでその違いもほぼなくなりました。
※ Amazon MSK : Apache Kafka をストリーミングデータ処理に使用する際のフルマネージド型サービス
ここではkafkaとkinesisの比較をしていますが、
主な違いは
- データ保持期間 : kafka = 無制限 , kinesis = 1日〜7日
- データサイズmax : kafka = default 1MB(設定可能) , kinesis = 1MB
- 依存サービス : kafka = ZooKeeper , kinesis = DynamoDB
というような感じです。
絶対にどちらかしか実現できない機能はありませんが、
最初から大量のストリームデータ配信を想定している場合はkafka、
そうでない場合はkinesisを選択することが多いようです。
参考:
・https://www.ossnews.jp/compare/Apache_Kafka/Amazon_Kinesis
・http://www.itcheerup.net/2019/01/kafka-vs-kinesis/
Apache Kafkaのサンプル
まずはKafkaを動かしてみましょう。
今回はdockerをつかってkafkaを起動し、node.jsでproducer/consumerサンプルの確認をしてみます。
環境
今回使用した動作環境は以下のとおりです。
- OS : MacOS X 10.12.6
- Docker : 18.09.2
- node.js : v11.14.0
Dockerイメージからkafka用コンテナを作成してコンテナにログインします。
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 --name test_kafka spotify/kafka
$ docker exec -it test_kafka bash
kafkaディレクトリに移動し、topicをtestという名前で作成します。
root@xxxx:/# cd /opt/kafka_2.11-0.10.1.0/
root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
topicを作成したらnode.jsでProducer側のプログラム(producer.js)を作成します。
本稿では、node.jsからkafkaにアクセスするためのkafka-nodeモジュールを使用します。
事前にkafka-nodeをnpmでインストールしておきましょう。
% cd path/yuor/node-app
% npm install kafka-node
// publisher.js
'use strict';
var kafka = require("kafka-node");
const Producer = kafka.HighLevelProducer;
const client = new kafka.KafkaClient({
kafkaHost: "localhost:9092"
});
const producer = new Producer(client, {
partitionerType: 1
});
producer.on("ready", () => {
//プログラム引数で名前と年齢を受け取る
const name = process.argv[2];
const age = process.argv[3];
const message = [
{
topic: "test",
messages: JSON.stringify({name: name, age: age})
}
];
producer.send(message, (err, data) => {
if (err) console.log(err);
else console.log('send messages');
process.exit();
});
});
次にConsumer(consumer.js)を作成します。
'use strict';
var kafka = require('kafka-node');
const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({kafkaHost: "localhost:9092"});
const consumer = new Consumer(
client,
[{topic: "test", partision:0}],
{
groupId: "my-consumer",
autoCommit: true,
fromOffset: true
}
);
consumer.on("message", (message, err) => {
if (err) console.log("error : " + err);
const json = JSON.parse(message.value);
console.log("JSON:" + JSON.stringify(json));
console.log("Name:" + json.name);
console.log("Age:" + json.age);
});
consumer.on('error', function (err) {
console.log('error', err);
});
producer.jsを実行してkafkaにメッセージを送ります。
$ node producer.js taro 30
send messages
consumerを起動すると、topicに対してメッセージを取得しに行きます。
(先にconsumer.jsを起動していてもメッセージが送られたときに取得する)
$ node consumer.js
JSON:{"name":"taro","age":"30"}
Name:taro
Age:30
これでkafkaの動作確認は終了です。
Kafka Streamsとは
Kafkaの動作確認もできたので、次はKafka Streamsを動かしてみましょう。
Kafka Streamsとは、Apache Kafka v0.10から同梱されているライブラリで、
これを使えばStream処理をある程度簡単に実装できるようになります。
例えば、
「サンプルAのtopicにデータが送られたら、それに対して処理を実行してサンプルBのtopicへ送る」
といった処理が可能になります。
KStreamとKTable
Kafkaに流れてくるStream(key-value形式)には2つの種類があるという考えのもと、
「KStream」「KTable」という2つのStreamタイプを使い分けることができます。
1つ目はKStream(record stream)と呼ばれる、
Kafkaからうけとったデータがそのまま追加されるタイプです。
ここにある例でいうと、↓にあるような2件のStreamデータが流れてきたとします。
// { user(key) : count }
{"alice" : 1}
{"alice" : 3}
このデータはユーザーごとのカウント数を表しており、KStreamを使用しているならcountは4となります。
2つ目はKTable(changelog stream)で、
もしすでに同じキーのデータが存在するならデータが更新されます。
上記2件のStreamデータをKTableで処理した場合、countは3となります。
なお、KTableにおいてNULLを持つデータは、そのデータのキーに対する削除を表します。
その他Kafakaの特徴
他にもKafkaではいろいろな機能を持っています。
Streamとテーブルのjoinができたり、集約(max/min/avg/sum) も可能です。
また、window関数を使用して任意の期間についてStreamデータをグルーピングすることも可能です。
join/aggregation/windowingについては公式ドキュメント等をご確認ください。
node-kafka-streamsを使ったサンプル
では、kafka-streamsを使った簡単なサンプルを作成してみましょう。
ここにあるwordCountを少しかえて試してみます。
ここではinput-topicとoutput-topicの2つのtopicを使用するので、
コンテナにログインして作成しましょう。
root@xxxx:/# cd /opt/kafka_2.11-0.10.1.0/
root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic
root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic
次に、Kafka Streamsを使用するためのモジュール、kafka-streamsもnpmでインストールします。
% cd path/yuor/node-app
% npm install kafka-streams
Kafka Streamsを使ったサンプルです。
ここではinput-topicにメッセージを送り、そのメッセージの数をcountしてoutput-topicに送ります。
例えば、
"ftuit banana"
"ftuit orange"
"ftuit apple"
と3つのメッセージをinput-topicにおくった場合、
"fluit 3"
というふうに、メッセージの最初の文字列(ここだとfluit)をキーとして
その数をoutpu-topicに送ります。
"use strict";
//wordCount.js
const { KafkaStreams } = require("kafka-streams");
const { nativeConfig: config } = require("./config.js");
const keyMapperEtl = (kafkaMessage) => {
const value = kafkaMessage.value.toString("utf8");
console.log("message : " + kafkaMessage);
const elements = value.toLowerCase().split(" ");
return {
someField: elements[0],
};
};
const kafkaStreams = new KafkaStreams(config);
kafkaStreams.on("error", (error) => {
console.log("Error occured:", error.message);
});
const stream = kafkaStreams.getKStream();
//input-topicから取得したデータを
//キー毎にカウントしてoutput-topicに送る(count >= 3のキー)
stream
.from("input-topic")
.map(keyMapperEtl)
.countByKey("someField", "count")
.filter(kv => kv.count >= 3)
.map(kv => kv.someField + " " + kv.count)
.tap(kv => console.log(kv))
.to("output-topic");
Promise.all([
stream.start()
]).then(() => {
console.log("started..");
// 50秒したらStreamをclose
setTimeout(() => {
kafkaStreams.closeAll();
console.log("stopped..");
}, 50000);
});
Streamの設定ファイルです。
"use strict";
//config.js
//dont use these settings for production, it will set your broker on fire..
const batchOptions = {
batchSize: 5,
commitEveryNBatch: 1,
concurrency: 1,
commitSync: false,
noBatchCommits: false
};
const nativeConfig = {
noptions: {
"metadata.broker.list": "localhost:9092", //native client requires broker hosts to connect to
"group.id": "kafka-streams-test-native",
"client.id": "kafka-streams-test-name-native",
"event_cb": true,
"compression.codec": "snappy",
"api.version.request": true,
"socket.keepalive.enable": true,
"socket.blocking.max.ms": 100,
"enable.auto.commit": false,
"auto.commit.interval.ms": 100,
"heartbeat.interval.ms": 250,
"retry.backoff.ms": 250,
"fetch.min.bytes": 100,
"fetch.message.max.bytes": 2 * 1024 * 1024,
"queued.min.messages": 100,
"fetch.error.backoff.ms": 100,
"queued.max.messages.kbytes": 50,
"fetch.wait.max.ms": 1000,
"queue.buffering.max.ms": 1000,
"batch.num.messages": 10000
},
tconf: {
"auto.offset.reset": "earliest",
"request.required.acks": 1
},
batchOptions
};
module.exports = {
nativeConfig
};
wordCount.jsを起動し、input-topicに対して下記のようなメッセージをおくってみると、
"fruit banana"
"fruit orange"
"fruit apple"
のような同一キーを指定したメッセージを3つ送ると、
output-topicに対して
"fruit 3"
といったキーのカウント数を送ります。
最後に
今回はApache KafkaとKafka Streamsの動きを簡単に確認してみました。
ストリーム処理が非常に簡単に動かすことができたと思います。
他にも使えそうな機能が多くあるので、確認してみてください。
なお、動作確認がおわったら、不必要なコンテナはdocker rmで削除しておきましょう。
# コンテナを削除
$ docker rm test_kafka
参考サイト
- https://qiita.com/41semicolon/items/60f92a1db6dfce4303c5
- https://qiita.com/sigmalist/items/5a26ab519cbdf1e07af3
- https://qiita.com/41semicolon/items/60f92a1db6dfce4303c5
- https://qiita.com/mkyz08/items/a3b866c46ca49c52e647
- http://pppurple.hatenablog.com/entry/2019/03/28/235810
- https://www.slideshare.net/techblogyahoo/kafka-streams-kafkajp
- https://github.com/SOHU-Co/kafka-node
- https://github.com/tulios/kafkajs
- https://qiita.com/minarai/items/f571db36a19806aee491
- https://github.com/nodefluent/kafka-streams
- https://vicki.substack.com/p/you-dont-need-kafka