LoginSignup
2
1

More than 3 years have passed since last update.

digdag 拡張用 Python モジュール

Last updated at Posted at 2019-06-19

概要

現職で digdag を集計バッチをコントロールするジョブサーバとして利用していますが、いくつか機能的に足りないところがあったので Python で機能拡張してみました。それについて簡単に紹介したいと思います。

不足してる機能

ジョブ待ち合わせ

ワークフロー

require オペレータが存在し、これによって複数のワークフロー間の依存関係を記述することは既存機能で可能です。しかしながら、待ち合わせができるワークフローは同一プロジェクト内のワークフローに限られています。「依存関係があるならば同一プロジェクトで管理しろ」と言われそうですが、よんどころない理由で別プロジェクト管理にしたい場合には依存関係をもたせることができません。

タスク

_parallel: true を記述して並行実行しているところで一部だけタスクの待ち合わせをしたい場合、タスクの記述順序とかを駆使して書く必要があり、若干 dig ファイルの可読性が落ちるのではと思ってます。

MySQL の操作

digdag には Treasure Data や PostgreSQL に対する操作が簡単にできるオペレータが用意されていますが、MySQL に対するものはありません。唯一できるのは Treasure Data 用のオペレータで出力先として MySQL のエンドポイントURLを指定して結果をインポートすることくらいです。たとえば MySQL にクエリを投げて、結果を MySQL に書き出すといったことを実現するオペレータがありません。

_error ディレクトリでの失敗タスクの取得

digdag はタスクの実行が失敗した場合の操作として error ディレクトリが用意されています。これはちょうど Java などのプログラム言語で実装されている try~catch の catch ブロック内に記載する処理に相当します。例えばここでエラーがあったという通知を Chatwork や Slack にしようとしても、肝心のエラーが発生したタスク名を取得することができません。digdag の v0.9.9 移行では digdag.env.params 辞書にタスク名が格納されるようになりましたが、_error ディレクトリ内では既にタスク名が更新され _error+タスク名 となっているため利用価値がありません。

機能拡張について

上記機能を digdag に補うために Python3 で pydigdag というのを作りました。

インストール

Python3 が使用できる環境で以下を実行します。CentOS 7 など Python3 が入ってない環境の場合には、拙筆 マルチユーザ環境のための pyenv を参考にして構築するといいでしょう。

sudo -E pip install git+https://github.com/takkeybook/pydigdag.git

提供メソッド

MySQL

以下のメソッドを利用するに当たり、_export ディレクトリ内に mysql キーが存在し、その配下に user, password, host, database が設定されている必要があります。

partial_delete_hourly

指定テーブルの列 region_day、region_hour がそれぞれ指定年月日、時のレコードを削除します。テーブル名として OUTPUT、指定日時として START_TIME(書式は YYYY-mm-dd HH:00:00)のパラメタが _export ディレクトリ内で設定されている必要があります。

partial_delete_daily

指定テーブルの列 region_day が指定年月日のレコードを削除します。テーブル名として OUTPUT、指定年月日として START_DATE(書式は YYYY-mm-dd)のパラメタが _export ディレクトリ内で設定されている必要があります。

partial_delete_weekly

指定テーブルの列 region_start_week が指定年月日のレコードを削除します。テーブル名として OUTPUT、指定年月日として START_WEEK_DATE(書式は YYYY-mm-dd)のパラメタが _export ディレクトリ内で設定されている必要があります。

partial_delete_monthly

指定テーブルの列 region_month が指定年月のレコードを削除します。テーブル名として OUTPUT、指定年月日として START_MONTH(書式は YYYY-mm-01)のパラメタが _export ディレクトリ内で設定されている必要があります。

delete_all

指定テーブル全てを削除します。テーブル名として OUTPUT パラメタが _export ディレクトリ内で指定されている必要があります。

run_sql

指定ファイルに記載されたクエリを実行します。クエリ内に {変数} という記述がある場合、_export ディレクトリ内で設定された値で置き換えられます。_export ディレクトリ内で QUERY_FILE パラメタが設定されていること。またクエリファイル内で使用している {変数} の「変数」もパラメタとして設定されている必要があります。

wait

指定ファイルに記載されたクエリを実行してレコードの存在確認をし、存在するまで待ち受けます。最大待ち受け時間は MAX_WAIT_TIME の値で設定された秒数です。_export ディレクトリ内で QUERY_FILE、MAX_WAIT_TIME パラメタが設定されていること、またクエリファイル内で使用している {変数} の「変数」もパラメタとして設定されている必要があります。

補足

partial_delete_XXXX は TreasureData で集計した結果を MySQL に格納する際に既存レコードとの置き換えができないので、予め削除しておくために用意されたものです。該当レコードの指定が特定の列名になっているのは現職のテーブル構成がそうなっているからで、汎用性がないのは勘弁してください。このあたりは修正して列名も指定できるようにすれば汎用性が高まるかと思ってます。

Chatwork

以下のメソッドを利用するに当たり、_export ディレクトリ内に chatwork キーが存在し、apikey, endpoint, roomid が設定されている必要があります。

notify

_error ディレクトリ内でエラー発生の通知を指定された roomid の部屋に投稿します。なお _export ディレクトリ内の digdag キーの配下に endpoint(digdag webUI の URL)が設定されていると、通知メッセージにエラーが発生した attempt へのリンクが併記されます。

その他

get_failed_task

現在実行中のワークフロー内で実行に失敗したタスク名を返します。並列実行する複数のタスクが2個以上失敗した場合には attempt id の大きい方を返します。これは digdag 内部で利用するというより、Chatwork の notify メソッドで利用されている関数です。

wait_local_file

指定ファイルがワークフローが実行されるサーバのローカルファイルシステムに存在するまで待ち受けます。最大待ち受け時間は MAX_WAIT_TIME の値で設定された秒数です。_export ディレクトリ内で LOCAL_FILE、MAX_WAIT_TIME パラメタが設定されている必要があります。

wait_task

指定されたタスクの実行完了を待ち受けます。当該タスクが失敗した場合には、待ち合わせしているタスクを実行せずに終了させます。最大待ち受け時間は MAX_WAIT_TIME の値で設定された秒数です。_export ディレクトリ内で TASK_NAME、MAX_WAIT_TIME パラメタが設定されている必要があります。

wait_workflow

指定されたプロジェクト内のワークフローの実行完了を待ち受けます。当該ワークフローが失敗した場合には、待ち合わせしているタスクを実行せずに終了させます。最大待ち受け時間は MAX_WAIT_TIME の値で設定された秒数です。待ち受けるワークフローと待ち合わせるワークフローのスケジュール指定が同じであること(たとえばいずれも毎時実行とか日次実行とかという意味です)、_export ディレクトリ内で待ち受けるワークフロー情報として PROJECT_NAME, WORKFLOW_NAME パラメタが設定されていること、MAX_WAIT_TIME パラメタが設定されていることが必要となります。

使用例

MySQL

あるプロジェクト内において、以下のファイルを用意します。

config/mysql.dig
host: DBのホスト名あるいはIPアドレス
user: DBにアクセスするためのユーザ名
password: 上記ユーザのパスワード
database: アクセスするデータベース名
lib/_init_.py
from pydigdag.MySQLJob import MySQLJob

実行したいクエリを以下のように定義しておきます。

queries/daily_payments.sql
REPLACE INTO {OUTPUT}
SELECT
  region_day,
  service_key,
  SUM(payment) AS payment,
  NOW() AS created_at,
  NOW() AS updated_at
FROM
  {INTPUT}
WHERE
  region_day = '{START_DATE}'
GROUP BY
  region_day,
  service_key

これを digdag で定期的に実行する場合、dig ファイルを以下のように記述することになります。

sample_run_sql.dig
timezone: Asia/Tokyo

_export:
  START_TIME: ${moment(session_time).add(-1, "hours").format("YYYY-MM-DD HH:00:00")}
  START_DATE: ${moment(START_TIME).format("YYYY-MM-DD")}
  INPUT: hourly_payments
  OUTPUT: daily_payments
  QUERY_FILE: queries/daily_payments.sql
  mysql:
    !include : config/mysql.dig

+run:
  py>: lib.MySQLJob.run_sql

Chatwork

あるプロジェクト内において、以下のファイルを用意します。

lib/_init_.py
from pydigdag.chatwork import ChatworkApi
config/chatwork.dig
apikey: Chatwork API を利用するためのAPIキー
roomid: 投稿先のルームID
endpoint: https://api.chatwork.com/v2
digdag/digdag.dig
endpoint: http://digdag.example.com/
scripts/failed.sh
#!/bin/bash
exit 1
sample_notify.dig
timezone: Asia/Tokyo

_export:
  chatwork:
    !include : config/chatwork.dig

_error:
  py>: lib.ChatworkApi.notify

+task:
  sh>: scripts/failed.sh

sample_notify.dig を実行させると以下のようなメッセージが Chatwork に通知されることになります。

以下のタスクでエラーが発生しました
    タスク名: +pydigdagTest+task
    セッションUUID:7ceed71f-050b-400e-bcd8-d0ecd509335f
    セッション時間:2017-08-15T11:47:20+09:00
    管理画面URL:http://digdag.example.com/sessions/12345

その他(待受)

タスク

scripts/task1.sh
#!/bin/bash
sleep 120
echo "task1"
exit 0
scripts/task2.sh
#!/bin/bash
echo "task2"
exit 0
sample_wait_task.dig
timezone: Asia/Tokyo

+wait_success:
  _parallel: true

  +task1:
    sh>: scripts/task1.sh

  +task2:
    _export:
      TASK_NAME: task1
      MAX_WAIT_TIME: 20
    +wait:
      py>: lib.control.wait_task
    +run:
      sh>: scripts/task2.sh

上記は task1.sh の実行を待って task2.sh を実行させる例となります。これと同じ結果は単に逐次実行するように記述すればいいのでよい例ではありませんが、あくまで使用例ということで勘弁してください。

ワークフロー

wait_workflow.dig
timezone: Asia/Tokyo

schedule:
  hourly>: 39:00

+task:
  _export:
    PROJECT_NAME: required_workflow
    WORKFLOW_NAME: required_workflow
    MAX_WAIT_TIME: 180
  +wait:
    py>: lib.control.wait_workflow
  +run:
    sh>: scripts/task.sh

これを例えば wait_workflow として登録しておきます。

required_workflow.dig
timezone: Asia/Tokyo

schedule:
  hourly>: 40:00

+setup:
  echo>: start ${session_time}

+run:
  sh>: scripts/task.sh

これを required_workflow として登録しておきます。wait_workflow は毎時39分に、required_workflow は毎時40分に実行されますが、wait_workflow は required_workflow の終了を待って実行されるようになります。

2
1
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1