はじめに
Elastic社の提供するETLツールLogstashを利用して、Amazon Kinesis Data Streamsにログデータを出し入れしてみました。
【参考】
・Amazon Kinesis Data Streamsとは
・AWS kinesisまとめ
※かなり雑に言うとApache KafkaのAWSマネージドサービスです。
利用環境と構成図
product | version |
---|---|
Filebeat | 6.6.0 |
Logstash | 6.6.0 |
Elasticsearch | 6.6.0 |
Kibana | 6.6.0 |
OS(EC2) | Amazon Linux2 |
※Elastic Stackは現行最新版の6.6.0を利用しています。
※AWSリージョンはバージニア(us-east-1)を利用しています。
敢えて「Logstash → Kinesis Data Streams」と「Kinesis Data Streams → Logstash」の両方向を実施しています。
- LogstashからKinesis Data Streamsにデータを置くときはkinesis output pluginを利用します。
- LogstashでKinesis Data Streamsからデータを取り出すときはkinesis input pluginを利用します。
前提
Logstash_put-kinesisマシンのJava、Logstash、Filebeatのインストールは省略しています。
またLogstash_get-kinesisマシンのJava、Logstash、Elasticsearch、Kibanaのインストールも省略しています。
上記環境が揃っていることを前提としています。
Logstash kinesis output pluginとは
Logstashの取り込んだログデータの出力先として、Amazon Kinesis Data Streamsを指定することが出来ます。
上記機能を実現するのがkinesis output pluginになります。
このPluginはElastic社公式のPluginではなく、Community Pluginになります。
設定項目 | デフォルト値 | 説明 |
---|---|---|
stream_name | - (必須) | kinesisストリーム名を指定します。 |
region | us-east-1 (任意) | Kinesis Data StreamsのAWSリージョンを指定します。 |
metrics_level | cloudwatch (任意) | メトリック統計情報のCloudWatchへの送信有無を指定します。 |
access_key | - (任意) | Kinesisに書き込み権限のあるIAMアカウントで作成したアクセスキーを指定します。 |
secret_key | - (任意) | アクセスキーを利用する場合の対になるシークレットキーを指定します。 |
metrics_access_key | - (任意) | CloudWatchに書き込み権限のあるIAMアカウントで作成したアクセスキーを指定します。 |
metrics_secret_key | - (任意) | CloudWatchのアクセスキーを利用する場合の対になるシークレットキーを指定します。 |
role_arn | - (任意) | AssumeRoleを利用する場合のARNを指定します。 |
metrics_role_arn | - (任意) | CloudWatchへの書き込みでAssumeRoleを利用する場合のARNを指定します。 |
event_partition_keys | - (任意) | 各レコードに挿入するパーティションキーを指定します。 |
randomized_partition_key | false (任意) | データ送信順序を無視したランダムなパーティションキーの利用有無を指定します。 |
aggregation_enabled | true (任意) | Kinesisに投入するレコードを集約して送るか指定します。 |
max_pending_records | 1000 (任意) | Kinesis高負荷時のLogstash側でのバッファリングするレコード数を指定します。 |
Logstash kinesis input pluginとは
Logstashがログデータの取得先として、Amazon Kinesis Data Streamsを指定することが出来ます。
上記機能を実現するのがkinesis input pluginになります。
こちらのPluginはElastic社公式のPluginになりますが、サポート対象外です(´;ω;`)
設定項目 | デフォルト値 | 説明 |
---|---|---|
application_name | logstash (任意) | dynamodb調整テーブルに使用されるアプリケーション名を指定します。 |
checkpoint_interval_seconds | 60 (任意) | dynamodbをチェックにしに行くインターバル(秒)を指定します。 |
kinesis_stream_name | - (必須) | kinesisストリーム名を指定します。 |
metrics | nil (任意) | メトリック統計情報のCloudWatchへの送信有無を指定します。 |
profile | - (任意) | AWS認証情報を参照する場合のファイルパスを指定します。 |
region | us-east-1 (任意) | Kinesis Data StreamsのAWSリージョンを指定します。 |
role_arn | - (任意) | AssumeRoleを利用する場合のARNを指定します。 |
role_session_name | - (任意) | IAMロールを引き受けるときに使用するセッション名を指定します。 |
設定手順
- Kinesis Streamsの作成
- IAM Roleの作成
- Pluginインストール
- logstash.confの設定
1. Kinesis Streamsの作成
[Kinesisストリームの名称]は
kinesis_poc
とします。シャード数はサイジング要件に応じてセットすれば良いですが、今回は
1
とします。
※Kinesis Data Streamのサイジングや動きに関して、以下の記事が良くまとまっています。
【参考】
・ AWS Kinesis Stream を大規模データで検証してわかったことの事例紹介
2. IAMロールの作成
- Logstash_put-kinesisのEC2インスタンスに割り当てるIAM Roleに必要なIAM Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords",
"kinesis:DescribeStream"
],
"Resource": "arn:aws:kinesis:us-east-1:<AWSアカウント>:stream/kinesis_poc"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}
- Logstash_get-kinesisのEC2インスタンスに割り当てるIAM Roleに必要なIAM Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream",
"kinesis:ListShards"
],
"Resource": "arn:aws:kinesis:us-east-1:<AWSアカウント>:stream/kinesis_poc"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:Scan",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:GetItem",
"dynamodb:DeleteItem",
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}
IAM ポリシーとユーザーの作成に必要なIAM Policyが記載されていますが、これでは足りませんでした。
権限が不足している場合は、/var/log/messages
に以下のようなエラーが出力されますので、見逃さないように!!
※以下の例はdynamodb:DescribeTable
が不足しています。
Feb 13 16:01:39 ip-172-31-1-190 logstash: Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: User: arn:aws:sts::<AWSアカウント>:assumed-role/kinesis_get_role/i-03c32b05117b3235a is not authorized to perform: dynamodb:DescribeTable on resource: arn:aws:dynamodb:us-east-1:<AWSアカウント>:table/logstash (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: AccessDeniedException; Request ID: BQRBG0RSE187CLGTP802ONS5BFVV4KQNSO5AEMVJF66Q9ASUAAJG)
3. Pluginインストール
標準でインストールされているPluginは/usr/share/logstash/Gemfile
を参照することで把握することが出来ます。
※ちなみに今回対象の2つのPluginはいずれも標準では未インストールです。追加でインストールしましょう。
[root@ip-172-31-1-190 ~]# cat /usr/share/logstash/Gemfile
# This is a Logstash generated Gemfile.
# If you modify this file manually all comments and formatting will be lost.
source "https://rubygems.org"
gem "logstash-core", :path => "./logstash-core"
gem "logstash-core-plugin-api", :path => "./logstash-core-plugin-api"
gem "paquet", "~> 0.2.0"
gem "ruby-progressbar", "~> 1.8.1"
gem "builder", "~> 3.2.2"
gem "ci_reporter_rspec", "1.0.0", :group => :development
gem "tins", "1.6", :group => :development
gem "rspec", "~> 3.5", :group => :development
gem "logstash-devutils", "= 1.3.5", :group => :development
gem "benchmark-ips", :group => :development
gem "octokit", "3.8.0", :group => :build
gem "stud", "~> 0.0.22", :group => :build
gem "fpm", "~> 1.3.3", :group => :build
gem "rubyzip", "~> 1.2.1", :group => :build
gem "gems", "~> 0.8.3", :group => :build
gem "rack-test", :require => "rack/test", :group => :development
gem "flores", "~> 0.0.6", :group => :development
gem "term-ansicolor", "~> 1.3.2", :group => :development
gem "json-schema", "~> 2.6", :group => :development
gem "belzebuth", :group => :development
gem "pleaserun", "~>0.0.28"
gem "webrick", "~> 1.3.1"
gem "atomic", "<= 1.1.99"
gem "rake", "~> 12.2.1", :group => :build
gem "logstash-codec-cef"
gem "logstash-codec-collectd"
gem "logstash-codec-dots"
gem "logstash-codec-edn"
gem "logstash-codec-edn_lines"
gem "logstash-codec-es_bulk"
gem "logstash-codec-fluent"
gem "logstash-codec-graphite"
gem "logstash-codec-json"
gem "logstash-codec-json_lines"
gem "logstash-codec-line"
gem "logstash-codec-msgpack"
gem "logstash-codec-multiline"
gem "logstash-codec-netflow", ">=3.14.1", "<4.0.0"
gem "logstash-codec-plain"
gem "logstash-codec-rubydebug"
gem "logstash-filter-aggregate"
gem "logstash-filter-anonymize"
gem "logstash-filter-cidr"
gem "logstash-filter-clone"
gem "logstash-filter-csv"
gem "logstash-filter-date"
gem "logstash-filter-de_dot"
gem "logstash-filter-dissect"
gem "logstash-filter-dns"
gem "logstash-filter-drop"
gem "logstash-filter-elasticsearch"
gem "logstash-filter-fingerprint"
gem "logstash-filter-geoip"
gem "logstash-filter-grok"
gem "logstash-filter-http"
gem "logstash-filter-jdbc_static"
gem "logstash-filter-jdbc_streaming"
gem "logstash-filter-json"
gem "logstash-filter-kv"
gem "logstash-filter-memcached"
gem "logstash-filter-metrics"
gem "logstash-filter-mutate"
gem "logstash-filter-ruby"
gem "logstash-filter-sleep"
gem "logstash-filter-split"
gem "logstash-filter-syslog_pri"
gem "logstash-filter-throttle"
gem "logstash-filter-translate"
gem "logstash-filter-truncate"
gem "logstash-filter-urldecode"
gem "logstash-filter-useragent"
gem "logstash-filter-xml"
gem "logstash-input-beats"
gem "logstash-input-azure_event_hubs"
gem "logstash-input-dead_letter_queue"
gem "logstash-input-elasticsearch"
gem "logstash-input-exec"
gem "logstash-input-file"
gem "logstash-input-ganglia"
gem "logstash-input-gelf"
gem "logstash-input-generator"
gem "logstash-input-graphite"
gem "logstash-input-heartbeat"
gem "logstash-input-http"
gem "logstash-input-http_poller"
gem "logstash-input-imap"
gem "logstash-input-jdbc"
gem "logstash-input-kafka"
gem "logstash-input-pipe"
gem "logstash-input-rabbitmq"
gem "logstash-input-redis"
gem "logstash-input-s3"
gem "logstash-input-snmp"
gem "logstash-input-snmptrap"
gem "logstash-input-sqs"
gem "logstash-input-stdin"
gem "logstash-input-syslog"
gem "logstash-input-tcp"
gem "logstash-input-twitter"
gem "logstash-input-udp"
gem "logstash-input-unix"
gem "logstash-output-elastic_app_search"
gem "logstash-output-cloudwatch"
gem "logstash-output-csv"
gem "logstash-output-elasticsearch"
gem "logstash-output-email"
gem "logstash-output-file"
gem "logstash-output-graphite"
gem "logstash-output-http"
gem "logstash-output-kafka"
gem "logstash-output-lumberjack"
gem "logstash-output-nagios"
gem "logstash-output-null"
gem "logstash-output-pagerduty"
gem "logstash-output-pipe"
gem "logstash-output-rabbitmq"
gem "logstash-output-redis"
gem "logstash-output-s3", ">=4.0.9", "<5.0.0"
gem "logstash-output-sns"
gem "logstash-output-sqs"
gem "logstash-output-stdout"
gem "logstash-output-tcp"
gem "logstash-output-udp"
gem "logstash-output-webhdfs"
- Logstash_put-kinesisには
kinesis output plugin
をインストールします。
[root@ip-172-31-1-235 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-output-kinesis
Validating logstash-output-kinesis
Installing logstash-output-kinesis
Installation successful
- Logstash_get-kinesisには
kinesis input plugin
をインストールします。
[root@ip-172-31-9-35 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-input-kinesis
Validating logstash-input-kinesis
Installing logstash-input-kinesis
Installation successful
4. logstash.confの設定
- Logstash_put-kinesisのlogstash.confを作成します。
input {
beats {
port => 5044
}
}
output {
kinesis {
stream_name => "kinesis_poc"
max_pending_records => 10000
randomized_partition_key => true
region => "us-east-1"
}
}
※Filebeatからログデータを受信したものをKinesisに流します。
※複数のシャードに分散する時はrandomized_partition_key
を有効化します。
- Logstash_get-kinesisのlogstash.confを作成します。
input {
kinesis {
kinesis_stream_name => "kinesis_poc"
application_name => "kinesis_poc"
checkpoint_interval_seconds => 10
metrics => "cloudwatch"
region => "us-east-1"
codec => json {}
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}
※今回は同じOS内にインストール済のElasticsearchにoutputして、Kibanaで可視化出来るようにしています。
codecを指定しないと以下のように全てmessage
にvalueとして入ってしまったので、codec => json {}
を記載しています。
実施結果
- Kinesisデータストリームの[モニタリング]タブでPUTレコードにデータが流れてくるのが確認できれば、Logstash_put-kinesisは成功です。
- レコードの取得にデータが流れてくるのが確認できれば、Logstash_get-kinesisも成功です。
公式サイトには、なぜかオプションとして記載がないのですが、githubにはイテレータの指定が2つのみですが、出来ると書いてあります。
まとめ
いかがでしたでしょうか。
これまではAWS上のDataLakeとしてS3 Bucketを利用することが多かった私ですが
DataLakeから大量のログデータを拾い、分析に必要なストアへリアルタイムに投入するには
スケールアウト出来るアーキテクチャが必要になりましたので、この度試してみました!!
意外とLogstashとKinesis Data Streamsの連携に関する情報が少ないので、何かの役に立てれば幸いです^^