概要Embulkとは?
データ処理タスクのためのオープンソース(Javaアプリケーション)のデータインポート・エクスポートツールのいわゆるETLになります。
データパイプラインを効率的かつ柔軟に構築するために設計され、大量のデータを処理する際のボトルネックであるデータの集約をEmbulkは非常に人気のあるツールであり、エコシステムが豊富で、多くのデータストアやデータベースとの当面の統合が可能です。
近頃のトレンドでは、1社で複数のオンプレアプリやSaaSを使用しており、Embulkを活用することでさまざまなプラグインがあり、多様なInput/Output処理が可能となり、便利なツールです。
embulkの特徴embulkはプラグインアーキテクチャです。
これにより、異なるデータソースやデータストアに対して柔軟な接続が可能です。データの取り込み元や出力先に合わせて、適切なプラグインを選択することで、データパイプラインの柔軟性と拡張性を高めることができます。
大規模データ処理のサポートが可能?
Embulk は、大量のデータを高速かつ長期処理することができます。
データ処理タスクを複数のスレッドやプロセスに分割し、メモリ処理を実行することで、パフォーマンスの向上を図りまた、データのバルクやインポート/エクスポートの機能も提供しています、データの移行やバッチ処理に最適なツールと言われます。
構成ファイルによる設定管理
Embulkでは、YAML形式の構成ファイルを使用して、データパイプラインの設定を管理します。 構成ファイルを編集することで、データソースの設定やフィルタリング、変換のルールを簡単にまた、複数のタスクを定義し、依存関係や実行順序をわかりやすく指定します。
Embulkの利用Embulkは様々なデータ処理タスクに使用することができます。以下にEmbulk の代表的な利用例を紹介します。
embulkをインストールしたサーバにs3→bigqueryへデータ転送をする場合に活用方法です。
インストールと記載方法
・参照:Javaのインストールと環境変数設定
https://qiita.com/reflet/items/be31629735db210ae0d1
・Embulkをインストール
curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"chmod +x ~/.embulk/bin/embulkecho 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrcsource ~/.bashrc
・S3 inputプラグインのインストール
https://github.com/embulk/embulk-input-s3
$ embulk gem install embulk-input-s3
・filter-json_keyのインストール
https://github.com/civitaspo/embulk-filter-json_key
$embulk gem install embulk-filter-json_key
・BigQuery outputプラグインのインストール
https://github.com/embulk/embulk-output-bigquery
$ embulk gem install embulk-output-bigquery
処理ymlの中身
in: type: s3 bucket:★S3のバケット名 path_prefix: ★S3に配置したファイルへのパス endpoint:★エンドポイント。s3-us-west-1.amazonaws.comみたいなの auth_method: basic access_key_id:★AWSのアクセスキーID secret_access_key:★AWSのシークレットアクセスキー parser: type: csv delimiter: "\t" charset: UTF-8 newline: CRLF null_string: 'NULL' skip_header_lines: 0 comment_line_marker: '#' allow_extra_columns: true columns: - {name: json_payload, type: string} filters:
- type: json_key
column: json_payload
nested_key_delimiter: "."
drop_keys:
- {key: "a.b.userId"} out: type: bigquery mode: append auth_method: json_key json_keyfile:★サービスアカウントやユーザのシークレットキー(json) project: ★GCPのプロジェクト dataset: ★BigQueryのデータセット table: ★BigQueryのテーブル auto_create_dataset: true auto_create_table: true schema_file: insert_schema.json ★BQのテーブルのスキーマ column_options:
- {name: json_payload, type: string } timeout_sec: 300 open_timeout_sec: 300 retries: 3 path_prefix: temp_data file_ext: .gz delete_from_local_when_job_end: true source_format: CSV max_bad_records: 0 formatter: type: jsonl
実行方法
実際の処理はrun実行ですが、preview実行によりdryrunも可能です。
$ embulk preview ~.yml実行時
$ embulk run ~.yml
ワークフローエンジンとの合わせ技
Digdagワークフローエンジンやcronを活用してshファイルのymlファイルを定期実行することが可能です。
0 0 * ~.sh処理1 0 10 * 処理2 # 処理1は10分以内には終わるはず 0 20 * 処理3 # 処理2は10分以内には終わるはず最後にパフォーマンスの最適化 Embulk のパフォーマンスはすでに優れていますが、大規模データ処理のさらなる最適化が進められています。データの分散処理や並列処理の改善、ストリーム処理のサポートなど、より高速かつ効率的なデータ処理が実現されることが期待されています。
学習コストは高く環境構築も時間がかかりますがデータベースを扱うエンジニアとしては面白いツールです。
参照資料
Embulkを使ってS3からBigQueryへデータをフィルタリングしてロードする
https://qiita.com/t-yotsu/items/48b9154db94f85726777
zozo TECH BLOG