Friday, July 24, 2009

Using Hive for weblog analysis

Introduction


I've been playing around with Hadoop for the last fortnight to see how it performs with our weblog data processing jobs (Apache access logs). Right now we're using a blink-and-it-breaks system running a bunch of custom Perl scripts for log processing and MySQL for data storage. It's running on a single server-class machine. It takes about 3-4 hours to extract, transform, and load (into MySQL) one day of weblogs (this includes identifying sessions based on a 30min timeout), and another 1-2 hours to aggregate data and generate various CSV reports.

So, a total of 6-7 hours, when it's not crashing.

With Hadoop + Hive I've brought the first phase down from 3-4 hours to under an hour! It takes anywhere between 15 and 45 minutes to copy log files into a raw Hive table (depending upon how much replication you've configured your cluster to have). After that, it takes 12 minutes to extract, transform, and load into a new table (including session identification).

I gave Cloudbase a shot before Hive. Cloudbase was *dead* easy to set up and get off the ground (I love their simple approach to UDFs & UDTs). However, its performance was not as good as Hive's. On the other hand, I spent quite a lot of time just trying to import my custom log formats into Hive (I'm looking at you, GenericUDF). But once I got started, man, was I blown away! Take a look at my Twitter updates on Cloudbase and Hive comparisons.

Cluster setup: I managed to corner 4 desktop machines in the office for my experimental cluster. They are Core-2 Duo 2.4 GHz machines with 1GB of RAM and standard SATA hard disks. All four are connected to each other using a switch.

Setting up the Hive tables


Here's how I've set up my Hive tables:

The following table (raw) stores the raw, unprocessed log files. I've partitioned the table by date so that the ETL operations can be run on bite sized chunks consisting of a day's worth of data.
create table raw(line string) partitioned by(dt string)
row format delimited fields terminated by '\t' lines terminated by '\n';


The following table (streamed_hits) contains the processed & filtered hits, partitioned by date. Within each partition the rows are clustered by Apache user-id (aid), and within each cluster they are sorted by page view time (ts). According to the Hive Wiki, this clustering & sorting can improve efficiency in certain queries, although it does make the ETL phase take longer. I'm really not sure what the buckets do - and 1,000 is just a number I pulled out of thin air.
create table streamed_hits(ip_address string, aid string, uid string,
ts string, method string, uri string,
response string, session_id string,
session_start string, pv_number int,
clickstream string)
partitioned by(dt string)
clustered by (aid) sorted by (aid, ts) into 1000 buckets
row format delimited fields terminated by '\t' lines terminated by '\n';
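
On the buckets question: conceptually, clustering by aid into 1000 buckets means each row's aid is hashed and the hash is taken modulo the bucket count, so all rows for one aid end up in the same bucket within a partition. Here is a minimal Python sketch of that idea. The hash function below is a Java-style string hash used purely as a stand-in (an assumption; Hive's actual hashing may differ in detail), and the function names are made up for illustration.

```python
NUM_BUCKETS = 1000  # same number as in the CREATE TABLE statement


def java_style_hash(s: str) -> int:
    """Illustrative 32-bit string hash (h = 31*h + byte).

    This is a stand-in for illustration, not Hive's exact code.
    """
    h = 0
    for b in s.encode("utf-8"):
        h = (31 * h + b) & 0xFFFFFFFF
    return h


def bucket_for(aid: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a clustering key to a bucket number in [0, num_buckets)."""
    return (java_style_hash(aid) & 0x7FFFFFFF) % num_buckets


if __name__ == "__main__":
    # Hypothetical user ids: the same aid always lands in the same bucket.
    for aid in ["user-a", "user-b", "user-a"]:
        print(aid, "->", bucket_for(aid))
```

The useful property is the determinism: a query that only cares about one aid (or a sample of aids) can, in principle, look at a subset of buckets instead of the whole partition.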


Importing the raw data


Here's how one imports data into the raw table. The import process is a simple HDFS file copy operation[1]. The higher the replication factor, the more nodes the data needs to be copied to, and hence the more time this operation takes.
load data local inpath '/weblogs/20090602-access.log'
into table raw partition(dt='2009-06-02');

load data local inpath '/weblogs/20090603-access.log'
into table raw partition(dt='2009-06-03');
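
With one log file per day, typing these statements by hand gets tedious. A small Python sketch that generates them for a date range might look like the following. It assumes the file naming (/weblogs/YYYYMMDD-access.log) and partition format (dt='YYYY-MM-DD') shown above; the function name is hypothetical.

```python
from datetime import date, timedelta


def load_statements(start: date, end: date, log_dir: str = "/weblogs"):
    """Yield one LOAD DATA statement per day, from start to end inclusive."""
    day = start
    while day <= end:
        path = f"{log_dir}/{day:%Y%m%d}-access.log"
        yield (
            f"load data local inpath '{path}' "
            f"into table raw partition(dt='{day:%Y-%m-%d}');"
        )
        day += timedelta(days=1)


if __name__ == "__main__":
    for stmt in load_statements(date(2009, 6, 2), date(2009, 6, 3)):
        print(stmt)
```

The printed statements can be saved to a file and fed to the Hive CLI in one go.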


Processing the raw data (ETL)


Here's the Hive query for the ETL operation. I'm making sure that uninteresting rows are discarded upfront using the WHERE clause in the inner query. I had earlier put this in the outer query (which was causing LOTS of uninteresting rows to go through the parsing logic). The (apparently, in hindsight) stupid placement of the WHERE clause was bumping up the completion time of this query from 15mins to 2+ hours!

I'm using a custom map script to read my log files, which are in a 'non-standard' format. There is another way to do this - UDFs & SerDe - but according to the discussion I had on the mailing list, these features are not fit for newbie consumption yet. The DISTRIBUTE BY & SORT BY clauses are essential for my session & clickstream identification script to work (thanks to the Hive Wiki).
from
(from raw
select transform line using 'parse_logs.pl' as ip_address, aid, uid, ts, method, uri,
response, referer, user_agent, cookies, ptime
where lower(line) rlike '^(\\S+) (\\S+) (\\S+) \\[(.*?)\\] "(.*?)" (\\d+) (\\d+|-) "(.*?)" ".*?(mozilla|msie|opera).*?".*'
and not line rlike '^(\\S+) (\\S+) (\\S+) \\[(.*?)\\] "(\\S+) (/images.*?|/styles.*?|/javascripts.*?|/adserver.*?|.*?favicon.*?) (\\S+)".*'
and dt='2009-06-30'
distribute by aid sort by aid, ts asc) parsed
insert overwrite table streamed_hits partition(dt='2009-06-30')
select transform parsed.ip_address, parsed.aid, parsed.uid, parsed.ts, parsed.method, parsed.uri,
parsed.response, parsed.referer, parsed.user_agent, parsed.cookies, parsed.ptime
using 'identify_sessions_and_clickstream.pl' as ip_address, aid, uid, ts, method, uri,
response, session_id, session_start, pv_number, clickstream;
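
To sanity-check the two filter patterns outside Hive, they can be tried against a few lines in Python. This is only a sketch: the patterns are copied from the WHERE clause with Hive's doubled backslashes undone, the sample log lines are invented, and Python's regex engine is assumed to behave like Hive's for these particular patterns.

```python
import re

# First pattern: a well-formed hit whose user agent looks like a browser.
# It is applied to the lowercased line, as in the query.
KEEP = re.compile(
    r'^(\S+) (\S+) (\S+) \[(.*?)\] "(.*?)" (\d+) (\d+|-) "(.*?)" '
    r'".*?(mozilla|msie|opera).*?".*'
)

# Second pattern: requests for static assets, ad server calls and favicons.
DROP = re.compile(
    r'^(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) '
    r'(/images.*?|/styles.*?|/javascripts.*?|/adserver.*?|.*?favicon.*?) (\S+)".*'
)


def is_interesting(line: str) -> bool:
    """Mirror the WHERE clause: keep browser hits that are not asset requests."""
    return bool(KEEP.search(line.lower())) and not DROP.search(line)


if __name__ == "__main__":
    # Invented sample lines, for illustration only.
    samples = [
        '1.2.3.4 - u1 [02/Jun/2009:10:15:30 +0530] "GET /trains/search HTTP/1.1" 200 512 "-" "Mozilla/5.0" "-" 12',
        '1.2.3.4 - u1 [02/Jun/2009:10:15:31 +0530] "GET /images/logo.png HTTP/1.1" 200 2048 "-" "Mozilla/5.0" "-" 3',
        '5.6.7.8 - - [02/Jun/2009:10:15:32 +0530] "GET /trains/search HTTP/1.1" 200 512 "-" "curl/7.19.0" "-" 5',
    ]
    for s in samples:
        print(is_interesting(s), s)
```

Because the check is this cheap, running it before the parsing script (the inner query) rather than after it is what keeps uninteresting rows out of the expensive step.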


Custom map/reduce scripts used


Here's the parse_logs.pl Perl script. Please excuse my Perl - I'm not a Perl programmer. I was just trying to reuse the funky regular expressions that we had lying around in our current log processing system. You can plug any script that reads from standard input and writes to standard output into Hive.

#!/usr/bin/perl

my %monthNum=(
"Jan" => 1,
"Feb" => 2,
"Mar" => 3,
"Apr" => 4,
"May" => 5,
"Jun" => 6,
"Jul" => 7,
"Aug" => 8,
"Sep" => 9,
"Oct" => 10,
"Nov" => 11,
"Dec" => 12,
);

while (defined($line = <STDIN>)) {
if (($host,$user,$apache,$rfc931,$method, $url, $ver, $status,$size,$referrer,$agent,$cookies,$ptime) = $line =~ m/^(\S+) (\S+) (\S+) \[(\S+ \S+)\] "(\S+) (.*?) HTTP\/([0-9\.]*)" (\d+) (\d+|-) "([^"]*)" "([^"]*)" "([^"]*)" (\d+|-)$/) {
# everything found -- nothing to be done
}elsif (($host,$user,$apache,$rfc931, $method, $url, $ver, $status,$size,$referrer,$agent,$cookies) = $line =~ m/^(\S+) (\S+) (\S+) \[(\S+ \S+)\] "(\S+) (.*?) HTTP\/([0-9\.]*)" (\d+) (\d+|-) "([^"]*)" "([^"]*)" "([^"]*)"$/) {
$ptime="";
}

$ts="";
# Converting date to yyyy-MM-dd hh:mm:ss
if (($day, $monthname, $year, $hour, $minute, $sec)= $rfc931 =~/^(\d{2})[\/-]([^\/-]+)[\/-](\d{4}):(\d{2}):(\d{2}):(\d{2})/) {
$month=$monthNum{$monthname};
$ts=sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year, $month, $day, $hour, $minute, $sec);
}


$agent=lc($agent);
$user=lc($user);
print "$host\t$apache\t$user\t$ts\t$method\t$url\t$status\t$referrer\t$agent\t$cookies\t$ptime\n"
}
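
For readers who don't speak Perl, the timestamp conversion step can be sketched in Python as below. It is not the script used in the post, just an illustration of the same regex and month lookup. The function name is hypothetical, and unlike the Perl version it returns an empty string when the month name is unknown.

```python
import re

MONTH_NUM = {
    name: number
    for number, name in enumerate(
        ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
         "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"],
        start=1,
    )
}

# Same pattern as the Perl script: dd/Mon/yyyy:hh:mm:ss (timezone ignored).
TS_RE = re.compile(r'^(\d{2})[/-]([^/-]+)[/-](\d{4}):(\d{2}):(\d{2}):(\d{2})')


def to_hive_ts(raw: str) -> str:
    """Convert an Apache log timestamp to 'yyyy-MM-dd hh:mm:ss', or '' on failure."""
    m = TS_RE.match(raw)
    if not m:
        return ""
    day, month_name, year, hour, minute, sec = m.groups()
    month = MONTH_NUM.get(month_name)
    if month is None:
        return ""
    return "%04d-%02d-%02d %02d:%02d:%02d" % (
        int(year), month, int(day), int(hour), int(minute), int(sec)
    )


if __name__ == "__main__":
    print(to_hive_ts("02/Jun/2009:10:15:30 +0530"))  # 2009-06-02 10:15:30
```

Timestamps in this format sort correctly as plain strings, which is convenient given that ts is stored as a string column and used in SORT BY.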


Here's the identify_sessions_and_clickstream.pl Perl script. Again, same disclaimer about my Perl as above. Also, please excuse the extremely naive way in which I'm trying to construct the clickstream. That's a work in progress.

This script depends on the fact that the incoming data is sorted on Apache user-id and timestamp (that's what the DISTRIBUTE BY & SORT BY in the Hive query achieve). Apart from identifying 60min sessions, I'm cleaning up the URLs to make them more 'generic' (removing query strings, removing trip IDs, etc.).
#!/usr/bin/perl

use Date::Parse;

$session_duration=60*60; # in seconds
$prev_apache=undef;
$prev_ts=undef;
$pv_number=undef;
$session_id=undef;
$session_num=undef;
$session_start=undef;
$clickstream='';

while (defined($line = <STDIN>)) {
chomp($line);
($ip_address, $apache, $user, $ts, $method, $url, $status, $referrer, $agent, $cookies, $ptime) = split(/\t/, $line);
$url =~ s/(.*?)\?.*/$1/i;
$url =~ s/^\/(trains|flights)\/itinerary\/.*?\/(.*?)/\/$1\/itinerary\/itinerary-id\/$2/;
$url =~ s/^\/(trains|flights)\/itinerary\/(\d+)$/\/$1\/itinerary\/itinerary-id/;
$url =~ s/^\/(activate|reactivate|reset)\/.*/\/$1/;
$url =~ s/^\/share.*/\/share/;
$url =~ s/^\/account\/trips\/.*/\/account\/trips\/trip-id/;
$url =~ s/^\/trains\/stations\/[0-9]*/\/trains\/stations\/numeric-id/;
$url =~ s/^\/trains\/stations\/[A-Za-z]*/\/trains\/stations\/alphanumeric-id/;
$url =~ s/^\/hotels\/info\/.*/\/hotels\/info\/hotel-id/;
$url =~ s/^\/places\/hotels\/.*\/images.*/\/places\/hotels\/images/;
$url =~ s/^\/places\/hotels\/images.*/\/places\/hotels\/images/;
$url =~ s/^\/newsletters\/images\/.*/\/newsletters\/images/;
$url =~ s/^\/index.*/\//;
$url =~ s/\/$//; # remove trailing slashes

if(!defined($prev_apache) || $prev_apache ne $apache){
if($apache eq '-' or $apache eq '') {
# TODO -- use IP address & user_agent to identify sessions?
$pv_number='';
$session_start='';
$session_id='';
$clickstream='';
} else {
$pv_number=1;
$session_start=$ts;
$session_num=1;
$session_id="$apache|$session_start";
$clickstream="$url"
}
} elsif($prev_apache eq $apache) {
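if((str2time($ts)-str2time($prev_ts))<=$session_duration) {
# same session: bump the page-view counter and extend the clickstream
$pv_number=$pv_number+1;
if($pv_number<70) {
$clickstream="$clickstream|$url";
}
} else {
# timeout exceeded: start a new session for the same user
$pv_number=1;
$session_num=$session_num+1;
$session_start=$ts; # assumed: not visible in the mangled listing, but the new session_id needs the new start time
$session_id="$apache|$session_start";
$clickstream="$url";
}
}
$prev_apache=$apache;
$prev_ts=$ts;
# The original print statement was lost in the page rendering; this one is reconstructed from the column list in the Hive query
print "$ip_address\t$apache\t$user\t$ts\t$method\t$url\t$status\t$session_id\t$session_start\t$pv_number\t$clickstream\n";
}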
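
Since the Perl listing got mangled by the page rendering, here is the core session-splitting idea as a small Python sketch. It is not the script used in the post: it assumes rows arrive already sorted by user id and timestamp (as DISTRIBUTE BY and SORT BY guarantee within a reducer), uses the same 60 minute timeout and the same user-id|session-start id format, and leaves out the URL clean-up, the clickstream and the handling of anonymous ('-') users. All names are made up for illustration.

```python
from datetime import datetime, timedelta

SESSION_TIMEOUT = timedelta(minutes=60)
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def sessionize(rows):
    """Assign session ids to page views.

    rows: iterable of (aid, ts, url) tuples, sorted by aid and then ts.
    Yields (aid, ts, url, session_id, session_start, pv_number).
    """
    prev_aid = None
    prev_time = None
    session_start = None
    pv_number = 0
    for aid, ts, url in rows:
        now = datetime.strptime(ts, TS_FORMAT)
        # A new user, or too long a gap since the last hit, starts a session.
        if aid != prev_aid or now - prev_time > SESSION_TIMEOUT:
            session_start = ts
            pv_number = 1
        else:
            pv_number += 1
        yield aid, ts, url, f"{aid}|{session_start}", session_start, pv_number
        prev_aid, prev_time = aid, now


if __name__ == "__main__":
    hits = [
        ("u1", "2009-06-30 10:00:00", "/trains"),
        ("u1", "2009-06-30 10:30:00", "/flights"),
        ("u1", "2009-06-30 12:00:00", "/trains"),
        ("u2", "2009-06-30 09:00:00", "/hotels"),
    ]
    for row in sessionize(hits):
        print("\t".join(str(field) for field in row))
```

The single pass with only "previous row" state is the reason the sort order matters: if rows for one user were spread across reducers or arrived out of order, the gap check would compare unrelated hits.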



[1] A small warning here: if your Hive session is running on one of your active nodes, beware that because of the default HDFS policy, that node's disk is going to be filled up first. This might lead to a disk space problem if your HDFS partition is being used by other processes as well. I ran into this and had to run the HDFS balancer, which, btw, is extremely slow (even after increasing the dfs.balance.bandwidthPerSec property)!

9 comments:

  1. Saurabh - Great post. Wondering why you choose to apply your parse_logs.pl script within HIVE versus using the Hadoop streaming interface?

  2. I'm not sure I fully understand what you mean by "Hadoop streaming interface" in this context. Isn't that what Hive would be using internally to let me use the Perl script?

    If your point is that for pre-processing logs one doesn't require to use Hive, I'm sure there are better ways to do this. It's just that I wanted to get the job done in a reasonable time frame. I didn't have the luxury of time to experiment & learn multiple tools to achieve one task.

    Replies
    1. I think he meant using flume. Flume will work as a source/sink, taking logs from local fs, putting them in hdfs.

  3. Great post! Just one thing: it looks like the rendering of identify_sessions_and_clickstream.pl is getting mangled at the end?

  4. Thx for the post. We have a scenario where we need to parse a custom log (something other than apache access log), in that case should we be writing our own adapter / parsing script ?

  5. Prash: If the log is tab separated, there is a possibility of importing it into Hive directly and doing most of the processing using Hive queries. If the log is NOT tab separated, then yes, you have to write your own parsing script.

  6. Thank you for the wonderful work. I would like to know about your prototyping cluster setup. What hardware and software did you use in your initial experimental cluster? I have some old (not very old - each with 4GB RAM and a decent processor etc., and working fine till now) Dell Windows XP machines and want to convert them to Red Hat Linux for a Hadoop cluster for my experimental purposes. Can you give me some suggestions on how to proceed with this plan?

  7. could you add the sample logs for doing the queries as you did
