ETL基盤を構築してみた 〜Embulk編〜

AvatarPosted by

この記事はGRIPHONE Advent Calendar 2018の4日目の記事です。

こんにちは。SREの徳田です。

今回はここ最近でETL基盤を作ったので、そのうちの一つのツールであるEmbulkについて紹介しようと思います。

※この記事はGRIPHONE Advent Calendar 2018 4日目の記事です。
https://qiita.com/advent-calendar/2018/griphone
https://adventar.org/calendars/3147

Embulkとは

Embulkはオープンソースのバルクデータローダーです。

データベース・ストレージ・ファイルフォーマットやクラウドサービスなど、様々なデータストアのデータ転送を手助けしてくれます。

Embulkは以下の機能があります。

  • 入力ファイルフォーマットの自動推測
  • 巨大なデータセットを扱うための並列・分散実行
  • All or Nothingを保証するトランザクション制御
  • 再開機能
  • RubyGems.orgでのPluginの提供

Embulkのプラグインは様々なものがあり、様々なデータストアからデータを抽出したり、ロードさせることができます。以下にプラグインのリストが掲載されているページを載せておきます。

http://www.embulk.org/plugins/

インストール方法

EmbulkはJavaで実装されているのでJarファイルをとってくるだけです。

curl -o /usr/local/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x /usr/local/bin/embulk

Embulkの使い方

今回はこのような転送を考えてみます。

MySQLのデータベースからExtractしてBigQueryにLoadさせます。

bundlerを使ってEmbulkのPluginをインストール

今回は入力にMySQL、出力にBigQueryを使うので、embulk-input-mysqlembulk-output-bigqueryをインストールします。

まずembulk用のbundle環境を作ります。

embulk mkbundle .

例ではカレントディレクトを指定してますが、別のフォルダを作ってそれを指定しても問題ありません。

また、作成されたembulkフォルダはPluginを自分で実装しない場合は消してしまって問題ないです。

次にmkbundleで作成されたGemfileに必要なPluginを記述します。

source 'https://rubygems.org/'
 
# No versions are specified for 'embulk' to use the gem embedded in embulk.jar.
# Note that prerelease versions (e.g. "0.9.0.beta") do not match the statement.
# Specify the exact prerelease version (like '= 0.9.0.beta') for prereleases.
gem 'embulk'
 
#
# 1. Use following syntax to specify versions of plugins
# to install this bundle directory:
#
#gem 'embulk-output-mysql' # the latest version
#gem 'embulk-input-baz', '= 0.2.0' # specific version
#gem 'embulk-input-xyz', '~> 0.3.0' # latest major version
#gem 'embulk-output-postgresql', '>= 0.1.0' # newer than specific version
#
#gem 'embulk-output-awesome', git: 'https://github.com/you/embulk-output-awesome.git', branch: 'master'
#
 
#
# 2. When you modify this file, run following command to
# install plugins:
#
# $ cd this_directory
# $ embulk bundle
#
 
#
# 3. Then you can use plugins with -b, --bundle BUNDLE_PATH command:
#
# $ embulk guess -b path/to/this/directory ...
# $ embulk run -b path/to/this/directory ...
# $ embulk preview -b path/to/this/directory ...
#
 
gem 'embulk-input-mysql'
gem 'embulk-output-bigquery'

そしてGemfileのPluginをインストールします。(別フォルダを作って実行した場合はそのフォルダに移動して実行します)

embulk bundle

完了するとbundleのフォルダにGemfile.lockとjrubyフォルダが出来ていると思います。

これにてbundle使ったPluginのインストールは完了です。

EmbulkのConfigを記述

次にConfigを書きます。Embulkはyamlで記述します。また、liquidという拡張子を追加することでliquidのテンプレートファイルとして処理してくれて、環境変数などを利用できるようになります。

inputの設定

まず入力のMySQL側から。

in:
  type: mysql
  host: {{ env.mysql_host }}
  port: {{ env.mysql_port }}
  user: {{ env.mysql_user }}
  password: {{ env.mysql_pass }}
  database: {{ env.mysql_db }}
  table: {{ env.mysql_table }}

殆ど環境変数になっちゃってますが、適宜値を直接書いてしまっても問題ないです。

基本的にはこれで転送されるようになりますが、これだと毎回テーブルをコピーする形になってしまいます。

そこでincremental loadingを使用してみます。

incremental loadingは、指定されたカラムでソートを行いloadしたデータのポジションを記録しておき、それ以降に作成や更新があったレコードだけをloadしてくれる機能です。

in:
  type: mysql
  host: {{ env.mysql_host }}
  port: {{ env.mysql_port }}
  user: {{ env.mysql_user }}
  password: {{ env.mysql_pass }}
  database: {{ env.mysql_db }}
  table: {{ env.mysql_table }}
  incremental: true
  incremental_columns:
    - update_time
    - id

上記のように設定すると、update_timeとidでソートを行い、最終処理以降にレコードの作成や更新があったものをloadするようになります。

上記はincremental loadingの例です。前回の実行で青色のレコードを対象にロードを行い、update_timeが「2018/11/26 23:34:22」、idが「3」のものまでという記録をします。次の実行でそれ移行のレコードである緑のレコードを対象にロードを行う、というものです。

この転送を行うことで指定日時での集計なども可能になりますね。

outputの設定

次に出力のBigQuery側の設定。

out:
  type: bigquery
  auth_method: json_key
  json_keyfile: '{{ env.bq_credential }}'
  project: {{ env.bq_project }}
  dataset: {{ env.bq_dataset }}
  location: asia-northeast1
  table: {{ env.bq_table }}
  schema_file: {{ env.bq_table }}.json
  default_timezone: Asia/Tokyo
  auto_create_table: true
  path_prefix: {{ env.bq_table }}
  compression: GZIP
  time_partitioning:
    type: DAY
    field: update_time

またもや環境変数ばかりですが。特徴的なところだけ解説します。

json_keyfileはBigQueryのCredentialのJSONファイルのパスを指定します。
しかし実際の所Digdagと連携させる時に都合が悪くなるためDigdagから実行させる際はCredentialのコンテンツ自体を環境変数を通してjson_keyfile.contentに渡します。

schema_fileはBigQueryのテーブルのスキーマ定義のファイルを指定しています。これはauto_create_tableを使用する時に必要です。schema_file以外の指定の方法としてtemplate_tableやcolumn_optionsを使う方法があります。詳しくはembulk-output-bigqueryのREADME.mdを見てみてください。

time_partitioningはBigQueryの時間分割パーティションの機能を使う際の設定です。詳しくはこちら。設定では日毎にテーブルを分割するように設定しています。また、分割する際に評価するカラムをupdate_timeに設定しています。

記述したConfigファイル

以下のようなConfigを作成しました。embulk.yaml.liquidというファイル名にします。

in:
  type: mysql
  host: {{ env.mysql_host }}
  port: {{ env.mysql_port }}
  user: {{ env.mysql_user }}
  password: {{ env.mysql_pass }}
  database: {{ env.mysql_db }}
  table: {{ env.mysql_table }}
  incremental: true
  incremental_columns:
    - update_time
    - id
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: '{{ env.bq_credential }}'
  project: {{ env.bq_project }}
  dataset: {{ env.bq_dataset }}
  location: asia-northeast1
  table: {{ env.bq_table }}
  schema_file: {{ env.bq_table }}.json
  default_timezone: Asia/Tokyo
  auto_create_table: true
  path_prefix: {{ env.bq_table }}
  compression: GZIP
  time_partitioning:
    type: DAY
    field: update_time

実行用のスクリプトを書く

今回テスト実行用にちょっとしたスクリプトを用意してみます。run.shというファイルで保存して実行権限を付けます。

#!/bin/sh
export mysql_host=127.0.0.1
export mysql_port=3306
export mysql_user=db
export mysql_pass=db_pass
export mysql_db=db
export mysql_table=example_test
export bq_credential=./credential.json
export bq_project=embulk-test-prj
export bq_dataset=embulk_test
export bq_table=embulk_test
 
$*

Previewを実行

以下のようなコマンドでPreviewを実行してみます。-bオプションは最初に作成したbundleのフォルダのパスを指定します。

$ ./run.sh embulk preview -b . embulk.yaml.liquid
2018-11-27 01:58:27.934 +0900: Embulk v0.9.9
2018-11-27 01:58:28.603 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-11-27 01:58:30.711 +0900 [INFO] (main): BUNDLE_GEMFILE is being set: "/home/embulk/embulk-test/./Gemfile"
2018-11-27 01:58:30.712 +0900 [INFO] (main): Gem's home and path are being cleared.
2018-11-27 01:58:33.266 +0900 [INFO] (main): Started Embulk v0.9.9
2018-11-27 01:58:33.368 +0900 [INFO] (0001:preview): Loaded plugin embulk-input-mysql (0.9.3)
2018-11-27 01:58:33.411 +0900 [INFO] (0001:preview): JDBC Driver = /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-input-mysql-0.9.3/default_jdbc_driver/mysql-connector-java-5.1.44.jar
2018-11-27 01:58:33.420 +0900 [INFO] (0001:preview): Fetch size is 10000. Using server-side prepared statement.
2018-11-27 01:58:33.421 +0900 [INFO] (0001:preview): Connecting to jdbc:mysql://127.0.0.1:3306/db options {useCompression=true, socketTimeout=1800000, useSSL=false, user=db, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2018-11-27 01:58:33.632 +0900 [INFO] (0001:preview): Using JDBC Driver mysql-connector-java-5.1.44 ( Revision: b3cda4f864902ffdde495b9df93937c3e20009be )
2018-11-27 01:58:33.632 +0900 [WARN] (0001:preview): embulk-input-mysql 0.9.0 upgraded the bundled MySQL Connector/J version from 5.1.34 to 5.1.44 .
2018-11-27 01:58:33.632 +0900 [WARN] (0001:preview): And set useLegacyDatetimeCode=false by default in order to get correct datetime value when the server timezone and the client timezone are different.
2018-11-27 01:58:33.632 +0900 [WARN] (0001:preview): Set useLegacyDatetimeCode=true if you need to get datetime value same as older embulk-input-mysql.
2018-11-27 01:58:33.806 +0900 [INFO] (0001:preview): Fetch size is 10000. Using server-side prepared statement.
2018-11-27 01:58:33.806 +0900 [INFO] (0001:preview): Connecting to jdbc:mysql://127.0.0.1:3306/db options {useCompression=true, socketTimeout=1800000, useSSL=false, user=db, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2018-11-27 01:58:33.814 +0900 [INFO] (0001:preview): SQL: SELECT * FROM `embulk_test` ORDER BY `update_time`, `id`
2018-11-27 01:58:33.819 +0900 [INFO] (0001:preview): > 0.00 seconds
+---------+--------------+-------------------------+
| id:long | state:string |   update_time:timestamp |
+---------+--------------+-------------------------+
|       1 |           ON | 2018-11-24 15:40:12 UTC |
|       2 |          OFF | 2018-11-26 02:10:43 UTC |
|       3 |          OFF | 2018-11-26 14:34:22 UTC |
+---------+--------------+-------------------------+

ちょっと上で解説した3つのレコードが転送されようとしていますね。

データ転送を実行

初回の転送

実際にMySQLからBigQueryにデータを転送してみましょう!

runで転送を実行します。-bオプションは先程と同じでbundleのフォルダのパスを指定します。-cでincremental loading用の差分情報を読み込み・出力するファイルの場所を指定します。

$ ./run.sh embulk run -b . -c embulk.diff.yaml embulk.yaml.liquid
2018-11-27 02:20:02.398 +0900: Embulk v0.9.9
2018-11-27 02:20:03.387 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-11-27 02:20:06.648 +0900 [INFO] (main): BUNDLE_GEMFILE is being set: "/home/embulk/embulk-test/./Gemfile"
2018-11-27 02:20:06.649 +0900 [INFO] (main): Gem's home and path are being cleared.
2018-11-27 02:20:11.317 +0900 [INFO] (main): Started Embulk v0.9.9
2018-11-27 02:20:11.451 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-mysql (0.9.3)
2018-11-27 02:20:15.844 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.10)
2018-11-27 02:20:15.899 +0900 [INFO] (0001:transaction): JDBC Driver = /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-input-mysql-0.9.3/default_jdbc_driver/mysql-connector-java-5.1.44.jar
2018-11-27 02:20:15.910 +0900 [INFO] (0001:transaction): Fetch size is 10000. Using server-side prepared statement.
2018-11-27 02:20:15.916 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://127.0.0.1:3306/db options {useCompression=true, socketTimeout=1800000, useSSL=false, user=db, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2018-11-27 02:20:16.249 +0900 [INFO] (0001:transaction): Using JDBC Driver mysql-connector-java-5.1.44 ( Revision: b3cda4f864902ffdde495b9df93937c3e20009be )
2018-11-27 02:20:16.250 +0900 [WARN] (0001:transaction): embulk-input-mysql 0.9.0 upgraded the bundled MySQL Connector/J version from 5.1.34 to 5.1.44 .
2018-11-27 02:20:16.250 +0900 [WARN] (0001:transaction): And set useLegacyDatetimeCode=false by default in order to get correct datetime value when the server timezone and the client timezone are different.
2018-11-27 02:20:16.250 +0900 [WARN] (0001:transaction): Set useLegacyDatetimeCode=true if you need to get datetime value same as older embulk-input-mysql.
2018-11-27 02:20:16.348 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-11-27 02:20:16.638 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Get dataset... embulk-test-prj:embulk_test
2018-11-27 02:20:18.943 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Create table... embulk-test-prj:embulk_test.LOAD_TEMP_8a43cb2d_4bb7_422f_979b_0e3af1ff8b67_embulk_test
2018-11-27 02:20:19.468 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Create table... embulk-test-prj:embulk_test.embulk_test
2018-11-27 02:20:20.029 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2018-11-27 02:20:20.150 +0900 [INFO] (0022:task-0000): Fetch size is 10000. Using server-side prepared statement.
2018-11-27 02:20:20.151 +0900 [INFO] (0022:task-0000): Connecting to jdbc:mysql://127.0.0.1:3306/db options {useCompression=true, socketTimeout=1800000, useSSL=false, user=db, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2018-11-27 02:20:20.164 +0900 [INFO] (0022:task-0000): SQL: SELECT * FROM `embulk_test` ORDER BY `update_time`, `id`
2018-11-27 02:20:20.173 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2018-11-27 02:20:20.245 +0900 [INFO] (embulk-output-executor-0): embulk-output-bigquery: create embulk_test.27590.2000.csv.gz
2018-11-27 02:20:20.330 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-11-27 02:20:20.356 +0900 [INFO] (Ruby-0-Thread-1: /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-output-bigquery-0.4.10/lib/embulk/output/bigquery/bigquery_client.rb:156): embulk-output-bigquery: Load job starting... job_id:[embulk_load_job_020ac048-57ac-4ee7-bb50-db16a5380d94] embulk_test.27590.2000.csv.gz => embulk-test-prj:embulk_test.LOAD_TEMP_8a43cb2d_4bb7_422f_979b_0e3af1ff8b67_embulk_test in asia-northeast1
2018-11-27 02:20:23.143 +0900 [INFO] (Ruby-0-Thread-1: /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-output-bigquery-0.4.10/lib/embulk/output/bigquery/bigquery_client.rb:156): embulk-output-bigquery: Load job checking... job_id:[embulk_load_job_020ac048-57ac-4ee7-bb50-db16a5380d94] elapsed_time:0.000141sec status:[RUNNING]
2018-11-27 02:20:33.684 +0900 [INFO] (Ruby-0-Thread-1: /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-output-bigquery-0.4.10/lib/embulk/output/bigquery/bigquery_client.rb:156): embulk-output-bigquery: Load job completed... job_id:[embulk_load_job_020ac048-57ac-4ee7-bb50-db16a5380d94] elapsed_time:10.543211sec status:[DONE]
2018-11-27 02:20:33.686 +0900 [INFO] (Ruby-0-Thread-1: /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-output-bigquery-0.4.10/lib/embulk/output/bigquery/bigquery_client.rb:156): embulk-output-bigquery: Load job response... job_id:[embulk_load_job_020ac048-57ac-4ee7-bb50-db16a5380d94] response.statistics:{:creation_time=>1543252822128, :completion_ratio=>1.0, :start_time=>1543252822885, :end_time=>1543252824087, :load=>{:input_file_bytes=>119, :output_bytes=>62, :output_rows=>3, :bad_records=>0, :input_files=>1}}
2018-11-27 02:20:33.693 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Get table... embulk-test-prj:embulk_test.LOAD_TEMP_8a43cb2d_4bb7_422f_979b_0e3af1ff8b67_embulk_test
2018-11-27 02:20:34.094 +0900 [INFO] (0001:transaction): embulk-output-bigquery: transaction_report: {"num_input_rows":3,"num_response_rows":3,"num_output_rows":3,"num_rejected_rows":0}
2018-11-27 02:20:34.096 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job starting... job_id:[embulk_copy_job_b41d7c8c-b1d2-4e3e-b62d-7608c0f7ea51] embulk-test-prj:embulk_test.LOAD_TEMP_8a43cb2d_4bb7_422f_979b_0e3af1ff8b67_embulk_test => embulk-test-prj:embulk_test.embulk_test
2018-11-27 02:20:34.714 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job checking... job_id:[embulk_copy_job_b41d7c8c-b1d2-4e3e-b62d-7608c0f7ea51] elapsed_time:0.000122sec status:[RUNNING]
2018-11-27 02:20:45.030 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job completed... job_id:[embulk_copy_job_b41d7c8c-b1d2-4e3e-b62d-7608c0f7ea51] elapsed_time:10.316349sec status:[DONE]
2018-11-27 02:20:45.031 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job response... job_id:[embulk_copy_job_b41d7c8c-b1d2-4e3e-b62d-7608c0f7ea51] response.statistics:{:creation_time=>1543252834380, :start_time=>1543252834531, :end_time=>1543252834958}
2018-11-27 02:20:45.034 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Delete table... embulk-test-prj:embulk_test.LOAD_TEMP_8a43cb2d_4bb7_422f_979b_0e3af1ff8b67_embulk_test
2018-11-27 02:20:45.368 +0900 [INFO] (0001:transaction): embulk-output-bigquery: delete embulk_test.27590.2000.csv.gz
2018-11-27 02:20:45.387 +0900 [INFO] (main): Committed.
2018-11-27 02:20:45.388 +0900 [INFO] (main): Next config diff: {"in":{"last_record":["2018-11-26T23:34:22.000000",3]},"out":{}}

ずらずらとログが出ていますが、ざっくりはMySQLからデータを抽出して一旦CSVに出力し、CSVからBigQueryの一時テーブルにロードさせています。次にロードが完了したら本命の名前のテーブルにコピーを実行し、一時テーブルとCSVを削除しています。これがAll or Nothingの処理の部分ですね!

そして最後に差分情報を出力しています。ちょっと-cオプションで指定したファイルを見てみましょう。

$ cat embulk.diff.yaml
in:
  last_record: ['2018-11-26T23:34:22.000000', 3]
out: {}

この様に転送した最後のレコードの情報を保持してくれています。2回目以降はこの情報を使って追加・更新があったレコードのみを転送するようになります。

2回目の転送

このような転送が起こる状態を作ってみます。初回の転送で青色のレコード部分を転送しました。

MySQLに追加・更新処理を行い下側のテーブルになるように変更をし、再度runを実行してみます。

./run.sh embulk run -b . -c embulk.diff.yaml embulk.yaml.liquid
2018-11-27 02:35:08.790 +0900: Embulk v0.9.9
2018-11-27 02:35:09.741 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-11-27 02:35:12.776 +0900 [INFO] (main): BUNDLE_GEMFILE is being set: "/home/embulk/embulk-test/./Gemfile"
2018-11-27 02:35:12.778 +0900 [INFO] (main): Gem's home and path are being cleared.
2018-11-27 02:35:17.368 +0900 [INFO] (main): Started Embulk v0.9.9
2018-11-27 02:35:17.498 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-mysql (0.9.3)
2018-11-27 02:35:21.876 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.10)
2018-11-27 02:35:21.929 +0900 [INFO] (0001:transaction): JDBC Driver = /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-input-mysql-0.9.3/default_jdbc_driver/mysql-connector-java-5.1.44.jar
2018-11-27 02:35:21.942 +0900 [INFO] (0001:transaction): Fetch size is 10000. Using server-side prepared statement.
2018-11-27 02:35:21.944 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://127.0.0.1:3306/db options {useCompression=true, socketTimeout=1800000, useSSL=false, user=db, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2018-11-27 02:35:22.382 +0900 [INFO] (0001:transaction): Using JDBC Driver mysql-connector-java-5.1.44 ( Revision: b3cda4f864902ffdde495b9df93937c3e20009be )
2018-11-27 02:35:22.382 +0900 [WARN] (0001:transaction): embulk-input-mysql 0.9.0 upgraded the bundled MySQL Connector/J version from 5.1.34 to 5.1.44 .
2018-11-27 02:35:22.383 +0900 [WARN] (0001:transaction): And set useLegacyDatetimeCode=false by default in order to get correct datetime value when the server timezone and the client timezone are different.
2018-11-27 02:35:22.383 +0900 [WARN] (0001:transaction): Set useLegacyDatetimeCode=true if you need to get datetime value same as older embulk-input-mysql.
2018-11-27 02:35:22.497 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-11-27 02:35:22.749 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Get dataset... embulk-test-prj:embulk_test
2018-11-27 02:35:25.238 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Create table... embulk-test-prj:embulk_test.LOAD_TEMP_5a9309fe_2c91_48d6_b6c0_5aa2bb1d95f2_embulk_test
2018-11-27 02:35:25.670 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Create table... embulk-test-prj:embulk_test.embulk_test
2018-11-27 02:35:26.125 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2018-11-27 02:35:26.281 +0900 [INFO] (0022:task-0000): Fetch size is 10000. Using server-side prepared statement.
2018-11-27 02:35:26.283 +0900 [INFO] (0022:task-0000): Connecting to jdbc:mysql://127.0.0.1:3306/db options {useCompression=true, socketTimeout=1800000, useSSL=false, user=db, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2018-11-27 02:35:26.298 +0900 [INFO] (0022:task-0000): SQL: SELECT * FROM `embulk_test` WHERE ((`update_time` > ?) OR (`update_time` = ? AND `id` > ?)) ORDER BY `update_time`, `id`
2018-11-27 02:35:26.300 +0900 [INFO] (0022:task-0000): Parameters: ["2018-11-26T23:34:22.000000", "2018-11-26T23:34:22.000000", 3]
2018-11-27 02:35:26.343 +0900 [INFO] (0022:task-0000): > 0.01 seconds
2018-11-27 02:35:26.370 +0900 [INFO] (embulk-output-executor-0): embulk-output-bigquery: create embulk_test.27809.2000.csv.gz
2018-11-27 02:35:26.432 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-11-27 02:35:26.455 +0900 [INFO] (Ruby-0-Thread-1: /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-output-bigquery-0.4.10/lib/embulk/output/bigquery/bigquery_client.rb:156): embulk-output-bigquery: Load job starting... job_id:[embulk_load_job_c8817373-70d4-45e5-a0e4-27ec5ca96702] embulk_test.27809.2000.csv.gz => embulk-test-prj:embulk_test.LOAD_TEMP_5a9309fe_2c91_48d6_b6c0_5aa2bb1d95f2_embulk_test in asia-northeast1
2018-11-27 02:35:28.487 +0900 [INFO] (Ruby-0-Thread-1: /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-output-bigquery-0.4.10/lib/embulk/output/bigquery/bigquery_client.rb:156): embulk-output-bigquery: Load job checking... job_id:[embulk_load_job_c8817373-70d4-45e5-a0e4-27ec5ca96702] elapsed_time:0.000152sec status:[RUNNING]
2018-11-27 02:35:38.915 +0900 [INFO] (Ruby-0-Thread-1: /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-output-bigquery-0.4.10/lib/embulk/output/bigquery/bigquery_client.rb:156): embulk-output-bigquery: Load job completed... job_id:[embulk_load_job_c8817373-70d4-45e5-a0e4-27ec5ca96702] elapsed_time:10.431976sec status:[DONE]
2018-11-27 02:35:38.917 +0900 [INFO] (Ruby-0-Thread-1: /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-output-bigquery-0.4.10/lib/embulk/output/bigquery/bigquery_client.rb:156): embulk-output-bigquery: Load job response... job_id:[embulk_load_job_c8817373-70d4-45e5-a0e4-27ec5ca96702] response.statistics:{:creation_time=>1543253727735, :completion_ratio=>1.0, :start_time=>1543253728293, :end_time=>1543253729465, :load=>{:input_file_bytes=>78, :output_bytes=>40, :output_rows=>2, :bad_records=>0, :input_files=>1}}
2018-11-27 02:35:38.922 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Get table... embulk-test-prj:embulk_test.LOAD_TEMP_5a9309fe_2c91_48d6_b6c0_5aa2bb1d95f2_embulk_test
2018-11-27 02:35:39.203 +0900 [INFO] (0001:transaction): embulk-output-bigquery: transaction_report: {"num_input_rows":2,"num_response_rows":2,"num_output_rows":2,"num_rejected_rows":0}
2018-11-27 02:35:39.206 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job starting... job_id:[embulk_copy_job_23a53aef-c9c5-4784-99f2-098e50ddd8a8] embulk-test-prj:embulk_test.LOAD_TEMP_5a9309fe_2c91_48d6_b6c0_5aa2bb1d95f2_embulk_test => embulk-test-prj:embulk_test.embulk_test
2018-11-27 02:35:39.735 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job checking... job_id:[embulk_copy_job_23a53aef-c9c5-4784-99f2-098e50ddd8a8] elapsed_time:0.000148sec status:[RUNNING]
2018-11-27 02:35:50.125 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job completed... job_id:[embulk_copy_job_23a53aef-c9c5-4784-99f2-098e50ddd8a8] elapsed_time:10.389724sec status:[DONE]
2018-11-27 02:35:50.126 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job response... job_id:[embulk_copy_job_23a53aef-c9c5-4784-99f2-098e50ddd8a8] response.statistics:{:creation_time=>1543253739450, :start_time=>1543253739586, :end_time=>1543253740064}
2018-11-27 02:35:50.127 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Delete table... embulk-test-prj:embulk_test.LOAD_TEMP_5a9309fe_2c91_48d6_b6c0_5aa2bb1d95f2_embulk_test
2018-11-27 02:35:50.553 +0900 [INFO] (0001:transaction): embulk-output-bigquery: delete embulk_test.27809.2000.csv.gz
2018-11-27 02:35:50.569 +0900 [INFO] (main): Committed.
2018-11-27 02:35:50.570 +0900 [INFO] (main): Next config diff: {"in":{"last_record":["2018-11-27T18:53:37.000000",3]},"out":{}}

ログの端の方にありますが、output_rowsが2になっていますね。そしてログにも出ていますがlast_recordは

in:
  last_record: ['2018-11-27T18:53:37.000000', 3]
out: {}

このように最後のレコードに更新されています。

ついでに同じ状態で再度実行すると

./run.sh embulk run -b . -c embulk.diff.yaml embulk.yaml.liquid
2018-11-27 02:43:08.716 +0900: Embulk v0.9.9
2018-11-27 02:43:09.681 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-11-27 02:43:13.133 +0900 [INFO] (main): BUNDLE_GEMFILE is being set: "/home/embulk/embulk-test/./Gemfile"
2018-11-27 02:43:13.135 +0900 [INFO] (main): Gem's home and path are being cleared.
2018-11-27 02:43:17.201 +0900 [INFO] (main): Started Embulk v0.9.9
2018-11-27 02:43:17.319 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-mysql (0.9.3)
2018-11-27 02:43:21.737 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.10)
2018-11-27 02:43:21.788 +0900 [INFO] (0001:transaction): JDBC Driver = /home/embulk/embulk-test/jruby/2.3.0/gems/embulk-input-mysql-0.9.3/default_jdbc_driver/mysql-connector-java-5.1.44.jar
2018-11-27 02:43:21.799 +0900 [INFO] (0001:transaction): Fetch size is 10000. Using server-side prepared statement.
2018-11-27 02:43:21.802 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://127.0.0.1:3306/db options {useCompression=true, socketTimeout=1800000, useSSL=false, user=db, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2018-11-27 02:43:22.208 +0900 [INFO] (0001:transaction): Using JDBC Driver mysql-connector-java-5.1.44 ( Revision: b3cda4f864902ffdde495b9df93937c3e20009be )
2018-11-27 02:43:22.209 +0900 [WARN] (0001:transaction): embulk-input-mysql 0.9.0 upgraded the bundled MySQL Connector/J version from 5.1.34 to 5.1.44 .
2018-11-27 02:43:22.209 +0900 [WARN] (0001:transaction): And set useLegacyDatetimeCode=false by default in order to get correct datetime value when the server timezone and the client timezone are different.
2018-11-27 02:43:22.209 +0900 [WARN] (0001:transaction): Set useLegacyDatetimeCode=true if you need to get datetime value same as older embulk-input-mysql.
2018-11-27 02:43:22.302 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-11-27 02:43:22.534 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Get dataset... embulk-test-prj:embulk_test
2018-11-27 02:43:25.027 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Create table... embulk-test-prj:embulk_test.LOAD_TEMP_600ca080_303c_4ae0_9ac3_67069fb1e621_embulk_test
2018-11-27 02:43:25.444 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Create table... embulk-test-prj:embulk_test.embulk_test
2018-11-27 02:43:25.913 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2018-11-27 02:43:26.027 +0900 [INFO] (0022:task-0000): Fetch size is 10000. Using server-side prepared statement.
2018-11-27 02:43:26.029 +0900 [INFO] (0022:task-0000): Connecting to jdbc:mysql://127.0.0.1:3306/db options {useCompression=true, socketTimeout=1800000, useSSL=false, user=db, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2018-11-27 02:43:26.042 +0900 [INFO] (0022:task-0000): SQL: SELECT * FROM `embulk_test` WHERE ((`update_time` > ?) OR (`update_time` = ? AND `id` > ?)) ORDER BY `update_time`, `id`
2018-11-27 02:43:26.044 +0900 [INFO] (0022:task-0000): Parameters: ["2018-11-27T18:53:37.000000", "2018-11-27T18:53:37.000000", 3]
2018-11-27 02:43:26.158 +0900 [INFO] (0022:task-0000): > 0.01 seconds
2018-11-27 02:43:26.167 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-11-27 02:43:26.171 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Get table... embulk-test-prj:embulk_test.LOAD_TEMP_600ca080_303c_4ae0_9ac3_67069fb1e621_embulk_test
2018-11-27 02:43:26.446 +0900 [INFO] (0001:transaction): embulk-output-bigquery: transaction_report: {"num_input_rows":0,"num_response_rows":0,"num_output_rows":0,"num_rejected_rows":0}
2018-11-27 02:43:26.449 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job starting... job_id:[embulk_copy_job_3e473269-b0d1-4e30-b2b9-c217cbd8598e] embulk-test-prj:embulk_test.LOAD_TEMP_600ca080_303c_4ae0_9ac3_67069fb1e621_embulk_test => embulk-test-prj:embulk_test.embulk_test
2018-11-27 02:43:26.893 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job checking... job_id:[embulk_copy_job_3e473269-b0d1-4e30-b2b9-c217cbd8598e] elapsed_time:0.00018199999999999998sec status:[RUNNING]
2018-11-27 02:43:37.139 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job completed... job_id:[embulk_copy_job_3e473269-b0d1-4e30-b2b9-c217cbd8598e] elapsed_time:10.249845sec status:[DONE]
2018-11-27 02:43:37.141 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Copy job response... job_id:[embulk_copy_job_3e473269-b0d1-4e30-b2b9-c217cbd8598e] response.statistics:{:creation_time=>1543254206645, :start_time=>1543254206729, :end_time=>1543254206770}
2018-11-27 02:43:37.143 +0900 [INFO] (0001:transaction): embulk-output-bigquery: Delete table... embulk-test-prj:embulk_test.LOAD_TEMP_600ca080_303c_4ae0_9ac3_67069fb1e621_embulk_test
2018-11-27 02:43:37.525 +0900 [INFO] (main): Committed.
2018-11-27 02:43:37.525 +0900 [INFO] (main): Next config diff: {"in":{"last_record":["2018-11-27T18:53:37.000000",3]},"out":{}}

少々分かりづらいですが、中盤から少し下にあるtransaction_reportにnum_input_rowsもnum_output_rowsも0で、更新がないのでconfig diffも変更がありませんね。

incremental loadingが正しく動作していますね!

(MySQLの表示とBigQueryの結果を出してないがちゃんと転送できてますよw!)

まとめ

ETL基盤構築時に使用したEmbulkについて、紹介とMySQLからBigQueryへのデータ転送の例を紹介しました。

基本的にパラメータやCredentialはliquidテンプレートで環境変数を通して渡し、実行する形になります。(実はyamlファイルをincludeする方法もあります)

次回予定しているETL基盤を作る 〜Digdag編〜ではDigdagの紹介とServerでの使い方、そしてこの記事で紹介したEmbulkとの連携について書こうと思っています。お楽しみに!!