AWS Step Functions とは
SNS, SQS等のイベントやLambdaやECS Task、AWS Batchなどの実行・制御を定義したワークフローを作成できるAWSのサービスです。
https://docs.aws.amazon.com/ja_jp/step-functions/index.html
ワークフローはjsonで記述し、自動的にビジュアライズされます。
Step Functionsの記述とビジュアライズされたワークフローの例
AWS Step FunctionsがDynamic Parallelismをサポートしたという発表が9月18日にありました。
https://aws.amazon.com/jp/blogs/aws/new-step-functions-support-for-dynamic-parallelism/
これまでも複数のタスクを並列に動かすことは可能でしたが、並列処理の内容は静的に定義しておく必要がありました。
今回サポートされたDynamic Parallelismでは、実行時の条件に応じて個数の変わる複数のアイテムに対して、同じ処理を並列に実行することができます。
これまで可能だった並列処理の例
今回からサポートされた並列処理の例
Dynamic Parallelismの記述方法
新たに追加されたMap
という状態を使用します。
Mapで指定できるフィールド
Mapには全ての状態で共通のフィールドや、Task, Parallelと共通のRetry, Catchといったフィールドの他に、下記の独自のフィールドがあります。
フィールド名 | 必須 | 内容 |
---|---|---|
Iterator | * | 与えられた配列の各要素に対して実行する処理 |
ItemsPath | - | イテレート対象の配列への参照パス |
MaxConcurrency | - | 並列処理の最大同時実行数 |
ResultPath | - | 結果の出力先への参照パス |
InputPath, ItemsPathによる入力値のマッピング
"InputPath": "$.result",
"ItemsPath": "$.files",
↑のような設定で
{
"result": {
"files": [
{"filepath":"/path/to/1.csv"},
{"filepath":"/path/to/2.csv"},
{"filepath":"/path/to/3.csv"}
]
}
}
↑のような入力がMapにあった時
{"filepath":"/path/to/1.csv"}
↑のようなjsonがイテレータ内の処理に入力されます。
Dynamic Parallelismを利用したステートマシンの例
複数のCSVファイルの情報が渡された時に、それぞれ計算およびバックアップを行うステートマシンを例として記述します。
(Iterator内で起動するLambdaの実装についてはここでは割愛します)
{
"StartAt":"csv-processes",
"States": {
"csv-processes": {
"Type": "Map",
"InputPath": "$.result",
"ItemsPath": "$.files",
"MaxConcurrency": 10,
"Iterator": {
"StartAt":"csv-process",
"States": {
"csv-process": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "Calcurate",
"States": {
"Calcurate": {
"Type": "Task",
"Resource": "arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:{FUNCTION_NAME}",
"End": true
}
}
},
{
"StartAt": "Backup",
"States": {
"Backup": {
"Type": "Task",
"Resource": "arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:{FUNCTION_NAME}",
"End": true
}
}
}
]
}
}
},
"End": true
}
}
}
MapやParallel状態はStatesを内包するので、ネストすると記述が複雑になります。
状態の名称やフィールドの順序などに注意し、できるだけわかりやすい記述をできるよう心がけたいです。
まとめ
Step Functionsの中でイテレーションが行えるようになったことにより、Lambda FunctionやECS Taskなどをよりシンプルに作ることができるようになったと思います。
複数のLambda FunctionやECS Taskを連携させて行うバッチ処理などで活用していきたいです。