LoginSignup
2
2

More than 3 years have passed since last update.

WindowsのログをWinlogbeatとLogstash経由でBigQueryに取り込む

Last updated at Posted at 2020-04-02

はじめに

WindowsのイベントログをWinlogbeatで取得して、Logstash経由でBigQueryに取り込む手順を記載しています。
環境は以下の通りです。

  • Windows Server 2016
  • Winlogbeat 7.6
  • Logstash 7.6.1
  • LogstashはDockerで稼働

dockerの使い方は知っている前提で記載しています。

手順

1. Winlogbeatのインストール

ダウンロード

以下からダウンロードします。
https://www.elastic.co/jp/downloads/beats/winlogbeat

展開

C:\Program Files に展開し、ディレクトリ名をwinlogbeat-<version>からWinlogbeatに変更します。

インストール

管理者権限でPowershellを起動し、以下のコマンドを実行します。

cd 'C:\Program Files\Winlogbeat'
.\install-service-winlogbeat.ps1

以下の結果が出力されればOKです。

Status   Name               DisplayName
------   ----               -----------
Stopped  winlogbeat         winlogbeat

2. Winlogbeatの設定

C:\Program Files\Winlogbeat\winlogbeat.ymlを開き設定をします。

取得するログを選ぶ

以下で取得するログ種別を選びます。
私の場合、セキュリティイベントログだけを取得したいのでname: Securityの項目以外をコメントアウトしました。

winlogbeat.event_logs:
  # - name: Application
  #   ignore_older: 72h

  # - name: System

  - name: Security
    processors:
      - script:
          lang: javascript
          id: security
          file: ${path.home}/module/security/config/winlogbeat-security.js

  # - name: Microsoft-Windows-Sysmon/Operational
  #   processors:
  #     - script:
  #         lang: javascript
  #         id: sysmon
  #         file: ${path.home}/module/sysmon/config/winlogbeat-sysmon.js

アウトプットの指定

今回はWinlogbeatからLogstashに出力するので、まずElasticsearch outputをコメントアウトします。

#-------------------------- Elasticsearch output ------------------------------
# output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

次にLogstash outputを指定します。

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:15044"]

winlogbeat.ymlのテスト

以下のコマンドを実行してください。
実行結果の最後にConfig OKと表示されていれば問題ありません。

.\winlogbeat.exe test config -c .\winlogbeat.yml -e

3. Logstashまでログが届くか確認

WindowsのログがWinlogbeat経由でLogstashまでログが届くか確認します。
evt.confdocker-compose.ymlを以下の内容で作成してください。

evt.conf
input {
  beats {
    port => 15044
  }
}

output {
  stdout {
  }
}
docker-compose.yml
logstash:
  image: docker.elastic.co/logstash/logstash:7.6.1
  ports:
    - "15044:15044"
  volumes:
    - ./evt.conf:/usr/share/logstash/pipeline/evt.conf

その後以下のコマンドでLogstashコンテナの起動と、Winlogbeatの起動を行ってください。

Docker側
docker-compose up
Powershell側
Start-Service winlogbeat

するとLogstashコンテナ側の標準出力にWindows Serverのログが表示されます。
無事に表示されたら以下のコマンドで停止したのち、次の項目に進んでください。

Docker側
ctrl + c
Powershell側
Stop-Service winlogbeat

4. BigQuery側の準備

サービスアカウント作成

IAMと管理からサービスアカウントを選択し、[サービスアカウントを作成]をクリックしてください。

image.png

次に[サービスアカウント名]を適当に入力し、[作成]をクリックしてください。

image.png

ロールを選択で、[BigQueryデータ編集者]を選択し、[続行]をクリックしてください。

image.png

[キーを作成]をクリックし、[JSON]が選択された状態で[作成]をクリックしてください。

image.png

JSONファイルがダウンロードされたら、最後に[完了]をクリックしてください。

BigQueryのデータセット作成

[データセットを作成]をクリックしてください。

image.png

[データセットID]を入力し、[データセットを作成]をクリックしてください。

image.png

5. Docker側の準備

Dockerイメージの作成

LogstashからBigQueryにログを送信するには、Google BigQuery output pluginが必要になります。
そのためlogstashコンテナにGoogle BigQuery output pluginをインストールしたDockerイメージを作成します。
まずDockerfileを作成します。

Dockerfile
FROM logstash:7.6.1
RUN logstash-plugin install logstash-output-google_bigquery

次にDockerfileと同じフォルダで以下のコマンドを実行し、Dockerイメージを作成します。

docker build -t logstash_bqplugin .

これでlogstash-output-google_bigqueryがインストールされたlogstash_bqpluginという名前のコンテナが作成されます。

docker-compose.ymlの変更

前の手順で作成したdocker-compose.ymlを変更します。
****.jsonには前の手順でダウンロードしたjsonファイルを指定してください。

docker-compose.yml
logstash:
  image: logstash_bqplugin
  container_name: eventlog
  ports:
    - "15044:15044"
  volumes:
    - ./evt.conf:/usr/share/logstash/pipeline/evt.conf
    - ./****.json:/usr/share/logstash/config/****.json
  environment:
    - "xpack.monitoring.enabled=false"

6. Logstashの設定ファイルの変更

前の手順で作成したevt.confを変更します。
project_idにはGCPのプロジェクト名、json_key_fileには前の手順で作成したjsonファイル名を入力してください。
以下の設定ファイルではevent_idとSubjectUserNameと日時の情報しか取得していませんが、必要に応じて変更してください。

evt.conf
input {
  beats {
    port => 15044
  }
}

filter {
    # 日時の整形
    ruby {
        code => "event.set( 'date', Time.parse( event.get('[event][created]').to_s ).localtime('+09:00').strftime('%Y-%m-%dT%H:%M:%S.%N+09:00') )"
    }

    # フィールド名の整形
    mutate {
        rename => {
          "[winlog][event_id]" => "event_id"
          "[winlog][event_data][SubjectUserName]" => "SubjectUserName"
        }
    }

    # 取り込むフィールドの抽出
    prune {
        whitelist_names => [ "date", "event_id", "SubjectUserName"]
    }
}

output {
  google_bigquery {
    project_id => "****"
    dataset => "winlogbeat"
    table_prefix => "windows"
    table_separator => "-"
    date_pattern => "%Y_%m_%d"
    csv_schema => "date:TIMESTAMP,event_id:INTEGER,SubjectUserName:STRING"
    json_key_file => "/usr/share/logstash/config/****.json"
  }
}

7. WinlogbeatとLogstashコンテナの起動

以下のコマンドでLogstashコンテナの起動と、Winlogbeatの起動を行ってください。

Docker側
docker-compose up
Powershell側
Start-Service winlogbeat

これでBigQueryへのログ送信が開始します。

8. BigQuery確認

winlogbeatデータセットにログが取り込まれました。

image.png

参考

Get started with Winlogbeat
https://www.elastic.co/guide/en/beats/winlogbeat/current/winlogbeat-getting-started.html

BigQuery の事前定義された Cloud IAM の役割
https://cloud.google.com/bigquery/docs/access-control?hl=ja#bigquery

エラー対応
ポート番号を5044ではなく15044にしている理由は、5044だと以下のエラーが発生したためです。

A plugin had an unrecoverable error. Will restart this plugin
Address already in use

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2