Inspectorの実行結果をCSVファイルとして出力する の続き。
概要
前回紹介した「Inspectorの実行結果をCSVファイルとして出力する」のLambdaファンクションの終了をトリガーに、今回紹介する新しいLambdaファンクションを起動する。このLambdaファンクションは以下の処理を実行する。
- 直近と前回のInspectorの実行結果を比較して差分をCSVレポートに出力する。
- CSVファイルをS3へアップロードし、Pre-Signed URLを生成して関係者へSNS経由で通知する。
図の上段のLambdaが前回紹介した部分。今回紹介する部分は下段のLambda。
前提として、前回紹介したInspectorの実行結果をCSVファイルとして出力するのLambdaファンクションにより、毎回のInspectorの実行結果はCSVでS3上に出力されているものとする。これらを分析するため、あらかじめAthenaのテーブルを作成しておく必要がある。
Athenaのテーブル作成
今回、CSVファイルの差分検出にはAthenaを使う。前回の記事で生成したCSVファイルのレイアウトをAthenaが認識できるよう、テーブルを作成しておく必要がある。ファイルの容量は大したことないので、パーティションは考慮しない。
CREATE EXTERNAL TABLE `inspector`(
`arn` string COMMENT 'from deserializer',
`assessmentrunarn` string COMMENT 'from deserializer',
`agentid` string COMMENT 'from deserializer',
`rulespackagename` string COMMENT 'from deserializer',
`severity` string COMMENT 'from deserializer',
`numericseverity` string COMMENT 'from deserializer',
`confidence` string COMMENT 'from deserializer',
`id` string COMMENT 'from deserializer',
`title` string COMMENT 'from deserializer',
`description` string COMMENT 'from deserializer',
`recommendation` string COMMENT 'from deserializer')
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'escapeChar'='\n',
'quoteChar'='\"',
'separatorChar'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://hoge-bucket'
TBLPROPERTIES (
'skip.header.line.count'='1'
)
差分抽出クエリ
Inspectorの前回結果と今回結果の差分を取得するクエリは、Lambdaファンクション内で以下のコードによって生成している。
#Inspectorの前回結果と今回結果の差分を取得するクエリを生成して返す
def createQuery(assessmentRuns):
return """
with bef as (
select
agentid,
rulespackagename,
severity,
numericseverity,
id
from inspector
where
assessmentrunarn = '{}'
),
aft as (
select
agentid,
rulespackagename,
severity,
numericseverity,
id
from inspector
where
assessmentrunarn = '{}'
),
mst as (
select distinct
computername,
instanceid
from ssminventory.aws_instanceinformation
)
select
'-' as diff,
bef.agentid,
mst.computername,
bef.rulespackagename,
bef.severity,
bef.numericseverity,
bef.id
from bef
left outer join aft on
bef.agentid = aft.agentid
and bef.rulespackagename = aft.rulespackagename
and bef.severity = aft.severity
and bef.id = aft.id
left outer join mst on
bef.agentid = mst.instanceid
where
aft.id is null
union
select
'+' as diff,
aft.agentid,
mst.computername,
aft.rulespackagename,
aft.severity,
aft.numericseverity,
aft.id
from aft
left outer join bef on
aft.agentid = bef.agentid
and aft.rulespackagename = bef.rulespackagename
and aft.severity = bef.severity
and aft.id = bef.id
left outer join mst on
aft.agentid = mst.instanceid
where
bef.id is null
order by 3,1,4,7
""".format(
#前回のassessmentRunsArn
assessmentRuns[1]['arn'],
#今回のassessmentRunsArn
assessmentRuns[0]['arn']
)
まずはwith
句で3つのサブテーブルを定義。(ここではbef
、aft
、mst
の3つ。)
bef
はInspectorの前回実行結果、aft
は今回実行結果。mst
はEC2のインスタンスIDからホスト名を導出するためだけに使用しており、今回のテーマにはあまり関係ない。
AWS Systems Manager インベントリマネージャーの機能を使ってEC2のインベントリ情報をS3に集約して管理しており、それをさらにAthenaで利用可能にしているのだが、機会があれば別記事で紹介するものとして、ここでは深く掘り下げない。
bef
にしか存在しない脆弱性は解消したものとして-
、aft
にしか存在しない脆弱性は新たに検出されたものとして+
を付加する。
-
のリストと+
のリストはunion
句で連結し、1回のクエリでまとめて取得できるようにする。
出力サンプル
出力ファイルのイメージはこんなかんじ。
"diff","agentid","computername","rulespackagename","severity","numericseverity","id"
"+","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","9.0","CVE-2019-xxxxx"
"+","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","8.0","CVE-2019-yyyyy"
"+","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","7.0","CVE-2019-zzzzz"
"-","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","9.0","CVE-2018-11111"
"-","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","8.0","CVE-2018-22222"
"-","i-0123456789abcdefg","samplehost01","Common Vulnerabilities and Exposures","High","7.0","CVE-2018-33333"
前回のInspector実行結果と比較してsamplehost01
というサーバでは新規に3件の脆弱性が検出されて、かわりに3件の脆弱性が解消されたことがわかる。Excelで開くとまあまあいい感じのレポートである。
以下、Lambdaファンクションの実装の内容について紹介する。
Lambdaのトリガー
前回記事のLambdaファンクションの終了をトリガーに本処理をキックできると、Inspector終了→脆弱性リスト出力→前回と今回の差分出力 と一連の流れがスムーズに実行される。
特に連携すべきデータもないので、前回作成したLambdaファンクションの最後にSNSへPublishしているトピックを今回作成するLambdaファンクションがSubscribeすることでLambdaファンクションを数珠つなぎにするのが手っ取り早い。SNSのメッセージ内容は使用せず、単純な起動トリガーとしてのみ使用する。
(Lambdaの数珠つなぎはアンチパターン?今回は単純な処理なので、気にしない。)
InspectorのテンプレートARNを取得
Athenaのテーブルを検索するにはInspectorの実行に紐づくassessmentRunArnが必要。
assessmentRunArnを検索するにはassessmentTemplateのArnが必要なので、
Inspectorテンプレート名をキーにしてテンプレートARNを取得する。
(相変わらず例外処理はあまり考慮していないので、そこは適宜補完してください。)
#Inspectorテンプレート名をもとにテンプレートARNを取得して返す
def get_assessment_template_arn(name):
response = inspector.list_assessment_templates(
filter={
'namePattern': name
}
)
return response['assessmentTemplateArns'][0]
assessmentRunArnsのリストを取得
テンプレートARNをもとに、assessmentRunのリストを新しい順で取得する。
一方で、list_assessment_runsは最大500件のassessmentRunArnsを返す。
Inspectorの実行開始日時でフィルタすることで、500件以上は取得対象が存在しないことを前提としている。
#テンプレートARNをもとに、今日~days日前の期間中のassessmentRunのリストを実施日の逆順にソートして返す
def get_assessment_run_arn(assessmentTemplateArn,days=365):
#基準日の範囲内のassessment_runを取得(最大500件。指定期間内に500件以上のassessmentRunが存在することは考慮しない)
assessmentRunArns = inspector.list_assessment_runs(
assessmentTemplateArns= [
assessmentTemplateArn
],
filter={
'states': [
'COMPLETED'
],
'startTimeRange': {
'beginDate': datetime.datetime.now() - datetime.timedelta(days),
'endDate': datetime.datetime.now()
}
},
maxResults = 500
)['assessmentRunArns']
if len(assessmentRunArns)==0:
raise Exception('該当期間のassessmentRunArnsが見つかりません。')
assessmentRuns=[]
#ARNをもとに詳細を取得
#describe_assessment_runs は10件ずつしか処理できないため、10件(上限値)ずつassessmentRunを取得
for i in range(0, -(-len(assessmentRunArns) // 10 )): #assessmentRunArnsの件数を10で割り算して切り上げ
response = inspector.describe_assessment_runs(
assessmentRunArns= assessmentRunArns[ i*10 : min( i*10+10 , len(assessmentRunArns) ) ]
)
assessmentRuns.extend(response['assessmentRuns'])
#nameの逆順でソート(=新しい順にソート)
sortedList = sorted(assessmentRuns, key=lambda x:x['createdAt'],reverse=True)
return sortedList
クエリ実行
材料は揃ったのでクエリ実行してみよう。
#Athenaのクエリを実行し、QueryExecutionIdを返す
def athena_query_execution(bucketname,dbname,querystring):
try:
response = athena.start_query_execution(
QueryString=querystring,
QueryExecutionContext={
'Database': dbname
}
,
ResultConfiguration={
'OutputLocation': 's3://' + bucketname + '/' + os.environ['AWS_LAMBDA_FUNCTION_NAME']+'/'
,
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_S3'
}
}
)
queryexecutionid = response['QueryExecutionId']
except Exception as e:
raise e
return queryexecutionid
Athenaのクエリ実行は非同期処理になる点に注意。
クエリ実行すると、実行状況や結果をトレースするのに必要なQueryExecutionId
が得られる。これを使って処理完了までポーリングしてステータスをチェックする。
クエリ実行結果取得
Athenaのクエリ処理時間が長い場合、ひとつのLambdaファンクション内で完了を待ち続ける設計はよろしくないと思われるが、今回のクエリであれば所要時間はせいぜい10秒くらいなので、同一Lambdaファンクション内でAthenaのクエリ実行結果をポーリングして待つことにする。
#Athenaのクエリ実行結果をポーリングして待ち、出力ファイルののS3パスを返す
def athena_get_query_results(queryexecutionid):
#実行結果を2秒おきに確認する
x=0
for x in range(1,10):
sleep(2)
response = athena.get_query_execution(
QueryExecutionId=queryexecutionid
)
status=response['QueryExecution']['Status']['State']
if status in {"QUEUED","RUNNING"}:
continue
else:
break
if status == "SUCCEEDED":
#S3のファイルパスを返す
return response['QueryExecution']['ResultConfiguration']['OutputLocation']
elif status == "CANCELLED":
raise Exception('sql execution CANCELLED error')
elif status == "FAILED":
raise Exception('sql execution FAILED error')
elif status in {"QUEUED","RUNNING"}:
raise Exception('sql execution NOT COMPLETE error')
else:
raise Exception('sql execution UNKNOWN STATUS error')
クエリ実行結果ファイルの文字コード変換
最終的なcsvファイルをExcelで開けるようにするため、文字コード変換を行う。そのためにわざわざ一時的にS3からLambda実行環境へ一時的にダウンロードする。以下の関数に与えるfilepathは先程のathena_get_query_results
の戻り値、つまりAthenaの実行結果が格納されているS3のパス。
#S3に出力されたAthenaのクエリ実行結果ファイルをローカルにダウンロードし、文字コード変換を行い、変換後ファイルのパスを返す
def get_csv_file(filepath):
#CSVファイル取得
bucket = 'hogehoge'
key = filepath.split(bucket + "/")[1]
local_filepath = '/tmp/' + os.path.basename(filepath).replace(".csv","_utf8.csv")
try:
s3 = boto3.resource('s3', config=Config(signature_version='s3v4'))
s3.Bucket(bucket).download_file(key, local_filepath)
except Exception as e:
print(e)
# UTF-8 ファイルのパス
utf8_csv_path = local_filepath
# Shift_JIS ファイルのパス
shiftjis_csv_path = local_filepath.replace("_utf8.csv",".csv")
# 文字コードを Shift_JIS に変換して保存
fin = codecs.open(utf8_csv_path, "r", "utf-8")
fout_utf = codecs.open(shiftjis_csv_path, "w", "cp932", "ignore")
for row in fin:
fout_utf.write(row)
fin.close()
fout_utf.close()
return shiftjis_csv_path
signature_version='s3v4'
でさり気なくS3の署名バージョン対応を入れている。参考記事。
CSVファイルをS3へアップロードしてPresignedURLを生成
これ以降の処理は前回の記事Inspectorの実行結果をCSVファイルとして出力する とさほど変わらない。
def s3_upload(filepath,basetime):
#IAMロールを使用してS3のPresignedURLを作成すると、IAMロールの有効期限(1時間)が切れると無効になってしまうため
#より長い有効期間のクレデンシャルを使用する。get_parameters()の内容は前回記事参照。
accesskey,secretkey = get_parameters()
try:
s3 = boto3.resource(
's3',
aws_access_key_id=accesskey,
aws_secret_access_key=secretkey,
config=Config(signature_version='s3v4')
)
s3c = boto3.client(
's3',
aws_access_key_id=accesskey,
aws_secret_access_key=secretkey,
config=Config(signature_version='s3v4')
)
#S3バケット名
bucket = s3.Bucket('hogehoge')
#S3ファイル名
s3_filepath = 'inspector_diff_' + basetime.strftime('%Y-%m-%d') + ".csv"
except Exception as e:
print(e)
#S3アップロード
try:
bucket.upload_file(filepath, s3_filepath)
except Exception as e:
print(e)
#Pre-SignedURLの作成
try:
url = s3c.generate_presigned_url(
ClientMethod = 'get_object',
Params = {
'Bucket' : bucket.name,
'Key' : s3_filepath
},
ExpiresIn = expired_hour*60*60,
HttpMethod = 'GET'
)
except Exception as e:
print(e)
return url
ファイル名は脆弱性診断実施日ごとにユニーク。同一日に複数回脆弱性診断を実行している場合はファイルを上書きする仕様。
SNSへPublish
前回の記事と同じ。
#SNSへ通知する
def sns_publish(topicarn,url):
#SNS title
title = "Inspectorの差分分析結果レポート"
#SNS本文
message=u"""Inspector分析結果レポートを作成しました。
{}
こちらのURLからダウンロード可能です。
ダウンロード期限は{}時間です。
""".format(
url,
expired_hour
)
#SNS Publish
try:
response = sns.publish(
TopicArn = topicarn,
Message = message,
Subject = title
)
except Exception as e:
print(e)
raise e
まとめ
全体を繋げた処理はこんな感じ?
# -*- coding: utf-8 -*-
import boto3
import os
import datetime
import csv
from time import sleep
import codecs
from botocore.client import Config
sns = boto3.client('sns')
ssm = boto3.client('ssm')
inspector = boto3.client('inspector')
athena = boto3.client('athena')
#Presigned-URLの有効期間(時間)
expired_hour = 24
#S3アップロード用IAMユーザの認証情報を格納したパラメータストア名
accesskey_name="/hogehoge/accesskey"
secretkey_name="/hogehoge/secretkey"
#Athenaデータベース名
dbname = 'hoge-athena-dbname'
#Inspector実行テンプレート名
templatename = 'hoge-template-name'
def lambda_handler(event, context):
#テンプレート名に対応するTemplateArnを取得
assessmentTemplateArn = get_assessment_template_arn(templatename)
#テンプレートARNに紐づく、過去の実行履歴(30日前まで検索)
assessmentRuns = get_assessment_run_arn(assessmentTemplateArn,days=30)
#直近の実行日時
basetime = assessmentRuns[0]["completedAt"]
#Inspectorの前回結果と今回結果の差分を取得するクエリ
query = createQuery(assessmentRuns)
#クエリ実行結果ファイル(ローカル)の取得
filepath = get_csv_file(athena_get_query_results(athena_query_execution(dbname,query)))
#S3へアップロードし、PresignedURLを生成
url = s3_upload(filepath,basetime)
#SNSパブリッシュ
topicarn = "arn:aws:sns:ap-northeast-1:hogehoge"
sns_publish(topicarn,url)