この記事はGRIPHONE Advent Calendar 2018の9日目の記事です。
こんにちは。SREの徳田です。
今回はここ最近でETL基盤を作った際に、ワークフローエンジンとして使用したDigdagというツールについて紹介しようと思います。
※この記事はGRIPHONE Advent Calendar 2018 9日目の記事です。
https://qiita.com/advent-calendar/2018/griphone
https://adventar.org/calendars/3147
Digdagについては以下の3回に分けて解説できればと思います!
- Digdagについてとワークフローの定義・記述について
- Digdag Serverの構成についてと構築について
- Digdag Embulk を使ったETL処理の実装について
Digdagとは
Digdagはオープンソースで開発されているワークフローエンジンです。yamlでシンプルにワークフローを定義することが出来ます。
また、以下のような機能を持っています。
- 依存関係の解決
- 多くのCloudのサービスに対応
- Python / Rubyのサポート
- エラーハンドリング
- 管理画面UI
- セキュアなSecret
- バージョンコントロール
- Dockerのサポート
インストール方法
DigdagはJavaで実装されています。jarファイルを落としてきて実行権限つければ動きます。
curl -o /usr/local/bin/digdag "https://dl.digdag.io/digdag-latest"
chmod +x /usr/local/bin/digdag
Digdagのコンセプトについて
プロジェクトとリビジョン(project / revision)
プロジェクトはワークフローの定義ファイルや設定ファイルなどを含むセット(ディレクトリ)のことです。
プロジェクトをDigdagにアップロードする際に、Digdagはプロジェクトを置き換えるのではなく新しいバージョンとして保存し、古いバージョンを保持します。このプロジェクトの履歴のことをリビジョンと呼びます。
Digdagは通常最新のリビジョンを使ってワークフローを実行しますが、過去のリビジョンを指定して実行することも可能です。
セッションとアテンプト(session / attempt)
セッションはワークフローの実行計画です。ワークフローの全てのタスクが正常終了することでセッションが完了します。
セッションの実際の実行単位がアテンプトになります。
例えば、session1の実際の実行であるattempt1が失敗した際にsession1をリトライするする際はsession1に紐づくattempt2で実行されます。
スケジュール実行とsession timeについて
session timeはスケジュールされているワークフローが実行される時間のことです。
例として、日毎にスケジュールされているワークフローのsession timeは2018-11-28 00:00:00のようなその日の0時0分0秒になります。
しかし、これは実際の実行時間とは異なるものとなります。他のデータ処理待ちのため1時間遅延させてから実行したい時などにこのsession timeは有効に使えます。
また、このsession timeはワークフローの履歴上でユニークなものになります。
Digdagの使い方
ワークフローの定義について
+step1:
echo>: ${session_time}
タスク
+で始まっているものがタスクになります。
タスクの実行順序は上から下に実行されていきます。また、タスクはネストして子のタスクを定義することも出来ます。
オペレーター
operator>という形で表現されている(上の例ではecho>)ものがオペレーターになります。タスクはオペレーターを並んで記述されます。オペレーターは以下のような処理を実行できます
- echo:メッセージの表示
- shell:シェルの実行
- py:pythonスクリプトの実行
上記のもの以外にも様々なオペレーターが提供されています。一覧は以下のリンクから確認できます。
http://docs.digdag.io/operators.html
変数について
${variable_name}という表記でdigdagで変数を利用することが出来ます。
利用できる変数は以下のリンクから確認できます。
http://docs.digdag.io/workflow_definition.html#using-variables
例のワークフローを実行
上記のワークフローをworkflow.digというファイルで保存し、実行してみます。
ワークフローの実行はdigdag runというコマンドで行います。
$ digdag run workflow.dig
2018-11-28 01:54:00 +0900: Digdag v0.9.31
2018-11-28 01:54:01 +0900 [WARN] (main): Using a new session time 2018-11-27T00:00:00+00:00.
2018-11-28 01:54:01 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181127T000000+0000.
2018-11-28 01:54:01 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-27T00:00:00+00:00
2018-11-28 01:54:02 +0900 [INFO] (0018@[0:default]+workflow+step1): echo>: 2018-11-27T00:00:00+00:00
2018-11-27T00:00:00+00:00
Success. Task state is saved at /home/digdag/.digdag/status/20181127T000000+0000 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
分かりづらいですが、Successの上にsession_timeが表示されています。
さて、ここでなぜ2018-11-27T00:00:00+00:00と表示されるのでしょうか。一つはTimeZoneを指定していないためです。TimeZoneを指定して再度実行してみます。
(runを実行する前に.digdagフォルダを削除してください)
timezone: Asia/Tokyo
+step1:
echo>: ${session_time}
$ digdag run workflow.dig
2018-11-28 02:02:06 +0900: Digdag v0.9.31
2018-11-28 02:02:07 +0900 [WARN] (main): Using a new session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:02:07 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:02:07 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:02:09 +0900 [INFO] (0018@[0:default]+workflow+step1): echo>: 2018-11-28T00:00:00+09:00
2018-11-28T00:00:00+09:00
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
今度はsession_timeが実行日と同じ日になりました。しかし時間が0時0分0秒です。
これはsessionの時間の指定がないとデフォルトで0時0分0秒になるようになっているためです。
ログにありますが、sessionにはdaily、hourly、日時指定をすることができます。デフォルトではdailyになります。よって、特にしないと実行日の0時0分0秒がsession_timeになります。
今回の実行で2018-11-28分の実行がされたわけですが、試しにもう一度実行してみます。
$ digdag run workflow.dig
2018-11-28 02:15:19 +0900: Digdag v0.9.31
2018-11-28 02:15:20 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:15:20 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:15:20 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:15:20 +0900 [WARN] (0018@[0:default]+workflow+step1): Skipped
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
先程実行しているのでSkippedと表示され処理を実行していませんね。
これを強制的にもう一度実行する場合は–rerunオプションを使用します。
変数の定義
手段は3つあります。
- _exportパラメータを使う
- APIを用いて変数を定義
- Session開始時に変数を定義
1. _exportパラメータを使う
workflowに_exportパラメータで変数を定義します。
timezone: Asia/Tokyo
_export:
msg: hello world!!
+step1:
echo>: ${msg}
_exportで定義したパラメータは定義したタスクとその子のタスクまでが変数のスコープになります。
2. APIを用いて変数を定義
python・rubyオペレーターで利用できるAPIを用いて変数の定義をします。
詳細は以下のリンクへどうぞ。
Python:http://docs.digdag.io/python_api.html#defining-variables
Ruby:http://docs.digdag.io/ruby_api.html#defining-variables
3. Session開始時に変数を定義
Session開始時に-pオプションを用いて変数を定義します。また、-Pオプションでyamlファイルを渡すことも出来ます。
digdag run -p msg="hello world!!" workflow.dig
タスクの並列実行
_parallelパラメータをtrueにすることで定義したタスクの子タスクを並列で実行してくれます。
timezone: Asia/Tokyo
_parallel: true
+step1:
echo>: step1
+step2:
echo>: step2
+step3:
echo>: step3
上記のワークフローを実行してみましょう。
$ digdag run -a workflow.dig
2018-11-28 02:46:44 +0900: Digdag v0.9.31
2018-11-28 02:46:46 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:46:46 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:46:46 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:46:47 +0900 [INFO] (0020@[0:default]+workflow+step3): echo>: step3
2018-11-28 02:46:47 +0900 [INFO] (0019@[0:default]+workflow+step2): echo>: step2
2018-11-28 02:46:47 +0900 [INFO] (0018@[0:default]+workflow+step1): echo>: step1
step1
step3
step2
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
順序を考慮せず並列で処理してくれてますね。分かりづらいですが・・w
ワークフローコントロール
分岐処理をさせる(ifオペレーター)
ifオペレータを使うことで分岐処理を実現できます。_doパラメータに真の時、_else_doパラメータに偽の時に実行するタスクやオペレーターを記述します。
timezone: Asia/Tokyo
+condition:
if>: ${cond}
_do:
echo>: ${cond} == true
_else_do:
echo>: ${cond} == false
$ digdag run -a -p cond=true workflow.dig
2018-11-28 02:56:42 +0900: Digdag v0.9.31
2018-11-28 02:56:43 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:56:43 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:56:43 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:56:44 +0900 [INFO] (0018@[0:default]+workflow+condition): if>: true
2018-11-28 02:56:44 +0900 [INFO] (0018@[0:default]+workflow+condition^sub): echo>: true == true
true == true
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
$ digdag run -a -p cond=false workflow.dig
2018-11-28 02:57:01 +0900: Digdag v0.9.31
2018-11-28 02:57:02 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:57:02 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:57:02 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:57:03 +0900 [INFO] (0018@[0:default]+workflow+condition): if>: false
2018-11-28 02:57:04 +0900 [INFO] (0018@[0:default]+workflow+condition^sub): echo>: false == false
false == false
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
ループ処理をさせる(loop/for_each/for_rangeオペレーター)
loopオペレーターは指定回数_doパラメータに指定されたタスクやオペレーターを実行します。
_parallelパラメータにtrueを指定することで_doパラメータのタスクを並列処理してくれます。
また、変数${i}にループ回数が格納されています。
timezone: Asia/Tokyo
+looptask:
loop>: 3
_do:
echo>: loop=${i}
$ digdag run -a workflow.dig
2018-11-28 03:06:30 +0900: Digdag v0.9.31
2018-11-28 03:06:31 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:06:31 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:06:31 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:06:33 +0900 [INFO] (0018@[0:default]+workflow+looptask): loop>: 3
2018-11-28 03:06:33 +0900 [INFO] (0018@[0:default]+workflow+looptask^sub+loop-0): echo>: loop=0
loop=0
2018-11-28 03:06:33 +0900 [INFO] (0018@[0:default]+workflow+looptask^sub+loop-1): echo>: loop=1
loop=1
2018-11-28 03:06:34 +0900 [INFO] (0018@[0:default]+workflow+looptask^sub+loop-2): echo>: loop=2
loop=2
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
for_eachオペレータはオブジェクトを取り出し、指定のkeyに格納してループ処理を行います。
例を見てもらったほうが早いかと・・w
timezone: Asia/Tokyo
+foreach:
for_each>:
var1:
- foo
- bar
- baz
var2:
- hoge
- fuga
_do:
echo>: ${var1} & ${var2}
$ digdag run -a workflow.dig
2018-11-28 03:12:27 +0900: Digdag v0.9.31
2018-11-28 03:12:28 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:12:28 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:12:28 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:12:29 +0900 [INFO] (0018@[0:default]+workflow+foreach): for_each>: {var1=[foo, bar, baz], var2=[hoge, fuga]}
2018-11-28 03:12:29 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=0=foo&1=var2=0=hoge): echo>: foo & hoge
foo & hoge
2018-11-28 03:12:30 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=0=foo&1=var2=1=fuga): echo>: foo & fuga
foo & fuga
2018-11-28 03:12:30 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=1=bar&1=var2=0=hoge): echo>: bar & hoge
bar & hoge
2018-11-28 03:12:30 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=1=bar&1=var2=1=fuga): echo>: bar & fuga
bar & fuga
2018-11-28 03:12:30 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=2=baz&1=var2=0=hoge): echo>: baz & hoge
baz & hoge
2018-11-28 03:12:31 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=2=baz&1=var2=1=fuga): echo>: baz & fuga
baz & fuga
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
for_rangeオペレータはfrom,to,stepを指定し、fromからstepずつ増加させtoになるまでループを行います。
timezone: Asia/Tokyo
+forrange:
for_range>:
from: 0
to: 30
step: 10
_do:
echo>: from=${range.from}, to=${range.to}, index=${range.index}
$ digdag run -a workflow.dig
2018-11-28 03:21:34 +0900: Digdag v0.9.31
2018-11-28 03:21:35 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:21:35 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:21:35 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:21:36 +0900 [INFO] (0018@[0:default]+workflow+forrange): for_range>: {from=0, to=30, step=10}
2018-11-28 03:21:37 +0900 [INFO] (0018@[0:default]+workflow+forrange^sub+range-from=0&to=10): echo>: from=0, to=10, index=0
from=0, to=10, index=0
2018-11-28 03:21:37 +0900 [INFO] (0018@[0:default]+workflow+forrange^sub+range-from=10&to=20): echo>: from=10, to=20, index=1
from=10, to=20, index=1
2018-11-28 03:21:37 +0900 [INFO] (0018@[0:default]+workflow+forrange^sub+range-from=20&to=30): echo>: from=20, to=30, index=2
from=20, to=30, index=2
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
他のワークフローを呼び出す(call/requireオペレータ)
callオペレーターは他のワークフローファイルを呼び出し、実行します。
hello.digを以下のように定義しておきます。
+echo:
echo>: hello
そして以下のワークフローを実行してみます。
timezone: Asia/Tokyo
+call1:
call>: hello.dig
+call2:
call>: hello.dig
+call3:
call>: hello.dig
$ digdag run -a workflow.dig
2018-11-28 03:25:32 +0900: Digdag v0.9.31
2018-11-28 03:25:33 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:25:33 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:25:33 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:25:34 +0900 [INFO] (0018@[0:default]+workflow+call1): call>: hello.dig
2018-11-28 03:25:35 +0900 [INFO] (0018@[0:default]+workflow+call1^sub+echo): echo>: hello
hello
2018-11-28 03:25:35 +0900 [INFO] (0018@[0:default]+workflow+call2): call>: hello.dig
2018-11-28 03:25:35 +0900 [INFO] (0018@[0:default]+workflow+call2^sub+echo): echo>: hello
hello
2018-11-28 03:25:35 +0900 [INFO] (0018@[0:default]+workflow+call3): call>: hello.dig
2018-11-28 03:25:36 +0900 [INFO] (0018@[0:default]+workflow+call3^sub+echo): echo>: hello
hello
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
requireオペレーターはcallオペレーターと似ており、他のワークフローファイルを呼び出し実行しますが、同一sessionで1度実行が完了されている場合はそれ以降同一sessionでは実行されません。
timezone: Asia/Tokyo
+require1:
require>: hello
+require2:
require>: hello
+require3:
require>: hello
$ digdag run -a workflow.dig
2018-11-28 03:29:41 +0900: Digdag v0.9.31
2018-11-28 03:29:42 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:29:42 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:29:42 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:29:43 +0900 [INFO] (0018@[0:default]+workflow+require1): require>: hello
2018-11-28 03:29:43 +0900 [INFO] (0018@[0:default]+workflow+require1): Starting a new session project id=1 workflow name=hello session_time=2018-11-27T15:00:00+00:00
2018-11-28 03:29:44 +0900 [INFO] (0018@[0:default]+hello+echo): echo>: hello
hello
2018-11-28 03:29:45 +0900 [INFO] (0018@[0:default]+workflow+require1): require>: hello
2018-11-28 03:29:45 +0900 [INFO] (0018@[0:default]+workflow+require2): require>: hello
2018-11-28 03:29:45 +0900 [INFO] (0018@[0:default]+workflow+require3): require>: hello
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
エラーハンドリング
オペレーターに_errorパラメーターを指定することでエラーが発生した際の処理を定義することが出来ます。
failオペレーターを使ってerrorを発生させてみます。ついでにですが、failオペレーターを使う際は_errorパラメーターのオペレーターで${error.message}変数を利用することができます。
timezone: Asia/Tokyo
+step:
fail>: hello
_check:
echo>: ok
_error:
echo>: error occurred(${error.message})
$ digdag run -a workflow.dig
2018-11-28 03:42:05 +0900: Digdag v0.9.31
2018-11-28 03:42:06 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:42:06 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:42:06 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:42:07 +0900 [INFO] (0018@[0:default]+workflow+step): fail>: hello
2018-11-28 03:42:07 +0900 [ERROR] (0018@[0:default]+workflow+step): Task +workflow+step failed.
hello
2018-11-28 03:42:08 +0900 [INFO] (0018@[0:default]+workflow+step^error): echo>: error occurred(hello)
error occurred(hello)
2018-11-28 03:42:08 +0900 [INFO] (0018@[0:default]+workflow^failure-alert): type: notify
error:
* +workflow+step:
hello
Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
また、成功時には_checkパラメータで任意のオペレーターで処理を行うことが出来ます。
timezone: Asia/Tokyo
+step:
echo>: hello
_check:
echo>: ok
_error:
echo>: error occurred(${error.message})
$ digdag run -a workflow.dig
2018-11-28 03:43:18 +0900: Digdag v0.9.31
2018-11-28 03:43:19 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:43:19 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:43:19 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:43:20 +0900 [INFO] (0018@[0:default]+workflow+step): echo>: hello
hello
2018-11-28 03:43:21 +0900 [INFO] (0018@[0:default]+workflow+step^check): echo>: ok
ok
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
スクリプティングオペレーターについて
今回はETL処理をさせる際に用いたshオペレーターとpyオペレーターの解説をします。
shオペレーター
シェルコマンドやシェルスクリプトを実行します。
timezone: Asia/Tokyo
+step:
sh>: date
$ digdag run -a workflow.dig
2018-11-28 03:49:23 +0900: Digdag v0.9.31
2018-11-28 03:49:24 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:49:24 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:49:24 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:49:25 +0900 [INFO] (0018@[0:default]+workflow+step): sh>: date
Wed Nov 28 03:49:25 JST 2018
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
pyオペレーター
pythonスクリプトを呼び出して実行します。
簡単なpythonスクリプトを用意します。
class MyWorkflow(object):
def step1(self):
print("step1")
そして、このスクリプトを呼び出すワークフローを定義して実行してみます。
timezone: Asia/Tokyo
+step:
py>: task.MyWorkflow.step1
$ digdag run -a workflow.dig
2018-11-28 03:53:20 +0900: Digdag v0.9.31
2018-11-28 03:53:21 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:53:21 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:53:21 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:53:22 +0900 [INFO] (0018@[0:default]+workflow+step): py>: task.MyWorkflow.step1
step1
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
pythonスクリプトが呼び出せていますね。
pythonスクリプト内でdigdagというパッケージをimportすることでdigdagのAPIを利用することが出来ます。
APIの詳細についてはこちらをどうぞ。
http://docs.digdag.io/python_api.html
まとめ
今回は文面と例での解説ばかりになってしまいました・・・が、Digdagのコンセプトやワークフローの定義方法について解説しました!
今回はDigdagをローカルで利用して解説を進めてきましたが、次回はDigdagのサーバー版であるDigdag Serverの構成と構築方法について記述できればと思います!
それでは!!