TalendとTreasureDataを組み合わせて利用する

明けましておめでとうございます。 SSTDの髙橋です。 本年もビッグデータに関する情報発信を頑張って行きますので、どうぞ宜しくお願い致します。

新年第一回の記事は、ETLツールのTalendとTreasureDataについての記事です。 私たちが普段扱うデータは、事前にデータの加工が必要だったり、または第三者からデータを頂いてそのデータを代わりにTreasureDataにアップロードするなどの一手間が必要なことが多いです。 このような場合に、ETLツールを使ってデータを集約してアップロードするということが容易にできれば、データのアップロードという作業の手間を減らすことができるかと思います。


Talendとは

Talendとは、様々なデータソースに対してEclipseベースのGUIツールを用いて、データの集約をし統合するためのETLツールです。 このETLツールを使うことで、個々のデータソース専用のプログラムなどを作成する手間を減らすことができるようになります。 Talend製品の中には、Talend Open Studio(TOS)があり、こちらがオープンソースとして提供されています。 このTOSの中には、幾つか種類があり、今回はTalend Open Studio for Data Integrationを利用して、TreasureDataのBulkImport機能とJDBCドライバを利用して、TreasureDataへのデータのインポートとエクスポートを試してみます。


Talendのインストール

Talend Open Studio for Data Integration v5.4.1をダウンロードしてインストールします。 Cドライブ直下にはインストールできないようなので、talendというフォルダを作成して、そのフォルダ以下にインストールしました。
インストール後は、TOS_DI~.exeというファイルがインストール先フォルダ以下に作成されます。 初めて起動した際には、プロジェクトを作成して下さい。
プロジェクトを開いた後、新しくジョブを作成します。


Talendの利用方法

Talendでは、下図に示すように、一つ一つ機能を持つコンポーネントを組み合わせることで、処理のフローを記述することができます。 下図では、コマンドプロンプトを利用してBulkImportを実行し、データをTreasureData上にアップロードし、その後、JDBC経由にてTreasureDataにクエリを発行して、 アップロードしたデータを取得するという処理になっています。 次から各コンポーネントの設定方法を説明していきます。


データの準備

今回は、BulkImportとJDBCをTalendから呼び出す方法を試すので、テストデータを作成しておきます。 CSV形式で、ヘッダーもデータ内に入っているデータにします。

テストデータ

ファイルパス:C:\talend\TOS_DI-Win32-r110020-V5.4.0\workspace\test.csv
time,name,age
2014/01/02 00:00:00,ビッグデー太,20


BulkImportコマンドの利用

コマンドを利用するには、tSystemコンポーネントを利用します。 windowsのコマンドプロンプトを利用するには、"cmd /c"をコマンドの先頭に挿入する必要があります。

tSystemの設定

  • ホームディレクトリを使用を選択
  • Use Array Commandを選択
  • 値:"cmd","/c","td import:upload","test.csv","--output",".","--auto-create","test_db.test_table","--auto-perform","--auto-commit","--column-header","--time-column","time"
  • 標準出力:コンソールとグローバル変数の両方へ出力
  • エラー出力:コンソールとグローバル変数の両方へ出力


JDBCドライバの設定

まず、こちらからTreasureDataのJDBCドライバをダウンロードしておきます。 ドライバの共有設定のために、tJDBCConnectionコンポーネントを利用します。

tJDBCConnectionの設定

  • JDBCのURL:"jdbc:td://api.treasure-data.com:80/(db_name)"
  • ドライバJarファイル:ダウンロードしたJDBCのファイルパスを指定
  • ドライバクラス:"com.treasure_data.jdbc.TreasureDataDriver"
  • ユーザ名:"TreasureDataに登録したメールアドレス"
  • パスワード:"TreasureDataに登録したパスワード"


TreasureDataへのクエリ発行

JDBCを利用してクエリを発行するには、tJDBCInputを利用します。また、このコンポーネントから結果を得るためには、 コンポーネントにスキーマを設定しておく必要があるようです。

tJDBCInputの設定

  • 既存の接続を使用:(上記で設定した)tJDBCConnection_1
  • ドライバJarファイル:ダウンロードしたJDBCのファイルパスを指定
  • スキーマの編集:下記のスキーマの設定にて確認
  • テーブル名:"利用するテーブル名"
  • クエリ:"SELECT * FROM (table_name) WHERE time = 1388588400"

スキーマの設定

スキーマの編集にて、スキーマを設定することでデータの取得が可能となります。 今回はTreasureData上にてスキーマの設定をしていないため、vとtimeの二つがカラムとして出力される。 そのため、スキーマの編集では、この二つをスキーマとして登録します。


各コンポーネントの接続

各コンポーネント間は順序関係や依存関係を作成することができます。

tSystemとtJDBCInputの接続

tSystemの実行結果を得た後に、クエリを発行するフローにするために、tSystem上で右クリックを押し、条件付きを指定し、tJDBCInputと接続します。 このときの条件として、BulkImportを実行するtSystemの標準出力にて、SUCCESSが出力されていた場合に、JDBCにてクエリを発行するフローにします。
((String)globalMap.get("tSystem_1_OUTPUT")).contains("SUCCESS")

tJDBCInputとtLogRowの接続

クエリの実行結果を取得するのにtLogRowコンポーネントをを利用し、tJDBCInputにてRow->メインを指定して接続します。 接続すると、tJDBInputにて設定したスキーマの値がジョブの実行後にtLogRowによって出力されます。


ジョブの実行

全ての設定が完了した後、ジョブの実行をします。 実行後、下図に示されるのジョブ実行のパネルにて、処理結果が表示されていきます。 BulkImportの標準出力が表示された後、TreasureDataへクエリが発行され、最後にそのクエリの実行結果が、 "|"区切りで表示されていることが分かります。


以上で、TalendとTreasureDataの組み合わせの紹介を終わります。 ETLツールを利用することで、より効率的にデータソースからのデータ統合ができるようになりますので、 引き続きETLツールについても紹介していきたいと思います。