ETL基盤を構築してみた 〜Digdag編 その1〜

Posted by

この記事はGRIPHONE Advent Calendar 2018の9日目の記事です。

こんにちは。SREの徳田です。

今回はここ最近でETL基盤を作った際に、ワークフローエンジンとして使用したDigdagというツールについて紹介しようと思います。

※この記事はGRIPHONE Advent Calendar 2018 9日目の記事です。
https://qiita.com/advent-calendar/2018/griphone
https://adventar.org/calendars/3147

Digdagについては以下の3回に分けて解説できればと思います!

  1. Digdagについてとワークフローの定義・記述について
  2. Digdag Serverの構成についてと構築について
  3. Digdag Embulk を使ったETL処理の実装について

Digdagとは

Digdagはオープンソースで開発されているワークフローエンジンです。yamlでシンプルにワークフローを定義することが出来ます。

また、以下のような機能を持っています。

  • 依存関係の解決
  • 多くのCloudのサービスに対応
  • Python / Rubyのサポート
  • エラーハンドリング
  • 管理画面UI
  • セキュアなSecret
  • バージョンコントロール
  • Dockerのサポート

インストール方法

DigdagはJavaで実装されています。jarファイルを落としてきて実行権限つければ動きます。

curl -o /usr/local/bin/digdag "https://dl.digdag.io/digdag-latest"
chmod +x /usr/local/bin/digdag

 Digdagのコンセプトについて

プロジェクトとリビジョン(project / revision)

プロジェクトはワークフローの定義ファイルや設定ファイルなどを含むセット(ディレクトリ)のことです。

プロジェクトをDigdagにアップロードする際に、Digdagはプロジェクトを置き換えるのではなく新しいバージョンとして保存し、古いバージョンを保持します。このプロジェクトの履歴のことをリビジョンと呼びます。

Digdagは通常最新のリビジョンを使ってワークフローを実行しますが、過去のリビジョンを指定して実行することも可能です。

セッションとアテンプト(session / attempt)

セッションはワークフローの実行計画です。ワークフローの全てのタスクが正常終了することでセッションが完了します。

セッションの実際の実行単位がアテンプトになります。

例えば、session1の実際の実行であるattempt1が失敗した際にsession1をリトライするする際はsession1に紐づくattempt2で実行されます。

スケジュール実行とsession timeについて

session timeはスケジュールされているワークフローが実行される時間のことです。

例として、日毎にスケジュールされているワークフローのsession timeは2018-11-28 00:00:00のようなその日の0時0分0秒になります。

しかし、これは実際の実行時間とは異なるものとなります。他のデータ処理待ちのため1時間遅延させてから実行したい時などにこのsession timeは有効に使えます。

また、このsession timeはワークフローの履歴上でユニークなものになります。

Digdagの使い方

ワークフローの定義について

+step1:
  echo>: ${session_time}

タスク

+で始まっているものがタスクになります。

タスクの実行順序は上から下に実行されていきます。また、タスクはネストして子のタスクを定義することも出来ます。

オペレーター

operator>という形で表現されている(上の例ではecho>)ものがオペレーターになります。タスクはオペレーターを並んで記述されます。オペレーターは以下のような処理を実行できます

  • echo:メッセージの表示
  • shell:シェルの実行
  • py:pythonスクリプトの実行

上記のもの以外にも様々なオペレーターが提供されています。一覧は以下のリンクから確認できます。

http://docs.digdag.io/operators.html

変数について

${variable_name}という表記でdigdagで変数を利用することが出来ます。

利用できる変数は以下のリンクから確認できます。

http://docs.digdag.io/workflow_definition.html#using-variables

例のワークフローを実行

上記のワークフローをworkflow.digというファイルで保存し、実行してみます。

ワークフローの実行はdigdag runというコマンドで行います。

$ digdag run workflow.dig
2018-11-28 01:54:00 +0900: Digdag v0.9.31
2018-11-28 01:54:01 +0900 [WARN] (main): Using a new session time 2018-11-27T00:00:00+00:00.
2018-11-28 01:54:01 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181127T000000+0000.
2018-11-28 01:54:01 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-27T00:00:00+00:00
2018-11-28 01:54:02 +0900 [INFO] (0018@[0:default]+workflow+step1): echo>: 2018-11-27T00:00:00+00:00
2018-11-27T00:00:00+00:00
Success. Task state is saved at /home/digdag/.digdag/status/20181127T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

分かりづらいですが、Successの上にsession_timeが表示されています。

さて、ここでなぜ2018-11-27T00:00:00+00:00と表示されるのでしょうか。一つはTimeZoneを指定していないためです。TimeZoneを指定して再度実行してみます。

(runを実行する前に.digdagフォルダを削除してください)

timezone: Asia/Tokyo

+step1:
  echo>: ${session_time}
$ digdag run workflow.dig
2018-11-28 02:02:06 +0900: Digdag v0.9.31
2018-11-28 02:02:07 +0900 [WARN] (main): Using a new session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:02:07 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:02:07 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:02:09 +0900 [INFO] (0018@[0:default]+workflow+step1): echo>: 2018-11-28T00:00:00+09:00
2018-11-28T00:00:00+09:00
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

今度はsession_timeが実行日と同じ日になりました。しかし時間が0時0分0秒です。

これはsessionの時間の指定がないとデフォルトで0時0分0秒になるようになっているためです。

ログにありますが、sessionにはdaily、hourly、日時指定をすることができます。デフォルトではdailyになります。よって、特にしないと実行日の0時0分0秒がsession_timeになります。

今回の実行で2018-11-28分の実行がされたわけですが、試しにもう一度実行してみます。

$ digdag run workflow.dig
2018-11-28 02:15:19 +0900: Digdag v0.9.31
2018-11-28 02:15:20 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:15:20 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:15:20 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:15:20 +0900 [WARN] (0018@[0:default]+workflow+step1): Skipped
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

先程実行しているのでSkippedと表示され処理を実行していませんね。

これを強制的にもう一度実行する場合は–rerunオプションを使用します。

変数の定義

手段は3つあります。

  1. _exportパラメータを使う
  2. APIを用いて変数を定義
  3. Session開始時に変数を定義

1. _exportパラメータを使う

workflowに_exportパラメータで変数を定義します。

timezone: Asia/Tokyo

_export:
  msg: hello world!!

+step1:
  echo>: ${msg}

_exportで定義したパラメータは定義したタスクとその子のタスクまでが変数のスコープになります。

2. APIを用いて変数を定義

python・rubyオペレーターで利用できるAPIを用いて変数の定義をします。

詳細は以下のリンクへどうぞ。

Python:http://docs.digdag.io/python_api.html#defining-variables

Ruby:http://docs.digdag.io/ruby_api.html#defining-variables

3. Session開始時に変数を定義

Session開始時に-pオプションを用いて変数を定義します。また、-Pオプションでyamlファイルを渡すことも出来ます。

digdag run -p msg="hello world!!" workflow.dig

タスクの並列実行

_parallelパラメータをtrueにすることで定義したタスクの子タスクを並列で実行してくれます。

timezone: Asia/Tokyo

_parallel: true
+step1:
  echo>: step1

+step2:
  echo>: step2

+step3:
  echo>: step3

上記のワークフローを実行してみましょう。

$ digdag run  -a workflow.dig
2018-11-28 02:46:44 +0900: Digdag v0.9.31
2018-11-28 02:46:46 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:46:46 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:46:46 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:46:47 +0900 [INFO] (0020@[0:default]+workflow+step3): echo>: step3
2018-11-28 02:46:47 +0900 [INFO] (0019@[0:default]+workflow+step2): echo>: step2
2018-11-28 02:46:47 +0900 [INFO] (0018@[0:default]+workflow+step1): echo>: step1
step1
step3
step2
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

順序を考慮せず並列で処理してくれてますね。分かりづらいですが・・w

ワークフローコントロール

分岐処理をさせる(ifオペレーター)

ifオペレータを使うことで分岐処理を実現できます。_doパラメータに真の時、_else_doパラメータに偽の時に実行するタスクやオペレーターを記述します。

timezone: Asia/Tokyo

+condition:
  if>: ${cond}
  _do:
    echo>: ${cond} == true
  _else_do:
    echo>: ${cond} == false
$ digdag run -a -p cond=true workflow.dig
2018-11-28 02:56:42 +0900: Digdag v0.9.31
2018-11-28 02:56:43 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:56:43 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:56:43 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:56:44 +0900 [INFO] (0018@[0:default]+workflow+condition): if>: true
2018-11-28 02:56:44 +0900 [INFO] (0018@[0:default]+workflow+condition^sub): echo>: true == true
true == true
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

$ digdag run -a -p cond=false workflow.dig
2018-11-28 02:57:01 +0900: Digdag v0.9.31
2018-11-28 02:57:02 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 02:57:02 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 02:57:02 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 02:57:03 +0900 [INFO] (0018@[0:default]+workflow+condition): if>: false
2018-11-28 02:57:04 +0900 [INFO] (0018@[0:default]+workflow+condition^sub): echo>: false == false
false == false
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

ループ処理をさせる(loop/for_each/for_rangeオペレーター)

loopオペレーターは指定回数_doパラメータに指定されたタスクやオペレーターを実行します。

_parallelパラメータにtrueを指定することで_doパラメータのタスクを並列処理してくれます。

また、変数${i}にループ回数が格納されています。

timezone: Asia/Tokyo

+looptask:
  loop>: 3
  _do:
    echo>: loop=${i}
$ digdag run -a  workflow.dig
2018-11-28 03:06:30 +0900: Digdag v0.9.31
2018-11-28 03:06:31 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:06:31 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:06:31 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:06:33 +0900 [INFO] (0018@[0:default]+workflow+looptask): loop>: 3
2018-11-28 03:06:33 +0900 [INFO] (0018@[0:default]+workflow+looptask^sub+loop-0): echo>: loop=0
loop=0
2018-11-28 03:06:33 +0900 [INFO] (0018@[0:default]+workflow+looptask^sub+loop-1): echo>: loop=1
loop=1
2018-11-28 03:06:34 +0900 [INFO] (0018@[0:default]+workflow+looptask^sub+loop-2): echo>: loop=2
loop=2
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

for_eachオペレータはオブジェクトを取り出し、指定のkeyに格納してループ処理を行います。

例を見てもらったほうが早いかと・・w

timezone: Asia/Tokyo

+foreach:
  for_each>:
    var1:
      - foo
      - bar
      - baz
    var2:
      - hoge
      - fuga
  _do:
    echo>: ${var1} & ${var2}
$ digdag run -a  workflow.dig
2018-11-28 03:12:27 +0900: Digdag v0.9.31
2018-11-28 03:12:28 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:12:28 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:12:28 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:12:29 +0900 [INFO] (0018@[0:default]+workflow+foreach): for_each>: {var1=[foo, bar, baz], var2=[hoge, fuga]}
2018-11-28 03:12:29 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=0=foo&1=var2=0=hoge): echo>: foo & hoge
foo & hoge
2018-11-28 03:12:30 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=0=foo&1=var2=1=fuga): echo>: foo & fuga
foo & fuga
2018-11-28 03:12:30 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=1=bar&1=var2=0=hoge): echo>: bar & hoge
bar & hoge
2018-11-28 03:12:30 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=1=bar&1=var2=1=fuga): echo>: bar & fuga
bar & fuga
2018-11-28 03:12:30 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=2=baz&1=var2=0=hoge): echo>: baz & hoge
baz & hoge
2018-11-28 03:12:31 +0900 [INFO] (0018@[0:default]+workflow+foreach^sub+for-0=var1=2=baz&1=var2=1=fuga): echo>: baz & fuga
baz & fuga
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

for_rangeオペレータはfrom,to,stepを指定し、fromからstepずつ増加させtoになるまでループを行います。

timezone: Asia/Tokyo

+forrange:
  for_range>:
    from: 0
    to: 30
    step: 10
  _do:
    echo>: from=${range.from}, to=${range.to}, index=${range.index}
$ digdag run -a  workflow.dig
2018-11-28 03:21:34 +0900: Digdag v0.9.31
2018-11-28 03:21:35 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:21:35 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:21:35 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:21:36 +0900 [INFO] (0018@[0:default]+workflow+forrange): for_range>: {from=0, to=30, step=10}
2018-11-28 03:21:37 +0900 [INFO] (0018@[0:default]+workflow+forrange^sub+range-from=0&to=10): echo>: from=0, to=10, index=0
from=0, to=10, index=0
2018-11-28 03:21:37 +0900 [INFO] (0018@[0:default]+workflow+forrange^sub+range-from=10&to=20): echo>: from=10, to=20, index=1
from=10, to=20, index=1
2018-11-28 03:21:37 +0900 [INFO] (0018@[0:default]+workflow+forrange^sub+range-from=20&to=30): echo>: from=20, to=30, index=2
from=20, to=30, index=2
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

他のワークフローを呼び出す(call/requireオペレータ)

callオペレーターは他のワークフローファイルを呼び出し、実行します。

hello.digを以下のように定義しておきます。

+echo:
  echo>: hello

そして以下のワークフローを実行してみます。

timezone: Asia/Tokyo

+call1:
  call>: hello.dig

+call2:
  call>: hello.dig

+call3:
  call>: hello.dig
$ digdag run -a  workflow.dig
2018-11-28 03:25:32 +0900: Digdag v0.9.31
2018-11-28 03:25:33 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:25:33 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:25:33 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:25:34 +0900 [INFO] (0018@[0:default]+workflow+call1): call>: hello.dig
2018-11-28 03:25:35 +0900 [INFO] (0018@[0:default]+workflow+call1^sub+echo): echo>: hello
hello
2018-11-28 03:25:35 +0900 [INFO] (0018@[0:default]+workflow+call2): call>: hello.dig
2018-11-28 03:25:35 +0900 [INFO] (0018@[0:default]+workflow+call2^sub+echo): echo>: hello
hello
2018-11-28 03:25:35 +0900 [INFO] (0018@[0:default]+workflow+call3): call>: hello.dig
2018-11-28 03:25:36 +0900 [INFO] (0018@[0:default]+workflow+call3^sub+echo): echo>: hello
hello
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

requireオペレーターはcallオペレーターと似ており、他のワークフローファイルを呼び出し実行しますが、同一sessionで1度実行が完了されている場合はそれ以降同一sessionでは実行されません。

timezone: Asia/Tokyo

+require1:
  require>: hello

+require2:
  require>: hello

+require3:
  require>: hello
$ digdag run -a  workflow.dig
2018-11-28 03:29:41 +0900: Digdag v0.9.31
2018-11-28 03:29:42 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:29:42 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:29:42 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:29:43 +0900 [INFO] (0018@[0:default]+workflow+require1): require>: hello
2018-11-28 03:29:43 +0900 [INFO] (0018@[0:default]+workflow+require1): Starting a new session project id=1 workflow name=hello session_time=2018-11-27T15:00:00+00:00
2018-11-28 03:29:44 +0900 [INFO] (0018@[0:default]+hello+echo): echo>: hello
hello
2018-11-28 03:29:45 +0900 [INFO] (0018@[0:default]+workflow+require1): require>: hello
2018-11-28 03:29:45 +0900 [INFO] (0018@[0:default]+workflow+require2): require>: hello
2018-11-28 03:29:45 +0900 [INFO] (0018@[0:default]+workflow+require3): require>: hello
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

エラーハンドリング

オペレーターに_errorパラメーターを指定することでエラーが発生した際の処理を定義することが出来ます。

failオペレーターを使ってerrorを発生させてみます。ついでにですが、failオペレーターを使う際は_errorパラメーターのオペレーターで${error.message}変数を利用することができます。

timezone: Asia/Tokyo

+step:
  fail>: hello

  _check:
    echo>: ok

  _error:
    echo>: error occurred(${error.message})
$ digdag run -a  workflow.dig
2018-11-28 03:42:05 +0900: Digdag v0.9.31
2018-11-28 03:42:06 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:42:06 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:42:06 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:42:07 +0900 [INFO] (0018@[0:default]+workflow+step): fail>: hello
2018-11-28 03:42:07 +0900 [ERROR] (0018@[0:default]+workflow+step): Task +workflow+step failed.
hello
2018-11-28 03:42:08 +0900 [INFO] (0018@[0:default]+workflow+step^error): echo>: error occurred(hello)
error occurred(hello)
2018-11-28 03:42:08 +0900 [INFO] (0018@[0:default]+workflow^failure-alert): type: notify
error:
  * +workflow+step:
    hello

Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

また、成功時には_checkパラメータで任意のオペレーターで処理を行うことが出来ます。

timezone: Asia/Tokyo

+step:
  echo>: hello

  _check:
    echo>: ok

  _error:
    echo>: error occurred(${error.message})
$ digdag run -a  workflow.dig
2018-11-28 03:43:18 +0900: Digdag v0.9.31
2018-11-28 03:43:19 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:43:19 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:43:19 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:43:20 +0900 [INFO] (0018@[0:default]+workflow+step): echo>: hello
hello
2018-11-28 03:43:21 +0900 [INFO] (0018@[0:default]+workflow+step^check): echo>: ok
ok
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

スクリプティングオペレーターについて

今回はETL処理をさせる際に用いたshオペレーターとpyオペレーターの解説をします。

shオペレーター

シェルコマンドやシェルスクリプトを実行します。

timezone: Asia/Tokyo

+step:
  sh>: date
$ digdag run -a  workflow.dig
2018-11-28 03:49:23 +0900: Digdag v0.9.31
2018-11-28 03:49:24 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:49:24 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:49:24 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:49:25 +0900 [INFO] (0018@[0:default]+workflow+step): sh>: date
Wed Nov 28 03:49:25 JST 2018
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

 

pyオペレーター

pythonスクリプトを呼び出して実行します。

簡単なpythonスクリプトを用意します。

class MyWorkflow(object):
    def step1(self):
        print("step1")

そして、このスクリプトを呼び出すワークフローを定義して実行してみます。

timezone: Asia/Tokyo

+step:
  py>: task.MyWorkflow.step1
$ digdag run -a  workflow.dig
2018-11-28 03:53:20 +0900: Digdag v0.9.31
2018-11-28 03:53:21 +0900 [WARN] (main): Reusing the last session time 2018-11-28T00:00:00+09:00.
2018-11-28 03:53:21 +0900 [INFO] (main): Using session /home/digdag/.digdag/status/20181128T000000+0900.
2018-11-28 03:53:21 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2018-11-28T00:00:00+09:00
2018-11-28 03:53:22 +0900 [INFO] (0018@[0:default]+workflow+step): py>: task.MyWorkflow.step1
step1
Success. Task state is saved at /home/digdag/.digdag/status/20181128T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

pythonスクリプトが呼び出せていますね。

pythonスクリプト内でdigdagというパッケージをimportすることでdigdagのAPIを利用することが出来ます。

APIの詳細についてはこちらをどうぞ。

http://docs.digdag.io/python_api.html

まとめ

今回は文面と例での解説ばかりになってしまいました・・・が、Digdagのコンセプトやワークフローの定義方法について解説しました!

今回はDigdagをローカルで利用して解説を進めてきましたが、次回はDigdagのサーバー版であるDigdag Serverの構成と構築方法について記述できればと思います!

それでは!!