Thursday, July 30, 2009

How to make your Hive cluster blazingly fast

Squeeze it! Literally.

Always, always, always - compress your tables. Most of your Hive query execution time is not spent on complex computations. Aggregates, averages, and counts across tons of weblog data are not exactly computationally complex. It's the reading & writing of tons of data that takes the time.

In computer science parlance, it's not CPU bound, it's I/O bound.

If your data is already compressed (which, in all probability, is the case), then nothing beats the speed at which you can import it into Hive. Simply use the LOAD DATA LOCAL command to load the compressed files into a TextFile table. Hive automatically detects the compression and decompresses on the fly when executing queries.
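
Something along these lines - the table name and file path here are just made up for illustration:

  CREATE TABLE raw_weblogs (line STRING) STORED AS TEXTFILE;

  -- the .gz file is loaded as-is; no decompression step needed
  LOAD DATA LOCAL INPATH '/var/log/httpd/access_log.1.gz' INTO TABLE raw_weblogs;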

Leaving aside the data import times, here are some (unscientific) benchmarks for a SELECT COUNT(1) query on my raw data tables. From 106 sec down to 60 sec. At 10% of the storage space.

Storage | Row count | Table size | Query time
Uncompressed | 8,259,720 | 7,686 MB | 106 sec
Compressed – GzipCodec (RECORD) | 8,259,720 | 4,773 MB | 101 sec
Compressed – native gzip | 8,259,720 | 736 MB | 60 sec


Now, each of these tables was put through a moderately complex Hive query [1], which processed the data using two custom map/reduce scripts and inserted the processed data into a new table. Sure, the initial ETL phase is not significantly faster, but look at the amount of space I'm saving.

Storage | Resultant row count | Resultant table size | Query time
Uncompressed | 1,561,633 | 1,608 MB | 699 sec
Compressed – GzipCodec (RECORD) | 1,561,633 | N/A [2] | 563 sec
Compressed – native gzip | 1,561,633 | 86 MB | 510 sec
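
For the curious, queries like that use Hive's streaming MAP/REDUCE (TRANSFORM) syntax. A minimal sketch of the shape - the scripts, tables, and columns below are placeholders, not the actual ETL from [1]:

  ADD FILE map_script.py;
  ADD FILE reduce_script.py;

  FROM (
    FROM raw_weblogs
    MAP raw_weblogs.line
    USING 'map_script.py'
    AS ip, url, ts
    CLUSTER BY ip
  ) mapped
  INSERT OVERWRITE TABLE processed_weblogs
  REDUCE mapped.ip, mapped.url, mapped.ts
  USING 'reduce_script.py'
  AS ip, hits;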


I'll try to run some complex queries on the table with processed data to compare compressed and uncompressed storage.

Learn how to compress your Hive tables at Hive Wiki – Compressed Storage
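
Roughly, the recipe described there boils down to something like this (double-check the exact property names against your Hive/Hadoop version; the table names are again placeholders):

  -- compress query output before it is written to the destination table
  SET hive.exec.compress.output=true;
  SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
  SET io.seqfile.compression.type=RECORD;   -- or BLOCK

  -- store the compressed copy as a SequenceFile table
  CREATE TABLE weblogs_seq (line STRING) STORED AS SEQUENCEFILE;
  INSERT OVERWRITE TABLE weblogs_seq SELECT line FROM raw_weblogs;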

[1] Take a look at the section titled "Processing the raw data (ETL)" here
[2] I forgot to record the table size before dropping the table :-)

8 comments:

  1. I don't know Hive, but it's a point to keep in mind while doing database optimization - compression may be the key in similar cases.

  2. Can go way faster with LZO ;)

  3. This comment has been removed by the author.

  4. Hi,

    what does "Compressed – native gzip" stand for? Do you mean LOADing already gzipped files into Hive?
    If yes, how do you compress the logs before loading them to Hive? Do you use Mapreduce to compress log files in parallel?

  5. @Inv: Yes, it means loading already compressed files directly into Hive/HDFS.

    In my case, the files were weblogs and were compressed by logrotate every day.

    What's your data source? How you implement the compression depends a lot on where & how you're getting your data.

  6. This comment has been removed by the author.

  7. Hi, thanks for quick reply!

    My source is Flume writing plaintext files into HDFS.

    I'm thinking of doing something similar to logrotate for HDFS - compressing and loading into Hive once a day.

    I'm also looking into Hive indexes and not sure if they would work with compressed data.

    Any experience with Flume/logrotating HDFS files/indexes?

  8. I haven't worked with Flume, but going by what the user guide says [1], I'd recommend compressing the files on the machines that are producing them, THEN moving the compressed files into HDFS. That has the added advantage of reducing transfer times. The only scenario where you can't do this is where the original machines (the ones producing the text files) cannot take the additional load of compressing them.

    Also, I'm not sure if Hive indexes have been released properly yet. It seems that indexes have been added through a plugin architecture (different plugins will use different indexing algorithms and may support different table formats) and there is only a reference plugin available right now.

    [1] Flume is a distributed, reliable, and available service for efficiently moving large amounts of data soon after the data is produced. The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent store such as the Hadoop Distributed File System (HDFS).
