Refactoring a Hadoop Job with Cascading

This post covers an approach to quickly and efficiently convert an existing Hadoop MapReduce job to a Cascading flow, with a focus on leveraging existing test cases and preserving backward compatibility wherever possible.

Areas of Change

Assuming the MapReduce job follows the example of existing documentation and books on Hadoop, we need to refactor the job driver code in specific ways to convert it to a Cascading flow. We must refactor the following areas out of the job driver and into a separate class (a typical driver illustrating these concerns is sketched after the list):

  1. The job driver typically handles preconditions and validation of external dependencies (such as DistributedCache objects, etc.).
  2. The creation of the JobConf typically occurs in the job driver class, which conflicts with Cascading’s separation of job configuration logic from job execution logic.
  3. The job driver typically handles post-conditions; with Cascading we need to use the FlowListener interface to handle these.
  4. The job driver may handle deleting the job output directory, while Cascading handles this automatically.
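
To make these areas concrete, here is a minimal sketch of the kind of job driver we are starting from. It is illustrative only (the job name, identity mapper/reducer, and paths are assumptions rather than code from a real project), but it shows all four concerns tangled together in one main method:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

/**
 * Illustrative only: a typical pre-Cascading job driver that mixes all four
 * concerns listed above in a single main method.
 */
@SuppressWarnings("deprecation")
public class ExampleJobDriver {

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(ExampleJobDriver.class); // (2) job configuration logic
    conf.setJobName("example");

    Path input = new Path(args[0]);
    Path output = new Path(args[1]);
    FileSystem fs = FileSystem.get(conf);

    if (!fs.exists(input)) {                            // (1) precondition / dependency check
      throw new IllegalArgumentException("Missing input: " + input);
    }

    fs.delete(output, true);                            // (4) output directory clean-up

    FileInputFormat.setInputPaths(conf, input);
    FileOutputFormat.setOutputPath(conf, output);
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    JobClient.runJob(conf);                             // job execution

    // (3) post-conditions (e.g. verifying counters) would typically follow here
  }
}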

Approach

The overall approach below preserves as much backward compatibility as possible while leveraging existing unit tests.

  1. Create a JobHelper class to provide the separation of concerns required by Cascading.  The JobHelper plays the same role as the View Helper in an MVC application, as it provides clean separation between the view (job execution context) and the model (job business logic).
  2. Move all preconditions, post-conditions, and job configuration to the JobHelper.
  3. Re-test the existing job driver with the refactored code and the existing unit test cases.  When you are sure that the existing job driver works with the JobHelper, you are ready to move to Cascading.
  4. Implement cascading.flow.FlowListener and call the appropriate JobHelper pre- and post-condition methods in the various event handlers provided (I named the methods the same, so this should be obvious!).  A sketch of such a listener appears after this list.
  5. Create a new unit test extending CascadingTestCase (a test sketch appears in the Code Example section below).
    1. Create a new Cascading MapReduceFlow, passing in the JobConf from the JobHelper.  Use the constructor that takes the deleteSinkOnInit flag to indicate whether the output directory should be deleted automatically.
    2. Add your FlowListener implementation to the MapReduceFlow using the addListener method.
    3. Call Flow.complete() to execute the flow.
  6. Optionally, refactor the existing job driver to execute the Cascading flow.
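
The listener from step 4 can be as simple as the following sketch. This is a minimal adapter of my own (the class name JobHelperFlowListener is not part of Cascading); it assumes the Cascading 1.x FlowListener signatures and simply forwards each event to the identically named JobHelper method:

import java.io.IOException;

import cascading.flow.Flow;
import cascading.flow.FlowListener;

/**
 * Hypothetical adapter that forwards Cascading flow events to the
 * identically named JobHelper life-cycle methods.
 */
public class JobHelperFlowListener implements FlowListener {

  private final JobHelper helper;

  public JobHelperFlowListener(JobHelper helper) {
    this.helper = helper;
  }

  public void onStarting(Flow flow) {
    try {
      helper.onStarting();           // pre-conditions
    } catch (IOException e) {
      throw new RuntimeException(e); // listener methods cannot throw checked exceptions
    }
  }

  public void onStopping(Flow flow) {
    try {
      helper.onStopping();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public void onCompleted(Flow flow) {
    try {
      helper.onCompleted();          // post-conditions
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public boolean onThrowable(Flow flow, Throwable throwable) {
    return helper.onThrowable(throwable); // true means the error was handled
  }
}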

You should now have two versions of the same job: one using a job driver and one using Cascading.  You can still call the existing job driver from the command line as before, and you now have the flexibility to add the job to a Cascading flow!

Code Example

Here’s an example interface for a JobHelper:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

/**
 * This interface provides a bridge between a Hadoop job driver and Cascading.
 * Serves a role very similar to ViewHelper in MVC.
 * @author mpouttuclarke
 */
@SuppressWarnings("deprecation")
public interface JobHelper {

  /**
   * Logic to execute to initialize the job flow.  Should create the JobConf
   * here for later use, but may also create DistributedCache entries, etc.
   * @param config the Hadoop Configuration to base the job on
   * @param parms any job-specific parameters, such as input and output paths
   * @throws IOException if the job cannot be initialized
   */
  public void initialize(Configuration config, Object... parms) throws IOException;

  /**
   * Logic to execute after job initialization but before job execution
   * (pre-conditions).
   * @throws IOException if a pre-condition fails
   */
  public void onStarting() throws IOException;

  /**
   * Gets the JobConf created earlier, used to run the job in
   * either raw Hadoop or Cascading.
   * @return the fully configured JobConf
   */
  public JobConf getJobConf();

  /**
   * Logic to execute if the job is stopped.
   * @throws IOException if clean-up fails
   */
  public void onStopping() throws IOException;

  /**
   * Logic to execute when the job completes (post-conditions).
   * @throws IOException if a post-condition fails
   */
  public void onCompleted() throws IOException;

  /**
   * What to do if a fatal error occurs in the job.
   * @param throwable the error raised by the job
   * @return true if the error was handled, false to rethrow it
   */
  public boolean onThrowable(Throwable throwable);
}
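
And here is a rough sketch of the unit test from step 5 of the approach. The test class, the MyJobHelper implementation, the paths, and the flow name are all hypothetical, and the sketch assumes the Cascading 1.x MapReduceFlow constructor that takes a deleteSinkOnInit flag:

import org.apache.hadoop.conf.Configuration;

import cascading.CascadingTestCase;
import cascading.flow.Flow;
import cascading.flow.MapReduceFlow;

/**
 * Hypothetical unit test wiring the JobHelper and FlowListener into Cascading.
 */
public class ExampleJobFlowTest extends CascadingTestCase {

  public void testFlow() throws Exception {
    JobHelper helper = new MyJobHelper(); // assumed implementation of JobHelper
    helper.initialize(new Configuration(), "input/path", "output/path");

    // deleteSinkOnInit = true asks Cascading to delete the output directory
    Flow flow = new MapReduceFlow("example", helper.getJobConf(), true);
    flow.addListener(new JobHelperFlowListener(helper));

    flow.complete(); // runs the underlying MapReduce job
  }
}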

References

Here’s an example of how to run a raw MapReduce job in a Cascade, as some background. If we added a JobHelper and FlowListener to this example, we could move all of the JobConf code out of the job driver and make any M/R job a first-class participant in a Cascade.
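
To close the loop, here is a minimal sketch of what "first-class participant in a Cascade" looks like, assuming the Cascading 1.x CascadeConnector API (the CascadeRunner class name is mine, not Cascading's):

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;

/**
 * Hypothetical helper: connects any set of flows, including a MapReduceFlow
 * wrapping a raw M/R job, into a single Cascade and runs them.
 */
public class CascadeRunner {

  public static void run(Flow... flows) {
    Cascade cascade = new CascadeConnector().connect(flows);
    cascade.complete(); // sorts the flows by dependency and executes them
  }
}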

Notes from Hadoop World 2010 NYC

I recently attended Hadoop World 2010 in NYC. Here are my thoughts on the presentations I attended:

Keynote
Tim O’Reilly
Tim covered some of the roles Hadoop can play as part of the Internet Operating System, and how data can be correlated across many channels to yield unexpected information. For example, utilizing log analytics to develop a smart phone that can recognize its owner by the way they move (motion sensors) and use the phone, or how analysis of power grid spikes can tell companies which appliances you have in your home and when you use them. Essentially he showed how Hadoop is helping usher in an era where logs correlated across multiple channels yield an unprecedented amount of detailed information about individuals and their behavior.

Hadoop Analytics: More Methods, Less Madness
Shevek Mankin, Karmasphere
Shevek presented essentially the same information you can get on the web about his products.  However, in a followup conversation I asked him about compatibility with the 0.21 Hadoop API, and he replied that they are writing that code now.
I’m not sure when the release date is… following up to see if anything has changed with recent releases after HWNYC.

Making Hadoop Security Work in Your IT Environment
Todd Lipcon, Cloudera
Todd covered how to implement security in Hadoop. I’m eagerly awaiting the video feed on this one. The main highlights include the addition of strong authentication throughout the framework components and ACLs for Job Queues. Todd also detailed some tricks and gotchas for integrating Hadoop Security with Active Directory. Hadoop uses Kerberos, so the integration flows more smoothly than one might expect!

Hadoop: Best Practices and Real Experience Going From 5 to 500 Nodes
Phil Day, HP
Phil described the challenges and learning experiences HP has had in deploying large Hadoop clusters on time and on budget. Of particular interest was HP’s ability to build and test whole racks of servers at the factory and then ship them and plug them in at the data center (HP Factory Express). Normally I don’t appreciate vendor tag lines, but this one is extremely relevant given the issues a small deviation in configuration can cause in a cluster that large! Phil also noted that because the recovery features in Hadoop are so advanced, a failing job can continue on and on and produce reams of logs before complete failure occurs. This emphasizes the need for a management console able to sift through reams of logs to isolate the root cause of the original failure. Also of interest were some of the build details he shared for the data-intensive cluster they designed:

  • The use of 1Gb network cards for the data nodes to save costs, while retaining 10Gb cards for network-intensive components such as the name node
  • Using 6 local drives for each Intel Xeon quad core to increase data throughput
  • Using what he called Edge Nodes to control access to the cluster

HBase in Production at Facebook
Jonathan Gray, Facebook
Brilliant presentation by Jonathan Gray about how Facebook is using HBase. Of particular interest was their use of the counters feature in HBase to track real-time statistics from incoming logs. It was also news to me that Facebook is using HBase in online applications as well as batch applications. Apparently some parts of the FB site use HBase directly, which is a testament to its availability and performance characteristics.
In the batch world, they are loading into HBase tables from the web logs via Cascading and then fronting the HBase tables with Hive. It was great to hear talks from these companies (especially Yahoo, Facebook, and StumbleUpon) about how they are using these tools together. Even though they are all open source, it is not always obvious how to use the tools together in a real-world architecture. Bravo to all of you for being so open about real-world usage!
Also of note:

  • Facebook will be distributing its own version of Hadoop via GitHub
  • They are running HA for Name Node via an Avatar Node
  • They are using 16 drives at 2 TB each per data node

ZooKeeper in Online Systems, Feed Processing, and Cluster Management
Mahadev Konar, Yahoo!
Very interesting presentation from Mahadev about how Yahoo! is using ZooKeeper for command, control, and meta-data during feed processing. The most interesting part was how they are using BitTorrent for data feeds (I wonder which client they are using, maybe Vuze). The torrent meta-data is published on ZooKeeper, and subscriber nodes process the feed and update their status on ZooKeeper as well. ZooKeeper is also used for heartbeats and for coordinating feed re-processing logic in case a node dies. Also of note was Yahoo!’s use of BookKeeper for ultra-high-performance durable publish and subscribe (bye bye message queues), and that Yahoo! recommends a 5-node ZooKeeper cluster but has seen installations of up to 12 nodes. ZooKeeper is Hadoop’s diamond in the rough =)

Cloudera Roadmap Review
Charles Zedlewski, Cloudera
Charles went through all the cool management features being added to Cloudera Enterprise. Of course, he emphasized that this does not mean the free version is dumbed down; it just does not include all the management tooling for big clusters (see Phil Day’s presentation on the importance of management software for large clusters).

Apache Hadoop in the Enterprise
Arun Murthy, Yahoo!
This presentation was HUGE! (That’s the running joke, because he said HUGE a lot.)
On a serious note, Yahoo! put in a huge effort to productionize Hadoop 0.20 by introducing 4400 patches. Totally awesome for the community, on top of adding security and the Capacity Scheduler; very cool stuff. Also of note is the upcoming addition of Name Node Federation, where a name node is mapped to an HDFS path, so you can run as many name nodes as you wish and a failure means only that path is down.
He had a funny comment that running an HDFS ls (directory listing) on a 4000-node cluster can take down the name node; they use ZooKeeper to check whether a file exists instead. Just one of those nice-to-knows =)
Also, I had an offline conversation about adding CPU and I/O caps to jobs in the Capacity Scheduler (infinite loops in Hadoop jobs can be nasty). I’ll be sending them a followup email soon because they mentioned some other tricks for implementing CPU caps, whether or not they actually end up adding them to the Capacity Scheduler.

Mixing Real-Time Needs and Batch Processing: How StumbleUpon Built an Advertising Platform using HBase and Hadoop
Jean-Daniel Cryans, StumbleUpon
StumbleUpon is using a very similar architecture to Facebook’s, with the difference that HBase really seems to be at the core of StumbleUpon’s online presence. With a user base of 11 million, this is another very significant testament to the availability and performance of HBase. They are using the atomic increment functionality in HBase to track users in real time from web logs (similar to Facebook). They also created opentsdb, based on HBase, to help monitor their environment. Also of note: they are using Cascading to join HBase to native Hadoop formats such as sequence files and MapFiles.

Putting Analytics in Big Data Analysis
Richard Daley, Pentaho
Apparently Richard had to bow out at the last minute, but the person presenting in his place did a fairly good job of getting into the nitty-gritty of Pentaho for Hadoop. I have to say that the integration was very smooth, and this was a live demonstration he had to throw together in an hour (that actually worked!). It was a good litmus test for the tool: I ended up wishing I had this kind of environment to build, run, and monitor Hadoop jobs. Don’t get me wrong, I love the command line, but I do not like busy work. So I am very excited to get my hands on that software some time soon.

All in all, a fantastic experience to learn from the people who hit the wall on big data years ago, so we don’t have to!

Inverse REST

The principles of REST allow HTTP to scale to Internet user volumes, and code-on-demand provides one of the building blocks of REST.  Code-on-demand allows rich content in the browser via JavaScript or browser plug-ins, and this technique has matured so much that even the most sophisticated applications require minimal server interaction.

In short, code-on-demand enables scaling across users by moving the code (logic) to the consumer, instead of requiring the consumer to make a remote request to the code.

It follows logically that any successful approach to scaling across big data requires inverting REST and executing the code as close to the data as possible, rather than trying to move big data to the code.

Likewise, the web architecture scales across users by utilizing caching to provide data locality for shared data.  Scaling across big data also requires data locality for reference data. We must move our small data close to our big data to scale efficiently.

The major flaw designed into many current big data applications is the failure to utilize the inverse of REST: SOA and integration vendors sell customers on the idea that big data problems can be solved by moving all data through a layer of middleware, whether this be a J2EE application server, a service bus, or an integration tool.  I have literally spent years of my career trying to tune and optimize middleware solutions for big data.  That’s why I can say definitively that the middleware concept does very well at selling a lot of consulting hours, software licenses, and hardware.  What it does not do is scale to big data.

You could code all the big data logic in stored procedures, assuming a willingness to embed business logic in a closed system and assuming that a database will scale to your data volumes.  Database vendors are only beginning to utilize Inverse REST: evaluating filters, transformations, and lookups in the storage pipeline is a new (or non-existent) feature in most DBMS systems.  Yet another opportunity for vendor lock-in.

Hadoop MapReduce, by contrast, provides an open-system implementation of Inverse REST.

Regardless of who wins the battle between RDBMS and Map/Reduce, one thing is certain: anyone not leveraging the principles of Inverse REST will be left in the dust.

Google, Yahoo, Facebook, StumbleUpon, and others have already hit the wall, and it’s only a matter of time before we all do.

Think outside received beliefs