データ基盤にArgoWorkflowsを導入した話

Yoshina9aPosted by

この記事はGRIPHONE Advent Calendar 2021 6日目の記事です。

グリフォンのデータラボでエンジニアをしている吉永です。
好きな空港は羽田空港で、羽田空港でおすすめのお店は第2ターミナル3階にあるお蕎麦屋さんです。

グリフォンでは昨年にデータラボが立ち上がり、このチームではデータ周りのエンジニアリングや分析支援などをやっています。

今年の9月にサービス運用基盤がGCPのKubernetesに移行したことに伴い、データ基盤も刷新することにしました。
新しいデータ基盤にArgoWorkflowsを導入したので、その経緯を本記事ではお話します。

モチベーション

2021年9月にパブリッククラウドをAWSからGCPへ集約、サービス運用基盤はKubernetesへ移行することから、AWS上のデータ基盤もGCPへ移す必要がありました。
これを機に、今使用しているデータ基盤も刷新しようという考えが浮上し、GCP上に構築する新しいデータ基盤を考えることにしました。

ちなみに以前までのデータ基盤はワークフローエンジンにDigDag、データ転送にはEmbulkを使用していました。

引越し前のアーキテクチャ図

次の候補はArgoWorkflows

次のデータ基盤に使うワークフローエンジンの候補は3つありました。

  1. Cloud Composer(Googleが提供しているAirflow)
  2. ArgoWorkflows(GKEに構築)
  3. DigDag(GKEに構築)

タイトルにもある通り、私達はArgoWorkflowsを採用しました。

引越し後のアーキテクチャ図

なぜそれを採用したかというと、

  • 新しく構築するKubernetes環境についでにのせられて費用面で有利
  • Kubernetesネイティブなのでスケールが容易
  • どうせ新しい学習コストがかかるならKubernetesの恩恵を受けたほうが得
  • GitOpsに乗れる
  • (Kubernetesの不明点についてはSREの支援を受けられる)
  • サービスが増えた際に横展開がしやすそう

というメリットがあったからでした。

Cloud Composerのワークフローをコード管理するとなるとterraformを利用しなければならないということを考えると、GitOpsでよしなにやってくれるArgoが有利だなと考えました。

また、利用実績のあったDigDagは今後スケールすることを考えたときに、自前で色々と用意しなければならないことや、柔軟なワークフローが組めないことから、大変お世話になったDigDagには別れを告げることにしました。

余談ではありますが、以前はEC2のインスタンス1台で約200テーブルの転送を担っていました。
すべての転送が終わる頃には、日付が変わる手前あたりになっていました。
そんな状況だったため、転送が一つでもエラーになってしまうとその後の転送が遅れて、翌日の可視化にも影響が出ることがしばしばありました。

Argoにしてよかったこと

まずは、期待していたとおりKubernetesの恩恵を受けられました。転送をスケールさせることができ、Digdag時代ではほぼ1日中転送に対して6時間程度で終わるようになりました。

2つ目に良かったことは、ワークフローのテンプレートが使い回せることでした。
特にSlack通知など各ワークフローで必要な処理を使い回すのに重宝しました。

3つ目に良かったことは、ワークフローの制御が細かくできたことでした。DigDag時代ではいくつかの依存関係のあるワークフローが動いており、そのワークフローは職人の勘で設定されたcronにしたがって動いていました。
なにかの節に重要なワークフローが止まってしまうと、その後のワークフローに影響を及ぼすことがしばしばありました。

最後に良かったことは、Jenkinsで動いていた闇ジョブをArgoWorkflowsにお引越ししてしっかりコード化できたことです。

またまた余談ですが、パブリッククラウドをGCPに合わせたことによって、セキュリティ周りではWorkload Identityを活用したので、鍵の管理の手間が減りました。

Argoにして困ったこと

日付操作系が少し弱いと感じています。
例えば「日付指定して数値を取得したい類のジョブ」を実行する際には、パラメーターとして日付を渡したいと思います。
しかしArgoWorkflowsのvariablesには良さそうな変数がなく自前でどうにかするしか無いのが困りました。

日付操作を実現するには、DAGやSTEPなどのワークフローで対応するか、テンプレート側で、別途その操作をする必要がありました。
好みの問題かと思いますが、私はワークフローにはロジックを記述しない方針でやっていたため、新たにワークフローを作成することにしました。が、せっかくなのでここでは2つの解決法をご紹介いたします。

sprigを組み合わせて日付を操作する

sprigのdateModifyで時間を操作する形で日付操作を実現するのが1つ目の手段です。こちらであれば、日時指定系のジョブに1行で対応ができます。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
   generateName: date
spec:
  entrypoint: whalesay
  arguments:
    parameters:
    - name: message
      value: "{{=sprig.dateInZone('2006-01-02', sprig.dateModify('-24h', sprig.toDate('2006-01-02T15:04:05Z07:00', workflow.creationTimestamp.RFC3339)), 'JST')}}"

  templates:
  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

前日の日付を出力するワークフローテンプレート

もう一つは、ワークフロー実行日の前日の日付を用意するワークフローテンプレートを用意しました。
基本的には前日の日付が出るようにしていますが、テストなどで任意の日付を渡したい場合もあったので任意の日付を渡せるようにしています。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: job
spec:
  arguments:
    parameters:
      - name: date
  templates:
    - name: main
      inputs:
        parameters:
          - name: date
            value: "{{ inputs.parameters.date }}"
            default: yesterday
      script:
        image: alpine
        command:
          - sh
        source: |
          ln -sfn /usr/share/zoneinfo/Asia/Tokyo /etc/localtime
          date="{{inputs.parameters.date}}"
          if [ $date = yesterday ]; then
            date=$(date -d yesterday +%Y-%m-%d)
          fi
          echo "$date" > "/tmp/date.txt"
      outputs:
        parameters:
        - name: date
          valueFrom:
            default: "{{ workflow.parameter.date }}"
            path: /tmp/date.txt

確認用のワークフロー

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: yesterday
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: day
            templateRef:
              name: job
              template: main
            arguments:
              parameters:
              - name: date
                value: yesterday
        - - name: consume-parameter
            template: print-message
            arguments:
              parameters:
              - name: message
                value: "{{steps.day.outputs.parameters.date}}"

    - name: print-message
      inputs:
        parameters:
        - name: message
      container:
        image: docker/whalesay:latest
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]

実行結果

 ____________
< 2021-11-28 >
 ------------
    \
     \
      \
                    ##        .
              ## ## ##       ==
           ## ## ## ##      ===
       /""""""""""""""""___/ ===
  ~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ /  ===- ~~~
       \______ o          __/
        \    \        __/
          \____\______/

まとめ

今年の9月にデータ基盤のワークフローエンジンにArgoWorkflowsに移行しました。
サービス基盤がKubernetesへ移行したため、データ基盤もそれにあわせることで、GCP、Kubernetesの恩恵を受けました。
また、本記事の経緯では触れませんでしたが、GitOpsなどの恩恵も受けることができました。

日付操作系などちょっとした癖はありますが、ワークフローエンジンでArgoWorkflows以外を利用していて、今後kubernetesでの利用を想定している方は、一度移行を考えてみても良いかもしれません。