概要Embulkとは?

データ処理タスクのためのオープンソース(Javaアプリケーション)のデータインポート・エクスポートツールのいわゆるETLになります。

データパイプラインを効率的かつ柔軟に構築するために設計され、大量のデータを処理する際のボトルネックであるデータの集約をEmbulkは非常に人気のあるツールであり、エコシステムが豊富で、多くのデータストアやデータベースとの当面の統合が可能です。

近頃のトレンドでは、1社で複数のオンプレアプリやSaaSを使用しており、Embulkを活用することでさまざまなプラグインがあり、多様なInput/Output処理が可能となり、便利なツールです。

embulkの特徴embulkはプラグインアーキテクチャです。

これにより、異なるデータソースやデータストアに対して柔軟な接続が可能です。データの取り込み元や出力先に合わせて、適切なプラグインを選択することで、データパイプラインの柔軟性と拡張性を高めることができます。

大規模データ処理のサポートが可能?

Embulk は、大量のデータを高速かつ長期処理することができます。

データ処理タスクを複数のスレッドやプロセスに分割し、メモリ処理を実行することで、パフォーマンスの向上を図りまた、データのバルクやインポート/エクスポートの機能も提供しています、データの移行やバッチ処理に最適なツールと言われます。

構成ファイルによる設定管理

Embulkでは、YAML形式の構成ファイルを使用して、データパイプラインの設定を管理します。 構成ファイルを編集することで、データソースの設定やフィルタリング、変換のルールを簡単にまた、複数のタスクを定義し、依存関係や実行順序をわかりやすく指定します。

Embulkの利用Embulkは様々なデータ処理タスクに使用することができます。以下にEmbulk の代表的な利用例を紹介します。

embulkをインストールしたサーバにs3→bigqueryへデータ転送をする場合に活用方法です。

インストールと記載方法

・参照:Javaのインストールと環境変数設定

https://qiita.com/reflet/items/be31629735db210ae0d1

・Embulkをインストール

curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"chmod +x ~/.embulk/bin/embulkecho 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrcsource ~/.bashrc

・S3 inputプラグインのインストール

https://github.com/embulk/embulk-input-s3

$ embulk gem install embulk-input-s3

・filter-json_keyのインストール

https://github.com/civitaspo/embulk-filter-json_key

$embulk gem install embulk-filter-json_key

・BigQuery outputプラグインのインストール

https://github.com/embulk/embulk-output-bigquery

$ embulk gem install embulk-output-bigquery

処理ymlの中身

in: type: s3 bucket:★S3のバケット名 path_prefix: ★S3に配置したファイルへのパス endpoint:★エンドポイント。s3-us-west-1.amazonaws.comみたいなの auth_method: basic access_key_id:★AWSのアクセスキーID secret_access_key:★AWSのシークレットアクセスキー parser: type: csv delimiter: "\t" charset: UTF-8 newline: CRLF null_string: 'NULL' skip_header_lines: 0 comment_line_marker: '#' allow_extra_columns: true columns: - {name: json_payload, type: string} filters:

  • type: json_key column: json_payload nested_key_delimiter: "." drop_keys:
    • {key: "a.b.userId"} out: type: bigquery mode: append auth_method: json_key json_keyfile:★サービスアカウントやユーザのシークレットキー(json) project: ★GCPのプロジェクト dataset: ★BigQueryのデータセット table: ★BigQueryのテーブル auto_create_dataset: true auto_create_table: true schema_file: insert_schema.json ★BQのテーブルのスキーマ column_options:
    • {name: json_payload, type: string } timeout_sec: 300 open_timeout_sec: 300 retries: 3 path_prefix: temp_data file_ext: .gz delete_from_local_when_job_end: true source_format: CSV max_bad_records: 0 formatter: type: jsonl

実行方法

実際の処理はrun実行ですが、preview実行によりdryrunも可能です。

$ embulk preview ~.yml実行時

$ embulk run ~.yml

ワークフローエンジンとの合わせ技

Digdagワークフローエンジンやcronを活用してshファイルのymlファイルを定期実行することが可能です。

0 0 * ~.sh処理1 0 10 * 処理2 # 処理1は10分以内には終わるはず 0 20 * 処理3 # 処理2は10分以内には終わるはず最後にパフォーマンスの最適化 Embulk のパフォーマンスはすでに優れていますが、大規模データ処理のさらなる最適化が進められています。データの分散処理や並列処理の改善、ストリーム処理のサポートなど、より高速かつ効率的なデータ処理が実現されることが期待されています。

学習コストは高く環境構築も時間がかかりますがデータベースを扱うエンジニアとしては面白いツールです。

参照資料

Embulkを使ってS3からBigQueryへデータをフィルタリングしてロードする

https://qiita.com/t-yotsu/items/48b9154db94f85726777

zozo TECH BLOG

https://techblog.zozo.com/entry/digdagandembulk#Embulk%E3%81%A8Digdag%E3%81%AB%E3%81%A4%E3%81%84%E3%81%A6