Tuesday, February 21, 2012

Set operations in Apache Pig


Simple set operation examples

While writing Apache Pig scripts, I realized that in many cases the result I was after was attainable through a series of set operations performed on various relations. It’s not very clear from the documentation how to perform these operations. I googled a bit and found this PDF from Tufts University on ‘Advanced Pig’. In a nutshell, COGROUP is your friend. Here are some simple examples that show how you can perform set operations in Pig using COGROUP.

Let’s assume we have 2 relations TEST1 and TEST2. We load TEST1 from test1.txt containing:


1,aaaa
2,bbbb
3,cccc
4,dddd
5,eeee
6,ffff
7,gggg
8,hhhh
9,iiii

TEST1 = LOAD 's3://mybucket/test1.txt' USING PigStorage(',') as (
id: chararray,
value: chararray);

We load TEST2 from test2.txt containing:

7,ggggggg
8,hhhhhhh
9,iiiiiii
10,jjjjjjj
11,kkkkkkk

TEST2 = LOAD 's3://mybucket/test2.txt' USING PigStorage(',') as (
id: chararray,
value: chararray);


We use COGROUP to generate a new relation. COGROUP is similar to JOIN, in that it operates on one or more fields from each of its input relations. Here is how we cogroup on the id field of TEST1 and the id field of TEST2:

CGRP = COGROUP TEST1 BY id, TEST2 BY id;
DUMP CGRP;

(1,{(1,aaaa)},{})
(2,{(2,bbbb)},{})
(3,{(3,cccc)},{})
(4,{(4,dddd)},{})
(5,{(5,eeee)},{})
(6,{(6,ffff)},{})
(7,{(7,gggg)},{(7,ggggggg)})
(8,{(8,hhhh)},{(8,hhhhhhh)})
(9,{(9,iiii)},{(9,iiiiiii)})
(10,{},{(10,jjjjjjj)})
(11,{},{(11,kkkkkkk)})

If we DESCRIBE the new relation CGRP we get:

CGRP: {group: chararray,TEST1: {(id: chararray,value: chararray)},TEST2: {(id: chararray,value: chararray)}}

What is important to notice is that the second element of each tuple from the new relation is a bag of tuples from TEST1 containing the id value by which we cogrouped, and the third element of each tuple is a bag of tuples from TEST2 containing that same id value. These bags are empty if TEST1 or TEST2 do not contain a given id value. Based on this, we can perform the set operations I mentioned.
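As a mental model only (this is plain Python, not how Pig executes COGROUP), you can think of the cogrouped relation as a map from each id to a pair of bags; the set operations below are then just emptiness tests on those bags. The rows here are an illustrative subset of TEST1 and TEST2:

```python
from collections import defaultdict

# Toy in-memory model of COGROUP (illustrative subset of TEST1/TEST2)
test1 = [("1", "aaaa"), ("7", "gggg"), ("8", "hhhh")]
test2 = [("7", "ggggggg"), ("10", "jjjjjjj")]

cogroup = defaultdict(lambda: ([], []))
for row in test1:
    cogroup[row[0]][0].append(row)   # bag of TEST1 tuples for this id
for row in test2:
    cogroup[row[0]][1].append(row)   # bag of TEST2 tuples for this id

# Intersection: both bags non-empty; difference: the other bag is empty
intersect_ids = sorted(k for k, (b1, b2) in cogroup.items() if b1 and b2)
test1_minus_test2 = sorted(k for k, (b1, b2) in cogroup.items() if not b2)
test2_minus_test1 = sorted(k for k, (b1, b2) in cogroup.items() if not b1)
```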

To perform set intersection (based on the id field), we keep only those tuples which have non-empty bags for both TEST1 and TEST2:


INTERSECT = FILTER CGRP BY NOT IsEmpty(TEST1) AND NOT IsEmpty(TEST2);
INTERSECT_ID = FOREACH INTERSECT GENERATE group AS id;   
DUMP INTERSECT_ID;

(7)
(8)
(9)


To perform the set difference TEST1 - TEST2 (based again on the id field), we keep only those tuples which have empty bags for TEST2 (which means those particular id values are in TEST1, but not in TEST2):

TEST1_MINUS_TEST2 = FILTER CGRP BY IsEmpty(TEST2);
TEST1_MINUS_TEST2_ID = FOREACH TEST1_MINUS_TEST2 GENERATE group AS id;
DUMP TEST1_MINUS_TEST2_ID;

(1)
(2)
(3)
(4)
(5)
(6)


The difference going the other way (TEST2 - TEST1) is similar. We keep only those tuples which have empty bags for TEST1:

TEST2_MINUS_TEST1 = FILTER CGRP BY IsEmpty(TEST1);
TEST2_MINUS_TEST1_ID = FOREACH TEST2_MINUS_TEST1 GENERATE group AS id;
DUMP TEST2_MINUS_TEST1_ID;

(10)
(11)


Note that if we wanted the set union based on the id field, we could simply generate the ‘group’ element of the CGRP relation:

UNION_ID = FOREACH CGRP GENERATE group AS id;
DUMP UNION_ID;

(1)
(2)
(3)
(4)
(5)
(6)
(7)
(8)
(9)
(10)
(11)


To perform the set intersection operation, we could also do a JOIN of TEST1 and TEST2 on the id field:

J = JOIN TEST1 BY id, TEST2 BY id;
DESCRIBE J;
DUMP J;

J: {TEST1::id: chararray,TEST1::value: chararray,TEST2::id: chararray,TEST2::value: chararray}

(7,gggg,7,ggggggg)
(8,hhhh,8,hhhhhhh)
(9,iiii,9,iiiiiii)


After the JOIN, we keep only the first field of the J relation (the id field):

J_ID = FOREACH J GENERATE $0;
DUMP J_ID;

(7)
(8)
(9)


To perform the set union operation, we could do a UNION of TEST1 and TEST2:

U = UNION TEST1, TEST2;
DESCRIBE U;
DUMP U;

U: {id: chararray,value: chararray}

(1,aaaa)
(2,bbbb)
(3,cccc)
(4,dddd)
(5,eeee)
(6,ffff)
(7,gggg)
(8,hhhh)
(9,iiii)
(7,ggggggg)
(8,hhhhhhh)
(9,iiiiiii)
(10,jjjjjjj)
(11,kkkkkkk)


However, note that the tuples containing common id values (7, 8 and 9) are duplicated at this point. So to generate the true set union, we need to keep only the distinct id values:

U_ID = FOREACH U GENERATE $0;
U_ID_DISTINCT = DISTINCT U_ID;
DUMP U_ID_DISTINCT;

(1)
(2)
(3)
(4)
(5)
(6)
(7)
(8)
(9)
(10)
(11)
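The UNION-plus-DISTINCT pattern behaves like a set union on the id column; a minimal Python sketch of the same idea, using the ids from the example:

```python
# UNION keeps duplicates; DISTINCT on the projected id column removes them
test1_ids = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
test2_ids = ["7", "8", "9", "10", "11"]

u_ids = test1_ids + test2_ids                  # like UNION, duplicates kept
u_ids_distinct = sorted(set(u_ids), key=int)   # like DISTINCT
```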

More ‘real life’ set operation examples

The following examples are slightly more realistic. At least they’re based on real data -- the GeoWordNet datasets. As stated in this “Background Knowledge Datasets” document:

“A geo-spatial ontology is an ontology consisting of geo-spatial classes (e.g. lake, city), entities (e.g., Lago di Molveno, Trento), their metadata (e.g. latitude and longitude coordinates) and relations between them (e.g., part-of, instance-of). GeoWordNet is a multilingual geo-spatial ontology built from the full integration of WordNet, GeoNames and the Italian part of MultiWordNet.”

The GeoWordNet dataset contains several CSV files which can be either imported in a relational database, or, in our case, loaded into Pig as relations:

concept = LOAD 's3://mybucket/geowordnet/concept.csv.gz' USING PigStorage(',') as (
         con_id: int,
         name: chararray,
         gloss: chararray,
         lang: chararray,
         provenance: chararray);

relation = LOAD 's3://mybucket/geowordnet/relation.csv.gz' USING PigStorage(',') as (
         src_con_id: int,
         trg_con_id: int,
         name: chararray,
         gloss: chararray,
         lang: chararray);

entity = LOAD 's3://mybucket/geowordnet/entity.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray,
         con_id: int,
         lang: chararray,
         latitude: chararray,
         longitude: chararray,
         provenance: chararray);

part_of = LOAD 's3://mybucket/geowordnet/part_of.csv.gz' USING PigStorage(',') as (
         src_entity_id: int,
         trg_entity_id: int);

alternative_name_eng = LOAD 's3://mybucket/geowordnet/alternative_name_eng.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray);

alternative_name_ita = LOAD 's3://mybucket/geowordnet/alternative_name_ita.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray);


Example 1

-- Find entities with both alternative English AND Italian names
COGRP1 = COGROUP alternative_name_eng BY entity_id, alternative_name_ita BY entity_id;
INTERSECT = FILTER COGRP1 BY NOT IsEmpty(alternative_name_eng) AND NOT IsEmpty(alternative_name_ita);
R1 = FOREACH INTERSECT GENERATE FLATTEN(alternative_name_eng), FLATTEN(alternative_name_ita);


Example 2

-- Find entities with alternative English names but with no alternative Italian names
COGRP2 = COGROUP alternative_name_eng BY entity_id, alternative_name_ita BY entity_id;
DIFF2 = FILTER COGRP2 BY IsEmpty(alternative_name_ita);
R2 = FOREACH DIFF2 GENERATE FLATTEN(alternative_name_eng);


Example 3

-- Find entities with alternative Italian names but with no alternative English names
COGRP3 = COGROUP alternative_name_ita BY entity_id, alternative_name_eng BY entity_id;
DIFF3 = FILTER COGRP3 BY IsEmpty(alternative_name_eng);
R3 = FOREACH DIFF3 GENERATE FLATTEN(alternative_name_ita);

Example 4

-- Find entities with alternative English OR Italian names
U = UNION alternative_name_eng, alternative_name_ita;
J = JOIN entity BY entity_id, U BY entity_id;
R4 = FOREACH J GENERATE entity::name, entity::con_id, entity::lang, entity::latitude, entity::longitude, U::name;

Example 5

-- Find entities with NO alternative English and NO Italian names (by doing a set difference)
COGRP5 = COGROUP entity BY entity_id, U BY entity_id;
DIFF5 = FILTER COGRP5 BY IsEmpty(U);
R5 = FOREACH DIFF5 GENERATE FLATTEN(entity);


Although not strictly set-operation-related, here are some more things you can find out from the GeoWordNet dataset by means of JOINs between the appropriate relations:

-- Find relations between concepts
J1 = JOIN concept BY con_id, relation BY src_con_id;
J2 = JOIN J1 by trg_con_id, concept by con_id;
R6 = FOREACH J2 GENERATE J1::concept::con_id, J1::concept::name, J1::concept::gloss, J1::concept::lang, J1::concept::provenance, J1::relation::src_con_id, J1::relation::trg_con_id, J1::relation::name, J1::relation::gloss, J1::relation::lang, concept::con_id, concept::name, concept::gloss, concept::lang, concept::provenance;

-- Find entities which are part of other entities
J3 = JOIN entity BY entity_id, part_of BY src_entity_id;
J4 = JOIN J3 by trg_entity_id, entity by entity_id;
R7 = FOREACH J4 GENERATE J3::entity::name, J3::entity::con_id, J3::entity::lang, J3::entity::latitude, J3::entity::longitude, 'is part of', entity::name, entity::con_id, entity::lang, entity::latitude, entity::longitude;


Thursday, February 09, 2012

Handling date/time in Apache Pig

A common usage scenario for Apache Pig is to analyze log files. Most log files contain a timestamp of some sort -- hence the need to handle dates and times in your Pig scripts. I'll present here a few techniques you can use.

Mail server logs

The first example I have is a Pig script which analyzes the time it takes for a mail server to send a message. The script is available here as a gist.

We start by registering the piggybank jar and defining the functions we'll need. I ran this using Elastic MapReduce, and all these functions are available in the piggybank that ships with EMR.

REGISTER file:/home/hadoop/lib/pig/piggybank.jar;
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();             
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME();
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT();
DEFINE FORMAT org.apache.pig.piggybank.evaluation.string.FORMAT();


Since the mail log timestamps don't contain the year, we declare a variable called YEAR which by default is set to the current year via the Unix 'date' command. The variable can also be set when the Pig script is called by running "pig -p YEAR=2011 mypigscript.pig".

%default YEAR `date +%Y`;

We read in the mail logs and extract the lines containing the source of a given message ('from' lines). An example of such a line:

Dec  2 15:13:52 mailserver1 sendmail[1882]: pB2KCqu1001882: from=<info@example.com>, size=9544, class=0, nrcpts=1, msgid=<201112022012.pB2KCqu1001882@mailserver1.example.com>, proto=ESMTP, daemon=MTA, relay=relay1.example.com [10.0.20.6]

To split the line into its various elements, we use the EXTRACT function and a complicated regular expression. Note that in Pig the backslash needs to be escaped:

RAW_LOGS = LOAD '$INPUT' as (line:chararray);
SRC = FOREACH RAW_LOGS GENERATE                                                 
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+from=<([^>]+)>,\\s+size=(\\d+),\\s+class=(\\d+),\\s+nrcpts=(\\d+),\\s+msgid=<([^>]+)>.*relay=(\\S+)')
)
AS (
 month: chararray,
 day: chararray,
 time: chararray,
 mailserver: chararray,
 pid: chararray,
 sendmailid: chararray,
 src: chararray,
 size: chararray,
 classnumber: chararray,
 nrcpts: chararray,
 msgid: chararray,
 relay: chararray
);
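For reference, the same extraction can be tried outside Pig with Python's re module, where single backslashes suffice since there is no extra Pig string-escaping layer; the sample line is the one shown above:

```python
import re

# The sendmail 'from' line from the example above
line = ("Dec  2 15:13:52 mailserver1 sendmail[1882]: pB2KCqu1001882: "
        "from=<info@example.com>, size=9544, class=0, nrcpts=1, "
        "msgid=<201112022012.pB2KCqu1001882@mailserver1.example.com>, "
        "proto=ESMTP, daemon=MTA, relay=relay1.example.com [10.0.20.6]")

# Same pattern as in the Pig script, without the doubled backslashes
pattern = (r'(\S+)\s+(\d+)\s+(\S+)\s+(\S+)\s+sendmail\[(\d+)\]:\s+(\w+):\s+'
           r'from=<([^>]+)>,\s+size=(\d+),\s+class=(\d+),\s+nrcpts=(\d+),\s+'
           r'msgid=<([^>]+)>.*relay=(\S+)')
m = re.match(pattern, line)
month, day, time_, sendmailid = m.group(1), m.group(2), m.group(3), m.group(6)
```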

For this particular exercise we don't need all the fields of the SRC relation. We keep only a few:

T1 = FOREACH SRC GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp;
FILTER_T1 = FILTER T1 BY NOT sendmailid IS NULL;
DUMP FILTER_T1;

Note that we use the FORMAT function to generate a timestamp string out of the month, day and time fields, and we also add the YEAR variable. The FILTER_T1 relation contains tuples such as:

(pB2KDpaN007050,2011-Dec-2 15:13:52)
(pB2KDpaN007054,2011-Dec-2 15:13:53)
(pB2KDru1003569,2011-Dec-2 15:13:54)

We now use the DATE_TIME function, which takes as input our generated timestamp and the date format string describing it ('yyyy-MMM-d HH:mm:ss'), and returns a DateTime string in the canonical ISO 8601 format used by Joda-Time.

R1 = FOREACH FILTER_T1 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt;
DUMP R1;

The R1 relation contains tuples such as:

(pB2KDpaN007050,2011-12-02T15:13:52.000Z)
(pB2KDpaN007054,2011-12-02T15:13:53.000Z)
(pB2KDru1003569,2011-12-02T15:13:54.000Z)

Note that the timestamp string "2011-Dec-2 15:13:52" got converted into a canonical ISO 8601 DateTime string "2011-12-02T15:13:52.000Z".
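The equivalent conversion in Python (a rough sketch: strptime's '%Y-%b-%d %H:%M:%S' plays the role of the Joda pattern 'yyyy-MMM-d HH:mm:ss', and we assume UTC, matching the Z suffix in the Pig output):

```python
from datetime import datetime, timezone

# Parse the generated timestamp and render it as an ISO 8601 string
dt = datetime.strptime("2011-Dec-2 15:13:52", "%Y-%b-%d %H:%M:%S")
dt = dt.replace(tzinfo=timezone.utc)   # assume UTC, as in the Pig output
iso = dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
```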

Now we can operate on the DateTime strings by using the ISOToUnix function, which takes a DateTime and returns the Unix epoch in milliseconds (which we divide by 1000 to obtain seconds):

-- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch1 = FOREACH R1 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 as epoch:long;
DUMP toEpoch1;

The toEpoch1 relation contains tuples of the form:

(pB2KDpaN007050,2011-12-02T15:13:52.000Z,1322838832)
(pB2KDpaN007054,2011-12-02T15:13:53.000Z,1322838833)
(pB2KDru1003569,2011-12-02T15:13:54.000Z,1322838834)
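The same milliseconds-to-seconds arithmetic sketched in Python, for the first DateTime above:

```python
from datetime import datetime, timezone

# Epoch milliseconds for 2011-12-02T15:13:52.000Z, then seconds
dt = datetime(2011, 12, 2, 15, 13, 52, tzinfo=timezone.utc)
epoch_ms = int(dt.timestamp() * 1000)
epoch_s = epoch_ms // 1000
```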

We now perform similar operations on lines containing destination email addresses:

DEST = FOREACH RAW_LOGS GENERATE                                                 
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+to=<([^>]+)>,\\s+delay=([^,]+),\\s+xdelay=([^,]+),.*relay=(\\S+)\\s+\\[\\S+\\],\\s+dsn=\\S+,\\s+stat=(.*)')
)
AS (
 month: chararray,
 day: chararray,
 time: chararray,
 mailserver: chararray,
 pid: chararray,
 sendmailid: chararray,
 dest: chararray,
 delay: chararray,
 xdelay: chararray,
 relay: chararray,
 stat: chararray
);


T2 = FOREACH DEST GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp, dest, stat;
FILTER_T2 = FILTER T2 BY NOT sendmailid IS NULL;

R2 = FOREACH FILTER_T2 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt, dest, stat;

-- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch2 = FOREACH R2 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 AS epoch:long, dest, stat;

At this point we have 2 relations, toEpoch1 and toEpoch2, which we can join by sendmailid:

R3 = JOIN toEpoch1 BY sendmailid, toEpoch2 BY sendmailid;

The relation R3 will contain tuples of the form

(sendmailid, datetime1, epoch1, sendmailid, datetime2, epoch2, dest, stat)

We generate another relation by keeping the sendmailid, the delta epoch2 - epoch1, the destination email and the status of the delivery. We also order by the epoch delta:

R4 = FOREACH R3 GENERATE $0, $5 - $2, $6, $7;
R5 = ORDER R4 BY $1 DESC;

R5 contains tuples such as:

(pB2KDqo5007488,2,user1@earthlink.net,Sent (1rwzuwyl3Nl36v0 Message accepted for delivery))
(pB2KDru1003560,1,user2@yahoo.com,Sent (ok dirdel))
(pB2KCrvm030964,0,user3@hotmail.com,Sent ( <201112022012.pB2KCrvm030964> Queued mail for delivery))

At this point we can see which email deliveries took longest, and try to identify patterns (maybe certain mail domains make it harder to deliver messages, or maybe email addresses are misspelled, etc).

Nginx logs

In the second example, I'll show how to do some date conversions on Nginx access log timestamps. The full Pig script is available here as a gist.

We parse the Nginx access log lines similarly to the mail log lines in the first example:

RAW_LOGS = LOAD '$INPUT' as (line:chararray);
LOGS_BASE = FOREACH RAW_LOGS GENERATE                                            
FLATTEN(                                                                         
 EXTRACT(line, '(\\S+) - - \\[([^\\[]+)\\]\\s+"([^"]+)"\\s+(\\d+)\\s+(\\d+)\\s+"([^"]+)"\\s+"([^"]+)"\\s+"([^"]+)"\\s+(\\S+)')
)
AS (
 ip: chararray,
 timestamp: chararray,
 url: chararray,
 status: chararray,
 bytes: chararray,
 referrer: chararray,
 useragent: chararray,
 xfwd: chararray,
 reqtime: chararray
);
DATE_URL = FOREACH LOGS_BASE GENERATE timestamp;
F = FILTER DATE_URL BY NOT timestamp IS NULL;

The timestamp is of the form "30/Sep/2011:00:10:02 -0700" so we use the appropriate DATE_TIME formatting string 'dd/MMM/yyyy:HH:mm:ss Z' to convert it to an ISO DateTime. Note that we need to specify the timezone with Z:

R1 = FOREACH F GENERATE timestamp, DATE_TIME(timestamp, 'dd/MMM/yyyy:HH:mm:ss Z') as dt;
DUMP R1;

R1 contains tuples of the form:

(30/Sep/2011:00:19:35 -0700,2011-09-30T00:19:35.000-07:00)
(30/Sep/2011:00:19:36 -0700,2011-09-30T00:19:36.000-07:00)
(30/Sep/2011:00:19:37 -0700,2011-09-30T00:19:37.000-07:00)
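In Python terms, the Joda pattern 'dd/MMM/yyyy:HH:mm:ss Z' corresponds to strptime's '%d/%b/%Y:%H:%M:%S %z', with %z picking up the '-0700' offset:

```python
from datetime import datetime

# Parse an Nginx access-log timestamp, keeping its UTC offset
dt = datetime.strptime("30/Sep/2011:00:19:35 -0700", "%d/%b/%Y:%H:%M:%S %z")
iso = dt.isoformat()
```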

At this point, if we wanted to convert from DateTime to Unix epoch in seconds, we could use ISOToUnix like we did for the mail logs:

toEpoch = FOREACH R1 GENERATE dt, ISOToUnix(dt) / 1000 as epoch:long;

However, let's use another function called FORMAT_DT to convert from the above DateTime format to another format, of the type 'MM/dd/yyyy HH:mm:ss Z'. The first argument to FORMAT_DT is the desired format for the date/time, and the second argument is the DateTime value to be converted:

FDT = FOREACH R1 GENERATE FORMAT_DT('MM/dd/yyyy HH:mm:ss Z', dt) as fdt;
DUMP FDT;

The FDT relation now contains tuples such as:

(09/30/2011 00:19:35 -0700)
(09/30/2011 00:19:36 -0700)
(09/30/2011 00:19:37 -0700)

We can now use a handy function called CustomFormatToISO to convert from any custom date/time format (such as the one we generated in FDT) back to a canonical ISO DateTime format:

toISO = FOREACH FDT GENERATE fdt, CustomFormatToISO(fdt, 'MM/dd/yyyy HH:mm:ss Z');
DUMP toISO;

(09/30/2011 00:19:35 -0700,2011-09-30T07:19:35.000Z)
(09/30/2011 00:19:36 -0700,2011-09-30T07:19:36.000Z)
(09/30/2011 00:19:37 -0700,2011-09-30T07:19:37.000Z)

Note how the custom DateTime string "09/30/2011 00:19:35 -0700" got transformed into the canonical ISO DateTime string "2011-09-30T07:19:35.000Z".
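The same round trip sketched in Python; normalizing to UTC is what shifts 00:19:35 -0700 to 07:19:35Z:

```python
from datetime import datetime, timezone

# Parse the custom format, then normalize to a canonical UTC ISO string
dt = datetime.strptime("09/30/2011 00:19:35 -0700", "%m/%d/%Y %H:%M:%S %z")
iso = dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
```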

Converting Unix epoch to DateTime

Some log files have timestamps in Unix epoch format. If you want to transform them into DateTime, you can use the UnixToISO function:

DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();

Here is an input file:

$ cat unixtime.txt
1320777563
1320777763
1320779563
1320787563

And here is a Pig script which converts the epoch into DateTime strings. Note that UnixToISO expects the epoch in milliseconds, and our input is in seconds, so we have to multiply each input value by 1000 to get to milliseconds:

UNIXTIMES = LOAD 's3://mybucket.com/unixtime.txt' as (unixtime:long);
D = FOREACH UNIXTIMES GENERATE UnixToISO(unixtime * 1000);
DUMP D;

(2011-11-08T18:39:23.000Z)
(2011-11-08T18:42:43.000Z)
(2011-11-08T19:12:43.000Z)
(2011-11-08T21:26:03.000Z)
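The same seconds-to-DateTime conversion sketched in Python, for the first input value:

```python
from datetime import datetime, timezone

# 1320777563 seconds since the epoch, rendered as an ISO 8601 string
iso = datetime.fromtimestamp(1320777563, tz=timezone.utc).strftime(
    "%Y-%m-%dT%H:%M:%S.000Z")
```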