Thursday, February 09, 2012

Handling date/time in Apache Pig

A common usage scenario for Apache Pig is to analyze log files. Most log files contain a timestamp of some sort -- hence the need to handle dates and times in your Pig scripts. I'll present here a few techniques you can use.

Mail server logs

The first example I have is a Pig script which analyzes the time it takes for a mail server to send a message. The script is available here as a gist.

We start by registering the piggybank jar and defining the functions we'll need. I ran this using Elastic MapReduce, and all these functions are available in the piggybank that ships with EMR.

REGISTER file:/home/hadoop/lib/pig/piggybank.jar;
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();             
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME();
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT();
DEFINE FORMAT org.apache.pig.piggybank.evaluation.string.FORMAT();


Since the mail log timestamps don't contain the year, we declare a variable called YEAR which by default is set to the current year via the Unix 'date' command. The variable can also be set when the Pig script is called by running "pig -p YEAR=2011 mypigscript.pig".

%default YEAR `date +%Y`;

We read in the mail logs and extract the lines containing the source of a given message ('from' lines). An example of such a line:

Dec  2 15:13:52 mailserver1 sendmail[1882]: pB2KCqu1001882: from=<info@example.com>, size=9544, class=0, nrcpts=1, msgid=<201112022012.pB2KCqu1001882@mailserver1.example.com>, proto=ESMTP, daemon=MTA, relay=relay1.example.com [10.0.20.6]

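To split the line into its various elements, we use the EXTRACT function and a rather complicated regular expression. Note that inside a Pig string literal each backslash of the regular expression has to be escaped with a second backslash, so \S is written as \\S: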

RAW_LOGS = LOAD '$INPUT' as (line:chararray);
SRC = FOREACH RAW_LOGS GENERATE                                                 
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+from=<([^>]+)>,\\s+size=(\\d+),\\s+class=(\\d+),\\s+nrcpts=(\\d+),\\s+msgid=<([^>]+)>.*relay=(\\S+)')
)
AS (
 month: chararray,
 day: chararray,
 time: chararray,
 mailserver: chararray,
 pid: chararray,
 sendmailid: chararray,
 src: chararray,
 size: chararray,
 classnumber: chararray,
 nrcpts: chararray,
 msgid: chararray,
 relay: chararray
);
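
One optional refinement, sketched here and not part of the original script: lines that are not 'from' lines can be dropped with Pig's built-in MATCHES operator, so that EXTRACT only sees candidate lines. MATCHES has to match the entire line, hence the leading and trailing .* in the pattern. The relation name FROM_LINES is made up for this illustration, and the same backslash escaping applies.

```pig
-- Sketch: keep only sendmail 'from' lines.
-- FROM_LINES is a hypothetical relation name, not part of the original script.
-- MATCHES must match the whole line, so the pattern is wrapped in .*
FROM_LINES = FILTER RAW_LOGS BY line MATCHES '.*sendmail\\[\\d+\\]:\\s+\\w+:\\s+from=<.*';
```

SRC could then be generated from FROM_LINES instead of RAW_LOGS.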

For this particular exercise we don't need all the fields of the SRC relation. We keep only a few:

T1 = FOREACH SRC GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp;
FILTER_T1 = FILTER T1 BY NOT sendmailid IS NULL;
DUMP FILTER_T1;

Note that we use the FORMAT function to generate a timestamp string out of the month, day and time fields, and we also add the YEAR variable. The FILTER_T1 relation contains tuples such as:

(pB2KDpaN007050,2011-Dec-2 15:13:52)
(pB2KDpaN007054,2011-Dec-2 15:13:53)
(pB2KDru1003569,2011-Dec-2 15:13:54)

We now use the DATE_TIME function, which takes as input our generated timestamp and the date format string describing it ('yyyy-MMM-d HH:mm:ss'), and returns a DateTime string in ISO 8601 format (the format used by Joda-Time).

R1 = FOREACH FILTER_T1 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt;
DUMP R1;

The R1 relation contains tuples such as:

(pB2KDpaN007050,2011-12-02T15:13:52.000Z)
(pB2KDpaN007054,2011-12-02T15:13:53.000Z)
(pB2KDru1003569,2011-12-02T15:13:54.000Z)

Note that the timestamp string "2011-Dec-2 15:13:52" got converted into the canonical ISO 8601 DateTime string "2011-12-02T15:13:52.000Z". The log timestamp carries no timezone, so it is treated as UTC, hence the trailing Z.

Now we can operate on the DateTime strings by using the ISOToUnix function, which takes a DateTime and returns the Unix epoch in milliseconds (which we divide by 1000 to obtain seconds):

-- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch1 = FOREACH R1 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 as epoch:long;
DUMP toEpoch1;

The toEpoch1 relation contains tuples of the form:

(pB2KDpaN007050,2011-12-02T15:13:52.000Z,1322838832)
(pB2KDpaN007054,2011-12-02T15:13:53.000Z,1322838833)
(pB2KDru1003569,2011-12-02T15:13:54.000Z,1322838834)

We now perform similar operations on lines containing destination email addresses:

DEST = FOREACH RAW_LOGS GENERATE                                                 
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+to=<([^>]+)>,\\s+delay=([^,]+),\\s+xdelay=([^,]+),.*relay=(\\S+)\\s+\\[\\S+\\],\\s+dsn=\\S+,\\s+stat=(.*)')
)
AS (
 month: chararray,
 day: chararray,
 time: chararray,
 mailserver: chararray,
 pid: chararray,
 sendmailid: chararray,
 dest: chararray,
 delay: chararray,
 xdelay: chararray,
 relay: chararray,
 stat: chararray
);


T2 = FOREACH DEST GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp, dest, stat;
FILTER_T2 = FILTER T2 BY NOT sendmailid IS NULL;

R2 = FOREACH FILTER_T2 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt, dest, stat;

-- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch2 = FOREACH R2 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 AS epoch:long, dest, stat;

At this point we have 2 relations, toEpoch1 and toEpoch2, which we can join by sendmailid:

R3 = JOIN toEpoch1 BY sendmailid, toEpoch2 BY sendmailid;

The relation R3 contains tuples of the form:

(sendmailid, datetime1, epoch1, sendmailid, datetime2, epoch2, dest, stat)

We generate another relation by keeping the sendmailid, the delta epoch2 - epoch1 (the delivery time in seconds), the destination email and the status of the delivery. We also order by the epoch delta in descending order, so that the slowest deliveries come first:

R4 = FOREACH R3 GENERATE $0, $5 - $2, $6, $7;
R5 = ORDER R4 BY $1 DESC;

R5 contains tuples such as:

(pB2KDqo5007488,2,user1@earthlink.net,Sent (1rwzuwyl3Nl36v0 Message accepted for delivery))
(pB2KDru1003560,1,user2@yahoo.com,Sent (ok dirdel))
(pB2KCrvm030964,0,user3@hotmail.com,Sent ( <201112022012.pB2KCrvm030964> Queued mail for delivery))
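
Positional references such as $5 and $2 are easy to get wrong when a relation changes. As an alternative sketch, the same projection can be written with field names: after a JOIN, Pig prefixes each field with the alias of the relation it came from, using ::. The names R4_NAMED, R5_NAMED and delta are made up for this example.

```pig
-- Same projection and ordering as R4/R5, using names instead of positions.
R4_NAMED = FOREACH R3 GENERATE
    toEpoch1::sendmailid AS sendmailid,
    toEpoch2::epoch - toEpoch1::epoch AS delta,
    toEpoch2::dest AS dest,
    toEpoch2::stat AS stat;
R5_NAMED = ORDER R4_NAMED BY delta DESC;
```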

At this point we can see which email deliveries took longest, and try to identify patterns (maybe certain mail domains make it harder to deliver messages, or maybe email addresses are misspelled, etc).
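
To look for per-domain patterns, one possible follow-up (a sketch, not part of the original script) is to group the delays by destination domain. It assumes the built-in REGEX_EXTRACT function is available in your Pig version; the relation and field names DELAYS, BY_DOMAIN, DOMAIN_STATS, SLOWEST, domain and delta are made up for this illustration.

```pig
-- Sketch: average and maximum delivery delay per destination domain.
-- Starts from R3, the JOIN of toEpoch1 and toEpoch2.
DELAYS = FOREACH R3 GENERATE
    -- everything after the last '@' in the destination address
    REGEX_EXTRACT(toEpoch2::dest, '.*@(.+)', 1) AS domain,
    toEpoch2::epoch - toEpoch1::epoch AS delta;
BY_DOMAIN = GROUP DELAYS BY domain;
DOMAIN_STATS = FOREACH BY_DOMAIN GENERATE
    group AS domain,
    COUNT(DELAYS) AS messages,
    AVG(DELAYS.delta) AS avg_delay,
    MAX(DELAYS.delta) AS max_delay;
-- Slowest domains first.
SLOWEST = ORDER DOMAIN_STATS BY avg_delay DESC;
DUMP SLOWEST;
```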

Nginx logs

In the second example, I'll show how to do some date conversions on Nginx access log timestamps. The full Pig script is available here as a gist.

We parse the Nginx access log lines similarly to the mail log lines in the first example:

RAW_LOGS = LOAD '$INPUT' as (line:chararray);
LOGS_BASE = FOREACH RAW_LOGS GENERATE                                            
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+) - - \\[([^\\[]+)\\]\\s+"([^"]+)"\\s+(\\d+)\\s+(\\d+)\\s+"([^"]+)"\\s+"([^"]+)"\\s+"([^"]+)"\\s+(\\S+)')
)
AS (
 ip: chararray,
 timestamp: chararray,
 url: chararray,
 status: chararray,
 bytes: chararray,
 referrer: chararray,
 useragent: chararray,
 xfwd: chararray,
 reqtime: chararray
);
DATE_URL = FOREACH LOGS_BASE GENERATE timestamp;
F = FILTER DATE_URL BY NOT timestamp IS NULL;

The timestamp is of the form "30/Sep/2011:00:10:02 -0700", so we use the appropriate DATE_TIME formatting string 'dd/MMM/yyyy:HH:mm:ss Z' to convert it to an ISO DateTime. Note the Z in the format string, which is needed to match the numeric timezone offset (-0700):

R1 = FOREACH F GENERATE timestamp, DATE_TIME(timestamp, 'dd/MMM/yyyy:HH:mm:ss Z') as dt;
DUMP R1;

R1 contains tuples of the form:

(30/Sep/2011:00:19:35 -0700,2011-09-30T00:19:35.000-07:00)
(30/Sep/2011:00:19:36 -0700,2011-09-30T00:19:36.000-07:00)
(30/Sep/2011:00:19:37 -0700,2011-09-30T00:19:37.000-07:00)

At this point, if we wanted to convert from DateTime to Unix epoch in seconds, we could use ISOToUnix like we did for the mail logs:

toEpoch = FOREACH R1 GENERATE dt, ISOToUnix(dt) / 1000 as epoch:long;

However, let's use another function called FORMAT_DT to convert from the above DateTime format to another format of the type 'MM/dd/yyyy HH:mm:ss Z'. The first argument to FORMAT_DT is the desired format for the date/time, and the second argument is the ISO DateTime string to be converted:

FDT = FOREACH R1 GENERATE FORMAT_DT('MM/dd/yyyy HH:mm:ss Z', dt) as fdt;
DUMP FDT;

The FDT relation now contains tuples such as:

(09/30/2011 00:19:35 -0700)
(09/30/2011 00:19:36 -0700)
(09/30/2011 00:19:37 -0700)

We can now use a handy function called CustomFormatToISO to convert from any custom date/time format (such as the one we generated in FDT) back to a canonical ISO DateTime format:

toISO = FOREACH FDT GENERATE fdt, CustomFormatToISO(fdt, 'MM/dd/yyyy HH:mm:ss Z');
DUMP toISO;

(09/30/2011 00:19:35 -0700,2011-09-30T07:19:35.000Z)
(09/30/2011 00:19:36 -0700,2011-09-30T07:19:36.000Z)
(09/30/2011 00:19:37 -0700,2011-09-30T07:19:37.000Z)

Note how the custom DateTime string "09/30/2011 00:19:35 -0700" got transformed into the canonical ISO DateTime string "2011-09-30T07:19:35.000Z". The result is normalized to UTC: 00:19:35 at offset -0700 is 07:19:35 UTC.
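
FORMAT_DT can also be handy for bucketing requests by time period. The sketch below is not part of the original script. It assumes that FORMAT_DT accepts other Joda-Time patterns in the same way it accepts 'MM/dd/yyyy HH:mm:ss Z' above, and it counts requests per hour. The names HOURS, BY_HOUR, HITS, SORTED_HITS, hour and requests are made up for this illustration.

```pig
-- Sketch: number of requests per hour, starting from the Nginx R1 relation.
-- Assumption: FORMAT_DT handles the 'yyyy-MM-dd HH' pattern like the one used for FDT.
HOURS = FOREACH R1 GENERATE FORMAT_DT('yyyy-MM-dd HH', dt) AS hour;
BY_HOUR = GROUP HOURS BY hour;
HITS = FOREACH BY_HOUR GENERATE group AS hour, COUNT(HOURS) AS requests;
SORTED_HITS = ORDER HITS BY hour;
DUMP SORTED_HITS;
```

Judging by the FDT output above, the hour would be expressed in the timezone offset of the original timestamp, not in UTC.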

Converting Unix epoch to DateTime

Some log files have timestamps in Unix epoch format. If you want to transform them into DateTime, you can use the UnixToISO function:

DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();

Here is an input file:

$ cat unixtime.txt
1320777563
1320777763
1320779563
1320787563

And here is a Pig script which converts the epoch into DateTime strings. Note that UnixToISO expects the epoch in milliseconds, and our input is in seconds, so we have to multiply each input value by 1000 to get to milliseconds:

UNIXTIMES = LOAD 's3://mybucket.com/unixtime.txt' as (unixtime:long);
D = FOREACH UNIXTIMES GENERATE UnixToISO(unixtime * 1000);
DUMP D;

(2011-11-08T18:39:23.000Z)
(2011-11-08T18:42:43.000Z)
(2011-11-08T19:12:43.000Z)
(2011-11-08T21:26:03.000Z)
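
The multiplication is safe here because unixtime is loaded as a long. If the column were declared as int, unixtime * 1000 would be computed in 32-bit arithmetic and overflow for present-day timestamps. A sketch of a more defensive form, using a hypothetical relation UNIXTIMES_INT with an int column (D_SAFE is likewise a made-up name):

```pig
-- Hypothetical variant: same file, but the column is declared as int.
UNIXTIMES_INT = LOAD 's3://mybucket.com/unixtime.txt' AS (unixtime:int);
-- Cast to long before multiplying so the product cannot overflow an int.
D_SAFE = FOREACH UNIXTIMES_INT GENERATE UnixToISO((long)unixtime * 1000L);
DUMP D_SAFE;
```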

8 comments:

test123 said...

How fast can you analyze logs like that ?

Grig Gheorghiu said...

LCF -- with Elastic MapReduce, it depends on what size of a cluster you use. Analyzing 1 million lines of logs takes ~ 1/2 hr with 2 m1.xlarge cluster nodes.

Gheorghe Gheorghiu said...

I've been meaning to tell you that I'm glad you have time to post on Agile!

software test automation just learning said...

wow this is extremely helpful. thanks for doing the research, finding COGROUP, and sharing your work with us. saves me a lot of time.

Universal Localhost said...

Thanks for the post.
It was quite helpful as I didn't know how to play around with date.

Anonymous said...

This has been such a helpful page for constructing pig queries. Thank you!

Murali Rao said...

Nice article on usage of date functions. Thank You !

Unknown said...

How can I find the difference between two datetimes? My result should be in datetime format only,
so that I can get the hour from the result set. Can anyone assist with this?
