Introduction
I've been playing around with Hadoop for the last fortnight to see how it performs with our weblog data processing jobs (Apache access logs). Right now we're using a blink-and-it-breaks system: a bunch of custom Perl scripts for log processing and MySQL for data storage, all running on a single server-class machine. It takes about 3-4 hours to extract, transform, and load (into MySQL) one day of weblogs (this includes identifying sessions based on a 30min timeout), and another 1-2 hours to aggregate data and generate various CSV reports.
So, a total of 6-7 hours, when it's not crashing.
With Hadoop + Hive I've brought the first phase down from 3-4 hours to under an hour! Anywhere between 15 and 45 minutes to copy log files into a raw Hive table (depending upon how much replication you've configured your cluster to have), and after that, 12 minutes to extract, transform, and load into a new table (including session identification).
I gave Cloudbase a shot before Hive. Cloudbase was *dead* easy to set up and get off the ground (I love their simple approach to UDFs & UDTs), but its performance was not as good as Hive's. On the other hand, I spent quite a lot of time trying to even import my custom log formats into Hive (I'm looking at you, GenericUDF). But once I got started, man, was I blown away! Take a look at my Twitter updates on Cloudbase and Hive comparisons.
Cluster setup: I managed to corner 4 desktop machines in the office for my experimental cluster. They are Core-2 Duo 2.4 GHz machines with 1GB of RAM and standard SATA hard disks. All four are connected to each other using a switch.
Setting up the Hive tables
Here's how I've set up my Hive tables:
The following table (raw) stores the raw, unprocessed log files. I've partitioned the table by date so that the ETL operations can be run on bite-sized chunks consisting of a day's worth of data.
create table raw(line string) partitioned by(dt string)
row format delimited fields terminated by '\t' lines terminated by '\n';
The following table (streamed_hits) contains the processed & filtered hits, partitioned by date. Within each partition the rows are clustered by Apache user-id (aid), and within each cluster they are again sorted by page view time (ts). According to the Hive Wiki, this clustering & sorting can improve efficiency in certain queries, although it does add time to the ETL phase. I'm really not sure what the buckets do, and 1,000 is just a number I pulled out of thin air.
create table streamed_hits(ip_address string, aid string, uid string,
ts string, method string, uri string,
response string, session_id string,
session_start string, pv_number int,
clickstream string)
partitioned by(dt string)
clustered by (aid) sorted by (aid, ts) into 1000 buckets
row format delimited fields terminated by '\t' lines terminated by '\n';
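One thing I've since gathered the buckets do buy you is efficient sampling: because the table is clustered into 1,000 buckets on aid, Hive can satisfy a TABLESAMPLE query by scanning just the chosen buckets. A hedged sketch (the table and bucket count come from the DDL above; the query is standard Hive TABLESAMPLE syntax, not something from my production reports):

```sql
-- Look at roughly 1/1000th of the users by reading a single bucket,
-- rather than sampling after a full table scan.
select aid, count(*) as page_views
from streamed_hits tablesample(bucket 1 out of 1000 on aid)
where dt = '2009-06-30'
group by aid;
```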
Importing the raw data
Here's how one imports data into the raw table. The import process is a simple HDFS file copy operation[1]. The more replication you have, the more nodes the data needs to be copied to, and hence the more time this operation takes.
load data local inpath '/weblogs/20090602-access.log'
into table raw partition(dt='2009-06-02');
load data local inpath '/weblogs/20090603-access.log'
into table raw partition(dt='2009-06-03');
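Each load creates a new date partition, which you can sanity-check before kicking off the ETL. Something like this (standard Hive commands, using the table from above):

```sql
-- List the date partitions loaded so far
show partitions raw;

-- Spot-check a few raw lines from one day, without touching other partitions
select line from raw where dt = '2009-06-02' limit 5;
```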
Processing the raw data (ETL)
Here's the Hive query for the ETL operation. I'm making sure that uninteresting rows are discarded upfront, using the WHERE clause in the inner query. I had earlier put this in the outer query, which was causing LOTS of uninteresting rows to go through the parsing logic. That (in hindsight) stupid placement of the WHERE clause was bumping the completion time of this query from 15 minutes up to 2+ hours!
I'm using a custom map script to read my log files, which are in a 'non-standard' format. There is another way to do this (UDFs & SerDes), but according to the discussion I had on the mailing list, these features are not fit for newbie consumption yet. The DISTRIBUTE BY & SORT BY is essential for my session & clickstream identification script to work (thanks to the Hive Wiki).
from
(from raw
select transform line using 'parse_logs.pl' as ip_address, aid, uid, ts, method, uri,
response, referer, user_agent, cookies, ptime
where lower(line) rlike '^(\\S+) (\\S+) (\\S+) \\[(.*?)\\] "(.*?)" (\\d+) (\\d+|-) "(.*?)" ".*?(mozilla|msie|opera).*?".*'
and not line rlike '^(\\S+) (\\S+) (\\S+) \\[(.*?)\\] "(\\S+) (/images.*?|/styles.*?|/javascripts.*?|/adserver.*?|.*?favicon.*?) (\\S+)".*'
and dt='2009-06-30'
distribute by aid sort by aid, ts asc) parsed
insert overwrite table streamed_hits partition(dt='2009-06-30')
select transform parsed.ip_address, parsed.aid, parsed.uid, parsed.ts, parsed.method, parsed.uri,
parsed.response, parsed.referer, parsed.user_agent, parsed.cookies, parsed.ptime
using 'identify_sessions_and_clickstream.pl' as ip_address, aid, uid, ts, method, uri,
response, session_id, session_start, pv_number, clickstream;
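Once streamed_hits is populated, the report-generation phase boils down to plain aggregate queries over the processed table. As a hedged sketch (these are the kind of metrics the CSV reports mentioned earlier would need, not the exact production queries):

```sql
-- Sessions and page views for one day's partition
select dt,
       count(distinct session_id) as sessions,
       count(*) as page_views
from streamed_hits
where dt = '2009-06-30'
group by dt;
```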
Custom map/reduce scripts used
Here's the parse_logs.pl Perl script. Please excuse my Perl - I'm not a Perl programmer; I was just trying to reuse the funky regular expressions that we had lying around in our current log processing system. You can plug any script that reads/writes standard input/output into Hive.
#!/usr/bin/perl
my %monthNum=(
"Jan" => 1,
"Feb" => 2,
"Mar" => 3,
"Apr" => 4,
"May" => 5,
"Jun" => 6,
"Jul" => 7,
"Aug" => 8,
"Sep" => 9,
"Oct" => 10,
"Nov" => 11,
"Dec" => 12,
);
while (defined($line = <STDIN>)) {
if (($host,$user,$apache,$rfc931,$method, $url, $ver, $status,$size,$referrer,$agent,$cookies,$ptime) = $line =~ m/^(\S+) (\S+) (\S+) \[(\S+ \S+)\] "(\S+) (.*?) HTTP\/([0-9\.]*)" (\d+) (\d+|-) "([^"]*)" "([^"]*)" "([^"]*)" (\d+|-)$/) {
# everything found -- nothing to be done
}elsif (($host,$user,$apache,$rfc931, $method, $url, $ver, $status,$size,$referrer,$agent,$cookies) = $line =~ m/^(\S+) (\S+) (\S+) \[(\S+ \S+)\] "(\S+) (.*?) HTTP\/([0-9\.]*)" (\d+) (\d+|-) "([^"]*)" "([^"]*)" "([^"]*)"$/) {
$ptime="";
}
$ts="";
# Converting date to yyyy-MM-dd hh:mm:ss
if (($day, $monthname, $year, $hour, $minute, $sec)= $rfc931 =~/^(\d{2})[\/-]([^\/-]+)[\/-](\d{4}):(\d{2}):(\d{2}):(\d{2})/) {
$month=$monthNum{$monthname};
$ts=sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year, $month, $day, $hour, $minute, $sec);
}
$agent=lc($agent);
$user=lc($user);
print "$host\t$apache\t$user\t$ts\t$method\t$url\t$status\t$referrer\t$agent\t$cookies\t$ptime\n"
}
Here's the identify_sessions_and_clickstream.pl Perl script. Again, same disclaimer about my Perl as above. Also, please excuse the extremely naive way in which I'm trying to construct the clickstream. That's a work in progress.
This script depends on the fact that the incoming data is sorted on Apache user-id and timestamp (that's what the DISTRIBUTE BY & SORT BY in the Hive query achieve). Apart from identifying 60min sessions I'm cleaning up the URLs to make them more 'generic' (removing query strings, removing trip IDs etc.)
#!/usr/bin/perl
use Date::Parse;
$session_duration=60*60; # in seconds
$prev_apache=undef;
$prev_ts=undef;
$pv_number=undef;
$session_id=undef;
$session_num=undef;
$session_start=undef;
$clickstream='';
while (defined($line = <STDIN>)) {
chomp($line);
($ip_address, $apache, $user, $ts, $method, $url, $status, $referrer, $agent, $cookies, $ptime) = split(/\t/, $line);
# debug: print "$ip_address, $apache, $user, $ts, $method, $url, $status, $referrer, $agent, $cookies, $ptime\n";
$url =~ s/(.*?)\?.*/$1/i;
$url =~ s/^\/(trains|flights)\/itinerary\/.*?\/(.*?)/\/$1\/itinerary\/itinerary-id\/$2/;
$url =~ s/^\/(trains|flights)\/itinerary\/(\d+)$/\/$1\/itinerary\/itinerary-id/;
$url =~ s/^\/(activate|reactivate|reset)\/.*/\/$1/;
$url =~ s/^\/share.*/\/share/;
$url =~ s/^\/account\/trips\/.*/\/account\/trips\/trip-id/;
$url =~ s/^\/trains\/stations\/[0-9]*/\/trains\/stations\/numeric-id/;
$url =~ s/^\/trains\/stations\/[A-Za-z]*/\/trains\/stations\/alphanumeric-id/;
$url =~ s/^\/hotels\/info\/.*/\/hotels\/info\/hotel-id/;
$url =~ s/^\/places\/hotels\/.*\/images.*/\/places\/hotels\/images/;
$url =~ s/^\/places\/hotels\/images.*/\/places\/hotels\/images/;
$url =~ s/^\/newsletters\/images\/.*/\/newsletters\/images/;
$url =~ s/^\/index.*/\//;
$url =~ s/\/$//; # remove trailing slashes
if(!defined($prev_apache) || $prev_apache ne $apache){
if($apache eq '-' or $apache eq '') {
# TODO -- use IP address & user_agent to identify sessions?
$pv_number='';
$session_start='';
$session_id='';
$clickstream='';
} else {
$pv_number=1;
$session_start=$ts;
$session_num=1;
$session_id="$apache|$session_start";
$clickstream="$url"
}
} elsif($prev_apache eq $apache) {
if((str2time($ts)-str2time($prev_ts)) <= $session_duration) {
# same session -- bump the page view counter and extend the clickstream
$pv_number=$pv_number+1;
# cap the clickstream at 70 pages to keep the string manageable
if($pv_number<70) {
$clickstream="$clickstream|$url";
}
} else {
# gap longer than the session duration -- start a new session
$pv_number=1;
$session_start=$ts;
$session_num=$session_num+1;
$session_id="$apache|$session_start";
$clickstream="$url";
}
}
$prev_apache=$apache;
$prev_ts=$ts;
print "$ip_address\t$apache\t$user\t$ts\t$method\t$url\t$status\t$session_id\t$session_start\t$pv_number\t$clickstream\n";
}
[1] A small warning here, if your Hive session is running on one of your active nodes: because of HDFS's default block-placement policy (the first replica is written to the local node), your Hive node's disk is going to fill up first. This might lead to a disk space problem if your HDFS partition is being used by other processes as well. I ran into this and had to run the HDFS balancer, which, btw, is extremely slow (even after increasing the dfs.balance.bandwidthPerSec property)!
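For reference, the balancer throttle I bumped lives in hdfs-site.xml. A sketch (the 10 MB/s figure here is just an example value I'm picking for illustration; the default in that era was 1 MB/s):

```xml
<!-- hdfs-site.xml: let the balancer move blocks faster.
     Value is in bytes per second; 10485760 = 10 MB/s (example value;
     the default was 1 MB/s). Datanodes must be restarted for it to
     take effect, after which you run: hadoop balancer -->
<property>
  <name>dfs.balance.bandwidthPerSec</name>
  <value>10485760</value>
</property>
```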