Wednesday, July 27, 2011

Processing mail logs with Elastic MapReduce and Pig

These are some notes I took while trying out Elastic MapReduce (EMR), and more specifically its Pig functionality, by processing sendmail mail logs. A big help was Eric Lubow's blog post on EMR and Pig. Before I go into  details, here's my general processing flow:

  • N mail servers (running sendmail) send their mail logs to a central server running syslog-ng.
  • A process running on the central logging server tails the aggregated mail log (at 5 minute intervals), parses the lines it finds, extracts relevant information from each line, and saves the output in JSON format to a local file (actually there are 2 types of files generated, one for sender information and one for recipient information, corresponding to the 'from' and 'to' lines in the mail log -- see below)
  • Another process compresses the generated files in bzip2 format and uploads them to S3.
I have 2 sets of files, one set with names similar to "from-2011-07-12-20-58" and containing JSON records of the following form, one per line:

{"nrcpts": "1", "src": "info@example.com", "sendmailid": "p6D0r0u1006229", "relay": "app03.example.com", "classnumber": "0", "msgid": "WARQZCXAEMSSVWPPOOYZXRLQIKMFUY.155763@example.com", "
pid": "6229", "month": "Jul", "time": "20:53:00", "day": "12", "mailserver": "mail5", "size": "57395"}

The second set contains files with names similar to "to-2011-07-12-20-58" and containing JSON records of the following form, one per line:

{"sendmailid": "p6D0qwvm006395", "relay": "gmail-smtp-in.l.google.com.", "dest": "somebody@gmail.com", "pid": "6406", "stat": "Sent (OK 1310518380 pd12si6025606vcb.162)", "month": "Jul", "delay": "00:00:02", "time": "20:53:00", "xdelay": "00:00:02", "day": "12", "mailserver": "mail2"}

For the initial EMR/Pig setup, I followed "Parsing Logs with Apache Pig and Elastic MapReduce". It's fairly simple to end up with an EC2 instance running Hadoop and Pig that you can play with.

I then ssh-ed into the EMR master instance (note that it was still shown in 'Waiting' state in the EMR console, but once it got assigned an IP and internal name I was able to ssh into it).

In order for Pig to be able to process input in JSON format, you need to use Kevin Weil's elephant-bird library. I followed Eric Lubow's post to get that set up:

$ mkdir git && mkdir pig-jars
$ cd git && wget --no-check-certificate https://github.com/kevinweil/elephant-bird/tarball/eb1.2.1_with_jsonloader
$ tar xvfz eb1.2.1_with_jsonloader
$ cd kevinweil-elephant-bird-ecf8356/
$ cp lib/google-collect-1.0.jar ~/pig-jars/
$ cp lib/json-simple-1.1.jar ~/pig-jars/
$ ant nonothing
$ cd build/classes/
$ jar -cf ../elephant-bird-1.2.1-SNAPSHOT.jar com
$ cp ../elephant-bird-1.2.1-SNAPSHOT.jar ~/pig-jars/

I then copied 3 elephant-bird jar files to S3 so I can register them every time I run Pig. I did that via the grunt command prompt:

$ pig -x local

grunt> cp file:///home/hadoop/pig-jars/google-collect-1.0.jar s3://MY_S3_BUCKET/jars/pig/
grunt> cp file:///home/hadoop/pig-jars/json-simple-1.1.jar s3://MY_S3_BUCKET/jars/pig/                                               
grunt> cp file:///home/hadoop/pig-jars/elephant-bird-1.2.1-SNAPSHOT.jar s3://MY_S3_BUCKET/jars/pig/         
At this point, I was ready to process some of the files I uploaded to S3. 
I first tried processing a single file, using Pig's local mode (which doesn't involve HDFS). It turns out that Pig doesn't load compressed files correctly via elephant-bird when you run in local mode, so I tested this on an uncompressed file previously uploaded to S3:
$ pig -x local

grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/google-collect-1.0.jar;                                                                   
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/json-simple-1.1.jar;
grunt> REGISTER s3://MY_S3_BUCKET/jars/pig/elephant-bird-1.2.1-SNAPSHOT.jar;grunt> json = LOAD 's3://MY_S3_BUCKET/mail_logs/2011-07-12/to-2011-07-12-16-49' USING com.twitter.elephantbird.pig.load.JsonLoader();
Note that I used the JSON loader from the elephant-bird JAR file.
I wanted to know the top 3 mail servers from the file I loaded (this is again heavily inspired by Eric Lubow's example in his blog post):
grunt> mailservers = FOREACH json GENERATE $0#'mailserver' AS mailserver;
grunt> mailserver_count = FOREACH (GROUP mailservers BY $0) GENERATE $0, COUNT($1) AS cnt;
grunt> mailserver_sorted_count = LIMIT(ORDER mailserver_count BY cnt DESC) 3;
grunt> DUMP mailserver_sorted_count;

I won't go into detail as far as the actual Pig operations I ran -- I recommend going through some Pig Latin tutorials or buying the O'Reilly 'Programming Pig' book. Suffice to say that I extracted the 'mailserver' JSON field, then I grouped the records by mail server and counted how many there are in each group. Finally, I dumped the 3 top mail servers found.

Here's a slightly more interesting exercise: finding out the top 10 mail recipients by looking at all the to-* files uploaded to S3 (still uncompressed in this case):

grunt> to = LOAD 's3://MY_S3_BUCKET/mail_logs/2011-07-13/to*' USING com.twitter.elephantbird.pig.load.JsonLoader();
grunt> to_emails = FOREACH to GENERATE $0#'dest' AS dest;                                                                      
grunt> to_count = FOREACH (GROUP to_emails BY $0) GENERATE $0, COUNT($1) AS cnt;                                              
grunt> to_sorted_count = LIMIT(ORDER to_count BY cnt DESC) 10;                                                                                  
grunt> DUMP to_sorted_count;


I tried the same processing steps on bzip2-compressed files using Pig's Hadoop mode (which you invoke by just running 'pig' and not 'pig -x local'). The files were loaded correctly this time, but the MapReduce phase failed with messages similar to this in  /mnt/var/log/apps/pig.log:



Pig Stack Trace
---------------
ERROR 6015: During execution, encountered a Hadoop error.
org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias to_sorted_count
at org.apache.pig.PigServer.openIterator(PigServer.java:482)
at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:546)
at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:241)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:165)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:141)
at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:75)
at org.apache.pig.Main.main(Main.java:374)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 6015: During execution, encountered a Hadoop error.
at .apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
at .apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:474)
at .apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.collect(PigMapReduce.java:109)
at .apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:255)
at .apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:244)
at .apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.map(PigMapReduce.java:94)
at .apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at .apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:363)
at .apache.hadoop.mapred.MapTask.run(MapTask.java:312)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.pig.impl.io.NullableBytesWritable, recieved org.apache.pig.impl.io.NullableText
... 9 more

A quick Google search revealed JIRA Pig ticket #919 which offered a workaround. Basically this happens when a value coming out of a map is used in a group/cogroup/join. By default the type of that value is bytearray, and you need to cast it to chararray to make things work (I confess I didn't dig too much into the nitty-gritty of this issue yet, I was just happy I made it work).

So what I had to do was to modify a single line and cast the value used in the GROUP BY clause to chararray:

grunt> to_count = FOREACH (GROUP to_emails BY (chararray)$0) GENERATE $0, COUNT($1) AS cnt;  

At this point, I was able to watch Elastic MapReduce in action, slower than in local mode becase I only had 1 m1.small instance. I'll try it next with several instances and hopefully see a near-linear improvement.

That's it for now. This was just a toy example, but it got me started with EMR and Pig. Hopefully I'll follow up with more interesting log processing and analysis.

Friday, July 22, 2011

Results of a survey of the SoCal Piggies group

My colleague Warren Runk had the idea of putting together a survey to be sent to the mailing list of the SoCal Python Interest Group (aka SoCal Piggies), with the purpose of finding out which topics or activities would be most interesting to the members of the group in terms of future meetings. We had 10 topics in the survey, and people responded by choosing their top 5. We also had free-form response fields for 2 questions: "What do you like most about the meetings?" and "What meeting improvements are most important to you?".

We had 26 responses. Here are the votes results for the 10 topics we proposed:

#1 (18 votes): "Good practice, pitfall avoidance, and module introductions for beginners"


#2 (17 votes): "5 minute lightning talks"

#3 - #4 (15 votes): "Excellent code examples from established Python projects"
and "New and upcoming Python open source projects"


#5 (14 votes): "30 minute presentations"


#6 (13 votes): "Ice breakers/new member introductions"


#7 (12 votes): "Algorithm discussions and dissections"


#8 (11 votes): "Good testing practices and pointers to new methods/tools"


#9 (10 votes): "Moderated relevant/cutting edge general tech discussions"


#10 (9 votes): "Short small group programming exercises"

It's pretty clear that people are interested most of all in good Python programming practices and practical examples of 'excellent' code from established projects. Presentations are popular too, with lightning talks edging the longer 30-minute talks. A pretty good percentage of the people attending our meetings are beginners, so we're going to try to focus on making our meetings more beginner-friendly.

As far as what people like most about the meetings, here are a few things:
  • "I love hearing about how Python is being used in multiple locations throughout large corporations.  It helps me to promote Python at every opportunity when I can say that Python is being used at Acme Corp for XYZ!"
  • "High level introductions to Python modules. Often this is not the main thrust of a  talk, but the speaker chose some module for a given task and that helps me expand my horizon."
  • "Becoming aware of how various companies use python, which libraries and tools are used most often, the opportunity to connect with members during breaks."
  • "I like being exposed to things I don't normally see at work of if I've seen them I get to see them from a different angle.  "
  • "I don't have other geeks at my office so I like having the chance to hang out and get to know other Python programmers."
  • ...and many people expressed their satisfaction in seeing Raymod Hettinger's presentation at Disney Animation Studios (thanks to Paul Hildebrandt for putting that together!)
Here's what people said when asked about possible improvements:
  • "More best practices and module intros."
  • "Keep the meetings loose, don't have too many controls. "
  • "In addition to the aptly proposed "ice-breakers / introductions" how can we current members more-actively welcome beginners?"
  • "Time (and some format) to discuss the issues brought up in the talks. Sometimes I think it'd be useful for the group to get more directly involved in vetting/providing critique for some of the decisions a speaker made. Controversial points made in talks are great, but sometimes I think everyone might benefit from a few other perspectives."
  • "Friendlier onboarding of new members would be great."
  • "Keeping the total noobs in mind"
  • "I would like introductions. I have met a couple people at each of the meetings that I have attended, but I would also like to know who else is there."
  • "I would like the opportunity to meet resourceful programmers and learn techniques and abilities that I can't pick up from youtube or online tutorials!"
  • "I think we should try to come up with and stick with a consistent format. I like the discussion-style presentation so long as it does not detract from the topic at hand. I think we need to make sure that people stick with shorter presentations, so that there is plenty of time for Q&A without the risk of running on too long.  30 minutes should really be 30 minutes! "
  • "It would be good to identify the difficulty/skill level of a presentation ahead of time so that beginners are not scared off or at least know what they're getting into. Perhaps we could try to always mix it up by warming up with a beginner/intermediate preso and follow up with an intermediate/advanced."
I think you can see a theme here -- friendliness and attention towards beginners is a wish that many people have. I believe in the past we tended to ignore this side in our meetings, so we definitely need to do a better job at it.

We had a meeting last night where we discussed some of these topics. We tried to appoint point persons for given topics. These persons would be responsible for doing research on that topic (for example 'New and upcoming Python open source projects') and give a short presentation to the group at every meeting, while also looking for other group members to delegate this responsibility to in the future. I think this 'lieutenant' system will work well, but time will tell. My personal observation from the 7 years I've been organizing this group is that the hardest part is to get people to volunteer in any capacity, and most of all in presenting to the group. But this infusion of new ideas is very welcome, and I hope it will invigorate the participation in our group.

I hope the results of this survey and the feedback we got will be useful to other Python user groups out there.

I want to thank Warren Runk and Danny Greenfeld for their feedback, ideas and participation in making the SoCal Piggies Group better.

Wednesday, July 20, 2011

Accessing the data center from the cloud with OpenVPN

This post was inspired by a recent exercise I went through at the prompting of my colleague Dan Mesh. The goal was to have Amazon EC2 instances connect securely to servers at a data center using OpenVPN.

In this scenario, we have a server within the data center running OpenVPN in server mode. The server has a publicly accessible IP (via a firewall NAT) with port 1194 exposed via UDP. Cloud instances which run OpenVPN in client mode are connecting to the server, get a route pushed to them to an internal network within the data center, and are then able to access servers on that internal network over a VPN tunnel.

Here are some concrete details about the network topology that I'm going to discuss.

Server A at the data center has an internal IP address of 10.10.10.10 and is part of the internal network 10.10.10.0/24. There is a NAT on the firewall mapping external IP X.Y.Z.W to the internal IP of server A. There is also a rule that allows UDP traffic on port 1194 to X.Y.Z.W.

I have an EC2 instance from which I want to reach server B on the internal data center network, with IP 10.10.10.20.

Install and configure OpenVPN on server A

Since server A is running Ubuntu (10.04 to be exact), I used this very good guide, with an important exception: I didn't want to configure the server in bridging mode, I preferred the simpler tunneling mode. In bridging mode, the internal network which server A is part of (10.10.10.0/24 in my case) is directly exposed to OpenVPN clients. In tunneling mode, there is a tunnel created between clients and server A on a separated dedicated network. I preferred the tunneling option because it doesn't require any modifications to the network setup of server A (no bridging interface required), and because it provides better security for my requirements (I can target individual servers on the internal network and configure them to be accessed via VPN). YMMV of course.

For the initial installation and key creation for OpenVPN, I followed the guide. When it came to configuring the OpenVPN server, I created these entries in /etc/openvpn/server.conf:

server 172.16.0.0 255.255.255.0
push "route 10.10.10.0 255.255.255.0"
tls-auth ta.key 0 

The first directive specifies that the OpenVPN tunnel will be established on a new 172.16.0.0/24 network. The server will get the IP 172.16.0.1, while OpenVPN clients that connect to the server will get 172.16.0.6 etc.

The second directive pushes a static route to the internal data center network 10.10.10.0/24 to all connected OpenVPN clients. This way each client will know how to get to machines on that internal network, without the need to create static routes manually on the client.

The tls_auth entry provides extra security to help prevent DoS attacks and UDP port flooding.

Note that I didn't have to include any bridging-related scripts or other information in server.conf.

At this point, if you start the OpenVPN service on server A via 'service openvpn start', you should see an extra tun0 network interface when you run ifconfig. Something like this:


tun0      Link encap:UNSPEC  HWaddr 00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00  
          inet addr:172.16.0.1  P-t-P:172.16.0.2  Mask:255.255.255.255
          UP POINTOPOINT RUNNING NOARP MULTICAST  MTU:1500  Metric:1
          RX packets:2 errors:0 dropped:0 overruns:0 frame:0
          TX packets:2 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:100 
          RX bytes:168 (168.0 B)  TX bytes:168 (168.0 B)

Also, the routing information will now include the 172.16.0.0 network:

# netstat -rn
Kernel IP routing table
Destination     Gateway         Genmask         Flags   MSS Window  irtt Iface
172.16.0.2      0.0.0.0         255.255.255.255 UH        0 0          0 tun0
172.16.0.0      172.16.0.2      255.255.255.0   UG        0 0          0 tun0
...etc

Install and configure OpenVPN on clients

Here again I followed the Ubuntu OpenVPN guide. The steps are very simple:

1) apt-get install openvpn

2) scp the following files (which were created on the server during the OpenVPN server install process above) from server A to the client, into the /etc/openvpn directory: 

ca.crt
ta.key
client_hostname.crt 
client_hostname.key


3) Customize client.conf:

# cp /usr/share/doc/openvpn/examples/sample-config-files/client.conf /etc/openvpn

Edit client.conf and specify:

remote X.Y.Z.W 1194     (where X.Y.Z.W is the external IP of server A)

cert client_hostname.crt
key client_hostname.key
tls-auth ta.key 1

Now if you start the OpenVPN service on the client via 'service openvpn start', you should see a tun0 interface when you run ifconfig:


tun0      Link encap:UNSPEC  HWaddr 00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00  
          inet addr:172.16.0.6  P-t-P:172.16.0.5  Mask:255.255.255.255
          UP POINTOPOINT RUNNING NOARP MULTICAST  MTU:1500  Metric:1
          RX packets:2 errors:0 dropped:0 overruns:0 frame:0
          TX packets:2 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:100 
          RX bytes:168 (168.0 B)  TX bytes:168 (168.0 B)

You should also see routing information related to both the tunneling network 172.16.0.0/24 and to the internal data center network 10.10.10.0/0 (which was pushed from the server):

# netstat -rn
Kernel IP routing table
Destination     Gateway         Genmask         Flags   MSS Window  irtt Iface
172.16.0.5      0.0.0.0         255.255.255.255 UH        0 0          0 tun0
172.16.0.1      172.16.0.5      255.255.255.255 UGH       0 0          0 tun0
10.0.10.0       172.16.0.5      255.255.255.0   UG        0 0          0 tun0
....etc

At this point, the client and server A should be able to ping each other on their 172.16 IP addresses. From the client you should be able to ping server A's IP 172.16.0.1, and from server A you should be able to ping the client's IP 172.16.0.6.

Create static route to tunneling network on server B and enable IP forwarding on server A

Remember that the goal was for the client to access server B on the internal data center network, with IP address 10.10.10.20. For this to happen, I needed to add a static route on server B to the tunneling network 172.16.0.0/24, with server A's IP 10.10.10.10 as the gateway:

# route add -net 172.16.0.0/24 gw 10.10.10.10

The final piece of the puzzle is to allow server A to act as a router at this point, by enabling IP forwarding (which is disabled by default). So on server A I did:

# sysctl -w net.ipv4.ip_forward=1
# echo "net.ipv4.ip_forward=1" >> /etc/sysctl.conf

At this point, I was able to access server B from the client by using server B's 10.10.10.20 IP address.

We've just started to experiment with this setup, so I'm not yet sure if it's production ready. I wanted to jot down these things though because they weren't necessarily obvious, despite some decent blog posts and OpenVPN documentation. Hopefully they'll help somebody else out there too.