PigによるTreasureDataのデータ処理

こんにちは、髙橋です。 暑い日が続いていますが、皆様はいかがお過ごしでしょうか。

私は先日、『プログラミング Hive』 『Hadoop 第3版』刊行記念 Hadoopセミナーに参加してきました。 セミナーでは、Cloudera社の方や書籍翻訳者の玉川さんのお話を伺うことができ、充実した時間を過ごすことができました。 また、セミナー最後のグッズプレゼントのじゃんけん大会では、景品になっていたHiveTシャツに心惹かれたのですが、残念ながら初戦敗退でした…

Hadoopセミナーのように、ビッグデータに関連するセミナーや勉強会も盛んに行われていますので、夏休みのある方はビッグデータについて勉強する絶好の機会だと思います。 さて、前ふりはこれくらいにして、今回のブログの内容は、Hiveの対抗馬であるPigについてです。

Pigとは?

Pigとは、Hiveと同様にMapReduceのラッパーであり、Hadoop上で分散処理を実行することが可能なオープンソースソフトウェアです。 以前紹介したHiveはHiveQLというSQLライクなクエリを記述することでMapReduceを実行させることができたのに対して、PigはPigLatinというデータフローを記述することによってMapReduceを実行させることができます。 また、特徴として、スクリプトの可読性やデータ処理の柔軟性が高いことが挙げられます。

しかし、上記のような特徴はありながらも、現状では、PigよりもHiveを利用しているユーザの方が多いようです。 その理由として、HiveQLがSQLライクに記述できるため利用の敷居が低いことや、性能面で見るとHive > Pigであることが挙げられます。 そのため、これからHiveかPigのどちらかを触ってみようと考えている方は、わざわざPigを選ばず、Hiveを利用して頂いた方が良いと思います。 しかし、折角、TreasureDataではPigも利用することができるので、簡単に紹介したいと思います。

TreasureDataにおけるPigの実行方法

TreasureDataのQueryコマンドでは、Hiveをデフォルトで実行させるようになっています。 そのため、-tオプションを用いて、Pigの利用を明示する必要があります。 また、コマンドが長くなってしまうため、PigLatinのスクリプトファイルを作成し、Pigを利用しましょう。 スクリプトファイルを読み込んで、Pigを実行するqueryコマンドは下記の通りです。

$ td query -w -d dbname -t pig -q scripts.pig

scripts.pigの内容例は、下記の通りです。

-- scripts.pig
OUT = FOREACH tablename GENERATE *;

上記スクリプトを実行させることで、tablenameのデータを全件取得することが可能です。

ちなみに、Hiveで同様の実行方法をする場合は下記のようになります。

$ td query -w -d dbname -t hive -q scripts.sql
-- scripts.sql
SELECT * FROM tablename

次に、TreasureDataのPigを利用する上での注意点が二点あります。

  1. LOADコマンドの自動化
  2. Pigでは、HDFSからデータを読み込む際に、LOADコマンドを利用します。 しかし、TreasureDataでは、TreasureData独自DBからデータを取得するため、 スクリプト内でテーブル名を呼び出すと自動でテーブルがLOADされるようになっています。
  3. STOREコマンドの自動化
  4.  Pigでは、結果の出力にStoreコマンドを用います。 しかし、TreasureDataでは、スクリプトの最終行の結果が自動的に出力されます。

テーブルの内容を1行だけ取得したい場合、PigLatinでは、ILLUSTRATEコマンドを実行することで各行におけるスクリプトの実行結果を1行ずつ取得することが可能です。しかし、日本語のデータは出力されないため、注意が必要です。

-- scripts.pig
ILLUSTRATE tablename;

TreasureDataにおけるPigの基本的な処理

TreasureDataにおけるPigの基本的な処理を紹介していきます。 各行の処理を分かり易くするために、1行1行の処理内容を簡潔するようにスクリプトを記述しています。

利用するデータ

www_accessテーブル:
v:map{user, code, method, size, time, path, referrer, agent, host},
time:int
*マップ形式内は、chararray型となっています。

データの取得

データを取得する際にはFOREACHコマンドを利用します。 vカラムはMap形式になっています。Map形式内のカラムは、v#'column_name'で取得することができます。
OUT = FOREACH www_access GENERATE v#'host' AS host, time;

データの件数制限取得(LIMIT)

データの取得件数を制限したい場合は、LIMITを利用します。
A = FOREACH www_access GENERATE *;
OUT = LIMIT A 10;

グループ化(GROUP BY)

グループ化をするときはGROUP BYを利用します。
A = FOREACH www_access GENERATE v#'host' AS host;
OUT = GROUP A BY host;

重複除去(DISTINCT)

重複除去は、DISTINCTを利用します。
A = FOREACH www_access GENERATE v#'host' AS host;
OUT = DISTINCT A;

並び替え(ORDER BY)

並び替えは、ORDER BYを利用します。
A = FOREACH www_access GENERATE v#'host' AS host;
OUT = ORDER A BY host;

日付毎のデータ数をカウント数

Pigの基本的な処理は、SQLと似ているので分かり易いと思います。 しかし、Pigでの日付関連の処理は、非常に面倒なことになるため注意が必要です。 それでは、日付処理を含んだスクリプトを見ていきましょう。

www_accessのtimeカラムには秒基準のunixtimeが格納されています。 HiveでのUnixTimeは秒基準で処理を行っていますが、Pigではミリ秒基準のため、元のデータに1000倍する必要があります。 また、PigではISOTIMEでの日付処理を行うため、UnixTimeからIsoTimeへの変換も必要です。 そのため、日付毎にカウントする場合などは下記の通りになります。(Pigの1行コメントは、--です。)

--www_access からtimestampを取得
A = FOREACH www_access GENERATE time AS longtime : int;

--pigではunixtimeをmillisecondで基準として用いるためsecond基準のUnixtimeを1000倍する
B = FOREACH A GENERATE (long) longtime * 1000;

--udfを利用して,unixtimeからisotimeに変換
--カラム名が無い場合、$0のようにカラムの順番を指定することで、そのカラムを取得可能
C = FOREACH B GENERATE org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO ($0);

--udfを利用して,時分秒を除去
D = FOREACH C GENERATE org.apache.pig.piggybank.evaluation.datetime.truncate.ISOToDay ($0);

--group byをかけ,各日付でのカウントを取る
OUT = FOREACH (GROUP D BY $0) GENERATE GROUP AS DAY, COUNT( D ) AS cnt;

上記クエリを実行した際の各行の結果例は下記の通りです。

変数 カラム
A longtime int 1367989867
B longtime long 1367989867000
C org.apache~.unixtoiso_28 chararray 2013-05-08T05:11:07.000Z
D org.apache~.isotoday_~_28_29 chararray 2013-05-08T00:00:00.000Z
OUT DAY chararray 2013-05-08T00:00:00.000Z
cnt int 1

Pigは、各行についてのコメントを書くことも容易なため、可読性が良いかと思います。 しかし、スクリプトが冗長になってしまうと、処理時間に悪影響を及ぼしてしまうため、注意が必要です。

以上、簡単ではありますが、Pigの紹介でした。

Pigは敷居が高いと思っていましたが、触ってみると意外に分かり易く、この夏が終わる頃にはPig使いになれそうです。 みなさんも一緒にPig使いになってみませんか。

* 2013/08/05 クエリを一部修正致しました。
FOREACH tablename GENERATE *; →OUT = FOREACH tablename GENERATE *;

髙橋達