Always, always, always: compress your tables. Most of your Hive query execution time is not spent doing complex computations. Aggregates, averages, and counts across tons of weblog data are not exactly computationally complex. It's the reading and writing of tons of data that takes the time.
In computer science parlance, it's not CPU bound, it's I/O bound.
If your data is already compressed (which, in all probability, is the case), then nothing beats the speed at which you can import it into Hive. Simply use the LOAD DATA LOCAL command to load compressed files into a TextFile table. Hive will automatically detect the compression and decompress on the fly when executing queries.
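As a minimal sketch of what that looks like (table name, column, and file path are illustrative assumptions, not from my setup):

```sql
-- A plain TextFile table; Hive infers the codec from the file
-- extension (.gz) and decompresses transparently at query time.
CREATE TABLE weblogs_raw (line STRING)
STORED AS TEXTFILE;

-- Load an already-gzipped log straight in; no decompression step.
LOAD DATA LOCAL INPATH '/var/log/access_log.gz'
INTO TABLE weblogs_raw;
```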
Leaving aside the data import times, here are some (unscientific) benchmarks for a SELECT COUNT(1) query on my raw data tables. From 106 sec down to 60 sec. At 10% of the storage space.
| Storage | Row count | Table size | Query time |
|---|---|---|---|
| Uncompressed | 8,259,720 | 7,686 MB | 106 sec |
| Compressed – GzipCodec (RECORD) | 8,259,720 | 4,773 MB | 101 sec |
| Compressed – native gzip | 8,259,720 | 736 MB | 60 sec |
Now, each of these tables was put through a moderately complex Hive query [1], which processed the data using two custom map/reduce scripts and inserted the processed data into a new table. Sure, the initial ETL phase is not significantly faster, but look at the amount of space I'm saving.
| Storage | Resultant row count | Resultant table size | Query time |
|---|---|---|---|
| Uncompressed | 1,561,633 | 1,608 MB | 699 sec |
| Compressed – GzipCodec (RECORD) | 1,561,633 | N/A [2] | 563 sec |
| Compressed – native gzip | 1,561,633 | 86 MB | 510 sec |
I'll try to run some complex queries on the table with processed data to compare compressed and uncompressed storage.
Learn how to compress your Hive tables at Hive Wiki – Compressed Storage
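For the "GzipCodec (RECORD)" rows above, the relevant session settings look roughly like this (a sketch using the Hadoop property names of that era; newer releases rename them, and the table names here are hypothetical):

```sql
-- Tell Hive to compress the output of the job that writes the table.
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
SET mapred.output.compression.type=RECORD;

-- Any INSERT OVERWRITE run after these settings writes compressed output.
INSERT OVERWRITE TABLE weblogs_processed
SELECT line FROM weblogs_raw;
```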
[1] Take a look at the section titled "Processing the raw data (ETL)" here
[2] I forgot to record the table size before dropping the table :-)
I don't know Hive, but it's a point to keep in mind while doing database optimization, as compression may be the key in similar cases.
Can go way faster with LZO ;)
Hi,
what does "Compressed – native gzip" stand for? Do you mean LOADing already gzipped files into Hive?
If yes, how do you compress the logs before loading them into Hive? Do you use MapReduce to compress the log files in parallel?
@Inv: Yes, it means loading already compressed files directly into Hive/HDFS.
In my case, the files were weblogs and were compressed by logrotate every day.
What's your data source? How you implement the compressions depends a lot on where & how you're getting your data.
Hi, thanks for the quick reply!
My source is Flume writing plaintext files into HDFS.
I'm thinking of doing something similar to logrotate for HDFS - compressing and loading into Hive once a day.
I'm also looking into Hive indexes and not sure if they would work with compressed data.
Any experience with Flume/logrotating HDFS files/indexes?
I haven't worked with Flume, but going from what the user guide says [1], I'd recommend compressing the files on the machines that are producing them, and then moving the compressed files into HDFS. This has the added advantage of reducing transfer times. The only scenario where you can't do this is when the original machines (the ones producing the text files) cannot take the additional load of compressing them.
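A rough sketch of the compress-then-ship idea (all paths and filenames here are made up for illustration; the HDFS step is guarded so the sketch still runs on a machine without Hadoop):

```shell
# Stand-in for a rotated weblog on the producing machine.
LOG_DIR=/tmp/weblog_demo
mkdir -p "$LOG_DIR"
echo "127.0.0.1 GET /index.html" > "$LOG_DIR/access.log.1"

# Compress the rotated log locally; -9 trades CPU for the smallest file.
gzip -9 -f "$LOG_DIR/access.log.1"

# Only then move the (much smaller) file into HDFS for Hive to load.
if command -v hadoop >/dev/null 2>&1; then
  hadoop fs -put "$LOG_DIR/access.log.1.gz" /staging/weblogs/
fi
```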
Also, I'm not sure if Hive indexes have been released properly yet. It seems that indexes have been added through a plugin architecture (different plugins will use different indexing algorithms and may support different table formats) and there is only a reference plugin available right now.
[1] Flume is a distributed, reliable, and available service for efficiently moving large amounts of data soon after the data is produced. The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent store such as the Hadoop Distributed File System (HDFS).