LoginSignup
1

More than 3 years have passed since last update.

Amazon Inspectorの結果を前回結果と比較してレポートする

Last updated at Posted at 2019-06-24

Inspectorの実行結果をCSVファイルとして出力する の続き。

概要

前回紹介した「Inspectorの実行結果をCSVファイルとして出力する」のLambdaファンクションの終了をトリガーに、今回紹介する新しいLambdaファンクションを起動する。このLambdaファンクションは以下の処理を実行する。

  • 直近と前回のInspectorの実行結果を比較して差分をCSVレポートに出力する。
  • CSVファイルをS3へアップロードし、Pre-Signed URLを生成して関係者へSNS経由で通知する。

qiita_inspector_diff_report.png

図の上段のLambdaが前回紹介した部分。今回紹介する部分は下段のLambda。

前提として、前回紹介したInspectorの実行結果をCSVファイルとして出力するのLambdaファンクションにより、毎回のInspectorの実行結果はCSVでS3上に出力されているものとする。これらを分析するため、あらかじめAthenaのテーブルを作成しておく必要がある。

Athenaのテーブル作成

今回、CSVファイルの差分検出にはAthenaを使う。前回の記事で生成したCSVファイルのレイアウトをAthenaが認識できるよう、テーブルを作成しておく必要がある。ファイルの容量は大したことないので、パーティションは考慮しない。

inspector.sql
CREATE EXTERNAL TABLE `inspector`(
  `arn` string COMMENT 'from deserializer', 
  `assessmentrunarn` string COMMENT 'from deserializer', 
  `agentid` string COMMENT 'from deserializer', 
  `rulespackagename` string COMMENT 'from deserializer', 
  `severity` string COMMENT 'from deserializer', 
  `numericseverity` string COMMENT 'from deserializer', 
  `confidence` string COMMENT 'from deserializer', 
  `id` string COMMENT 'from deserializer', 
  `title` string COMMENT 'from deserializer', 
  `description` string COMMENT 'from deserializer', 
  `recommendation` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
WITH SERDEPROPERTIES ( 
  'escapeChar'='\n', 
  'quoteChar'='\"', 
  'separatorChar'=',') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://hoge-bucket'
TBLPROPERTIES (
  'skip.header.line.count'='1'
)

差分抽出クエリ

Inspectorの前回結果と今回結果の差分を取得するクエリは、Lambdaファンクション内で以下のコードによって生成している。

sample.py
#Inspectorの前回結果と今回結果の差分を取得するクエリを生成して返す
def createQuery(assessmentRuns):
    return """
with bef as (
    select 
        agentid,
        rulespackagename,
        severity,
        numericseverity,
        id
    from inspector
    where
        assessmentrunarn = '{}'
),
aft as (
    select 
        agentid,
        rulespackagename,
        severity,
        numericseverity,
        id
    from inspector
    where 
        assessmentrunarn = '{}'
),
mst as (
    select distinct
        computername,
        instanceid
    from ssminventory.aws_instanceinformation
)

select 
    '-' as diff,
    bef.agentid,
    mst.computername,
    bef.rulespackagename,
    bef.severity,
    bef.numericseverity,
    bef.id
from bef
left outer join aft on 
    bef.agentid = aft.agentid
    and bef.rulespackagename = aft.rulespackagename
    and bef.severity = aft.severity
    and bef.id = aft.id
left outer join mst on
    bef.agentid = mst.instanceid
where
    aft.id is null
union
select
    '+' as diff,
    aft.agentid,
    mst.computername,
    aft.rulespackagename,
    aft.severity,
    aft.numericseverity,
    aft.id
from aft
left outer join bef on 
    aft.agentid = bef.agentid
    and aft.rulespackagename = bef.rulespackagename
    and aft.severity = bef.severity
    and aft.id = bef.id
left outer join mst on
    aft.agentid = mst.instanceid
where
    bef.id is null
order by 3,1,4,7
""".format(
    #前回のassessmentRunsArn
    assessmentRuns[1]['arn'],
    #今回のassessmentRunsArn
    assessmentRuns[0]['arn']
    )

まずはwith句で3つのサブテーブルを定義。(ここではbefaftmstの3つ。)
befはInspectorの前回実行結果、aftは今回実行結果。mstはEC2のインスタンスIDからホスト名を導出するためだけに使用しており、今回のテーマにはあまり関係ない。
AWS Systems Manager インベントリマネージャーの機能を使ってEC2のインベントリ情報をS3に集約して管理しており、それをさらにAthenaで利用可能にしているのだが、機会があれば別記事で紹介するものとして、ここでは深く掘り下げない。

befにしか存在しない脆弱性は解消したものとして-aftにしか存在しない脆弱性は新たに検出されたものとして+を付加する。
-のリストと+のリストはunion句で連結し、1回のクエリでまとめて取得できるようにする。

出力サンプル

出力ファイルのイメージはこんなかんじ。

sample.csv
"diff","agentid","computername","rulespackagename","severity","numericseverity","id"
"+","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","9.0","CVE-2019-xxxxx"
"+","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","8.0","CVE-2019-yyyyy"
"+","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","7.0","CVE-2019-zzzzz"
"-","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","9.0","CVE-2018-11111"
"-","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","8.0","CVE-2018-22222"
"-","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","7.0","CVE-2018-33333"

前回のInspector実行結果と比較してsamplehost01というサーバでは新規に3件の脆弱性が検出されて、かわりに3件の脆弱性が解消されたことがわかる。Excelで開くとまあまあいい感じのレポートである。

以下、Lambdaファンクションの実装の内容について紹介する。

Lambdaのトリガー

前回記事のLambdaファンクションの終了をトリガーに本処理をキックできると、Inspector終了→脆弱性リスト出力→前回と今回の差分出力 と一連の流れがスムーズに実行される。
特に連携すべきデータもないので、前回作成したLambdaファンクションの最後にSNSへPublishしているトピックを今回作成するLambdaファンクションがSubscribeすることでLambdaファンクションを数珠つなぎにするのが手っ取り早い。SNSのメッセージ内容は使用せず、単純な起動トリガーとしてのみ使用する。
(Lambdaの数珠つなぎはアンチパターン?今回は単純な処理なので、気にしない。)

InspectorのテンプレートARNを取得

Athenaのテーブルを検索するにはInspectorの実行に紐づくassessmentRunArnが必要。
assessmentRunArnを検索するにはassessmentTemplateのArnが必要なので、
Inspectorテンプレート名をキーにしてテンプレートARNを取得する。
(相変わらず例外処理はあまり考慮していないので、そこは適宜補完してください。)

sample.py
#Inspectorテンプレート名をもとにテンプレートARNを取得して返す
def get_assessment_template_arn(name):
    response = inspector.list_assessment_templates(
        filter={
            'namePattern': name
        }
    )
    return response['assessmentTemplateArns'][0]

assessmentRunArnsのリストを取得

テンプレートARNをもとに、assessmentRunのリストを新しい順で取得する。
一方で、list_assessment_runsは最大500件のassessmentRunArnsを返す。
Inspectorの実行開始日時でフィルタすることで、500件以上は取得対象が存在しないことを前提としている。

sample.py
#テンプレートARNをもとに、今日~days日前の期間中のassessmentRunのリストを実施日の逆順にソートして返す
def get_assessment_run_arn(assessmentTemplateArn,days=365):
    #基準日の範囲内のassessment_runを取得(最大500件。指定期間内に500件以上のassessmentRunが存在することは考慮しない)
    assessmentRunArns = inspector.list_assessment_runs(
        assessmentTemplateArns= [
            assessmentTemplateArn
        ],
        filter={
            'states': [
                'COMPLETED'
            ],
            'startTimeRange': {
                'beginDate': datetime.datetime.now() - datetime.timedelta(days),
                'endDate': datetime.datetime.now()
            }
        },
        maxResults = 500
    )['assessmentRunArns']
    if len(assessmentRunArns)==0:
        raise Exception('該当期間のassessmentRunArnsが見つかりません。')

    assessmentRuns=[]
    #ARNをもとに詳細を取得
    #describe_assessment_runs は10件ずつしか処理できないため、10件(上限値)ずつassessmentRunを取得
    for i in range(0, -(-len(assessmentRunArns) // 10 )): #assessmentRunArnsの件数を10で割り算して切り上げ
        response = inspector.describe_assessment_runs(
            assessmentRunArns= assessmentRunArns[ i*10 : min( i*10+10 , len(assessmentRunArns) ) ]
        )
        assessmentRuns.extend(response['assessmentRuns'])
    #nameの逆順でソート(=新しい順にソート)
    sortedList = sorted(assessmentRuns, key=lambda x:x['createdAt'],reverse=True)
    return sortedList

クエリ実行

材料は揃ったのでクエリ実行してみよう。

sample.py
#Athenaのクエリを実行し、QueryExecutionIdを返す
def athena_query_execution(bucketname,dbname,querystring):
    try:
        response = athena.start_query_execution(
            QueryString=querystring,
            QueryExecutionContext={
                'Database': dbname
            }
            ,
            ResultConfiguration={
                'OutputLocation': 's3://' + bucketname + '/' + os.environ['AWS_LAMBDA_FUNCTION_NAME']+'/'
                ,
                'EncryptionConfiguration': {
                    'EncryptionOption': 'SSE_S3'
                }
            }
        )
        queryexecutionid = response['QueryExecutionId']
    except Exception as e:
        raise e
    return queryexecutionid

Athenaのクエリ実行は非同期処理になる点に注意。
クエリ実行すると、実行状況や結果をトレースするのに必要なQueryExecutionIdが得られる。これを使って処理完了までポーリングしてステータスをチェックする。

クエリ実行結果取得

Athenaのクエリ処理時間が長い場合、ひとつのLambdaファンクション内で完了を待ち続ける設計はよろしくないと思われるが、今回のクエリであれば所要時間はせいぜい10秒くらいなので、同一Lambdaファンクション内でAthenaのクエリ実行結果をポーリングして待つことにする。

sample.py
#Athenaのクエリ実行結果をポーリングして待ち、出力ファイルののS3パスを返す
def athena_get_query_results(queryexecutionid):
    #実行結果を2秒おきに確認する
    x=0
    for x in range(1,10):
        sleep(2)
        response = athena.get_query_execution(
            QueryExecutionId=queryexecutionid
        )
        status=response['QueryExecution']['Status']['State']
        if status in {"QUEUED","RUNNING"}:
            continue
        else:
            break

    if status == "SUCCEEDED":
        #S3のファイルパスを返す
        return response['QueryExecution']['ResultConfiguration']['OutputLocation']
    elif status == "CANCELLED":
        raise Exception('sql execution CANCELLED error')
    elif status == "FAILED":
        raise Exception('sql execution FAILED error')
    elif status in {"QUEUED","RUNNING"}:
        raise Exception('sql execution NOT COMPLETE error')
    else:
        raise Exception('sql execution UNKNOWN STATUS error')

クエリ実行結果ファイルの文字コード変換

最終的なcsvファイルをExcelで開けるようにするため、文字コード変換を行う。そのためにわざわざ一時的にS3からLambda実行環境へ一時的にダウンロードする。以下の関数に与えるfilepathは先程のathena_get_query_resultsの戻り値、つまりAthenaの実行結果が格納されているS3のパス。

sample.py
#S3に出力されたAthenaのクエリ実行結果ファイルをローカルにダウンロードし、文字コード変換を行い、変換後ファイルのパスを返す
def get_csv_file(filepath):
    #CSVファイル取得
    bucket = 'hogehoge'
    key = filepath.split(bucket + "/")[1]
    local_filepath = '/tmp/' + os.path.basename(filepath).replace(".csv","_utf8.csv")
    try:
        s3 = boto3.resource('s3', config=Config(signature_version='s3v4'))
        s3.Bucket(bucket).download_file(key, local_filepath)
    except Exception as e:
        print(e)
    # UTF-8 ファイルのパス
    utf8_csv_path = local_filepath
    # Shift_JIS ファイルのパス
    shiftjis_csv_path = local_filepath.replace("_utf8.csv",".csv")
    # 文字コードを Shift_JIS に変換して保存
    fin = codecs.open(utf8_csv_path, "r", "utf-8")
    fout_utf = codecs.open(shiftjis_csv_path, "w", "cp932", "ignore")
    for row in fin:
        fout_utf.write(row)
    fin.close()
    fout_utf.close()
    return shiftjis_csv_path

signature_version='s3v4'でさり気なくS3の署名バージョン対応を入れている。参考記事。

CSVファイルをS3へアップロードしてPresignedURLを生成

これ以降の処理は前回の記事Inspectorの実行結果をCSVファイルとして出力する とさほど変わらない。

sample.py
def s3_upload(filepath,basetime):
    #IAMロールを使用してS3のPresignedURLを作成すると、IAMロールの有効期限(1時間)が切れると無効になってしまうため
    #より長い有効期間のクレデンシャルを使用する。get_parameters()の内容は前回記事参照。
    accesskey,secretkey = get_parameters()
    try:
        s3 = boto3.resource(
            's3',
            aws_access_key_id=accesskey,
            aws_secret_access_key=secretkey,
            config=Config(signature_version='s3v4')
        )
        s3c = boto3.client(
            's3',
            aws_access_key_id=accesskey,
            aws_secret_access_key=secretkey,
            config=Config(signature_version='s3v4')
        )
        #S3バケット名
        bucket = s3.Bucket('hogehoge')
        #S3ファイル名
        s3_filepath = 'inspector_diff_' + basetime.strftime('%Y-%m-%d') + ".csv"
    except Exception as e:
        print(e)
    #S3アップロード
    try:
        bucket.upload_file(filepath, s3_filepath)
    except Exception as e:
        print(e)
    #Pre-SignedURLの作成
    try:
        url = s3c.generate_presigned_url(
            ClientMethod = 'get_object',
            Params = {
                'Bucket' : bucket.name,
                'Key' : s3_filepath
            },
            ExpiresIn = expired_hour*60*60,
            HttpMethod = 'GET'
        )
    except Exception as e:
        print(e)
    return url

ファイル名は脆弱性診断実施日ごとにユニーク。同一日に複数回脆弱性診断を実行している場合はファイルを上書きする仕様。

SNSへPublish

前回の記事と同じ。

sample.py
#SNSへ通知する
def sns_publish(topicarn,url):
    #SNS title
    title = "Inspectorの差分分析結果レポート"
    #SNS本文
    message=u"""Inspector分析結果レポートを作成しました。
{}
こちらのURLからダウンロード可能です。
ダウンロード期限は{}時間です。

""".format(
        url,
        expired_hour
    )
    #SNS Publish
    try:
        response = sns.publish(
            TopicArn = topicarn,
            Message = message,
            Subject = title
        )
    except Exception as e:
        print(e)
        raise e

まとめ

全体を繋げた処理はこんな感じ?

sample.py
# -*- coding: utf-8 -*-
import boto3
import os
import datetime
import csv
from   time import sleep
import codecs
from botocore.client import Config

sns       = boto3.client('sns')
ssm       = boto3.client('ssm')
inspector = boto3.client('inspector')
athena    = boto3.client('athena')

#Presigned-URLの有効期間(時間)
expired_hour = 24

#S3アップロード用IAMユーザの認証情報を格納したパラメータストア名
accesskey_name="/hogehoge/accesskey"
secretkey_name="/hogehoge/secretkey"

#Athenaデータベース名
dbname = 'hoge-athena-dbname'
#Inspector実行テンプレート名
templatename = 'hoge-template-name'

def lambda_handler(event, context):
    #テンプレート名に対応するTemplateArnを取得
    assessmentTemplateArn = get_assessment_template_arn(templatename)
    #テンプレートARNに紐づく、過去の実行履歴(30日前まで検索)
    assessmentRuns = get_assessment_run_arn(assessmentTemplateArn,days=30)
    #直近の実行日時
    basetime = assessmentRuns[0]["completedAt"]

    #Inspectorの前回結果と今回結果の差分を取得するクエリ
    query = createQuery(assessmentRuns)
    #クエリ実行結果ファイル(ローカル)の取得
    filepath = get_csv_file(athena_get_query_results(athena_query_execution(dbname,query)))
    #S3へアップロードし、PresignedURLを生成
    url = s3_upload(filepath,basetime)
    #SNSパブリッシュ
    topicarn = "arn:aws:sns:ap-northeast-1:hogehoge"
    sns_publish(topicarn,url)

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1