Treasure DataにおけるHiveQLのTips

 こんにちは、今回のブログ担当 高橋です。
 本題とは逸れますが、ビッグデータに関連するトレンドとして、M2M(Machine to Machine)やIoT(Internet of Things)と呼ばれる技術があります。 SIOSビッグデータチームとしても、これらの技術によって大量に収集されるデータには注目しています。 これらの技術を個人で実現可能なプログラマブルデバイスとして、ArduinoやRaspberry Piが普及してきています。 特に、Arduinoは、接触センサや赤外センサなど各種センサを実装でき、なおかつBluetoothやZigBeeなどの通信モジュールの実装も可能です。 例えば、複数台のArduinoを組み合わせて自宅内センサネットワークを構築し、日常生活の見える化ができたら楽しそうですね。 こうしたビッグデータを生み出す様々なアイデアを実現するために、私たちも日々、ビッグデータ関連技術についての調査も進めています。
 さて、それでは本題に入ります。今回のブログのテーマは、HiveQLです。
 TreasureDataのtd-commandでは、下記のコマンドの様に、HiveQLというSQLに似たクエリ言語を利用し、TreasureData上のデータを操作することを可能としています。
 $td query -d (database) “SELECT * FROM table LIMIT 5”
 そこで、HiveQLの知識共有のために、ビッグデータチーム内でプレゼンを行った際のスライドとその補足情報にて、HiveQLを記述する際の注意点や役立つTipsを紹介したいと思います。

(オリジナルのスライドから部分的に削除有り)

Treasure Dataで利用可能なHiveQL

Treasure Dataでは、Apache Hiveが提供しているHiveQLを利用しており、td commandでは、SELECT文に関するクエリのみ有効(一部例外有り)となっています。 下記にHiveQLのSELECTの構文を示します(参考:公式ドキュメント)。
SELECTの構文
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list]
[CLUSTER BY col_list
| [DISTRIBUTE BY col_list] [SORT BY col_list]
]
[LIMIT number]

「2.5 Treasure Dataにおけるデータ形式」の補足

'v'カラムのデータ形式は、map となっています。 そのため、より高速にデータ処理を行いたい場合は、カラム内のデータにスキーマを設定することが効果的です。
 $ schema:set (database) (table) [columns...]
 スキーマを設定後に、カラム指定を行う場合は、SELECT v['column'] ~ ではなく、SELECT column ~ となります。 (追記2014-11-13: 現在はデータインポート時にスキーマがセットされるため、この辺りをきにする必要はないです。)

「2.7 LEFT SEMI JOIN」と「2.8 MAPJOIN」の補足

約2億件のレコードに対して、下記3種類の集計用クエリをTreasureData上で発行した際のMapReduceのCPU処理時間の比較を行います。
(1)非効率的なクエリ例
   SELECT COUNT(1) FROM
  (
   SELECT v['id'] AS id FROM logs1
  ) ids1 JOIN
  (
   SELECT v['id'] AS id FROM logs2 GROUP BY v['id']
  ) ids2 ON ids1.id = ids2.id

上記のクエリを発行した場合、MapReduceは3段構成で実行されます。
  1. stage-1 : 46秒 : GROUP BY計算
  2. stage-2 : 25分50秒 : JOIN計算
  3. stage-3 : 6秒 : SELECT COUNT計算

(2)GROUP BYの代わりにLEFT SEMI JOINの利用
   SELECT COUNT(1) FROM
  (
   SELECT v['id'] AS id FROM logs1
  ) ids1 LEFT SEMI JOIN
  (
   SELECT v['id'] AS id FROM logs2
  ) ids2 ON ids1.id = ids2.id

上記のクエリを発行した場合、MapReduceは2段構成で実行されます。
  1. stage-1 : 23分59秒 : JOIN計算
  2. stage-2 : 6秒 : SELECT COUNT計算

(3)MapJoinの利用
   SELECT /*+ MAPJOIN(ids2) */ COUNT(1) FROM
  (
   SELECT v['id'] AS id FROM logs1
  ) ids1 LEFT SEMI JOIN
  (
   SELECT v['id'] AS id FROM logs2
  ) ids2 ON ids1.id = ids2.id

上記のクエリを発行した場合、MapReduceは2段構成で実行されます。
  1. stage-1 : 9分35秒 : JOIN計算
  2. stage-2 : 28秒 : SELECT COUNT計算

 (1)では24分6秒かかっていたものが、(3)では10分4秒と半分以下に時間を短縮することができました。  このようにLEFT SEMI JOINやMapJoinを利用する事で、効果的に処理することができます。(LEFT SEMI JOINについては、CPU処理時間の短縮には効果が薄いですが、MapReduceの回数を削減できることで、データ転送の時間を削減できるメリットがあります。) *注意*CPU処理時間は、CPU占有数によっても変動します。

(追記2014-11-13: スキーマが自動でセットされるようになったため、クエリも下記の通りにvカラムではなく直接カラム名を利用しましょう。)
   SELECT /*+ MAPJOIN(ids2) */ COUNT(1) FROM
  (
   SELECT id FROM logs1
  ) ids1 LEFT SEMI JOIN
  (
   SELECT id FROM logs2
  ) ids2 ON ids1.id = ids2.id

「2.9 全体集計と個別集計」の訂正

 現在のTDにおけるHiveのバージョンは、0.10のため、ROLLUPの利用は可能となっております。

 以上、無駄な処理を減らして、Treasure Dataを効果的・効率的に利用していきましょう。

髙橋達