Refactoring a Hadoop Job with Cascading

This post covers an approach to quickly and efficiently convert an existing Hadoop MapReduce job to a Cascading flow, focusing on leveraging existing test cases and preserving backward compatibility wherever possible.

Areas of Change

Assuming the MapReduce job follows the pattern of existing Hadoop documentation and books, we need to refactor the job driver code in specific ways to convert it to a Cascading flow. We must refactor the following areas out of the job driver and into a separate class:

  1. The job driver typically handles preconditions and validation of external dependencies (such as DistributedCache objects, etc.)
  2. The creation of the JobConf typically occurs in the job driver class, which conflicts with Cascading’s separation of job configuration logic from job execution logic
  3. The job driver typically handles post-conditions; with Cascading, we need to use the FlowListener interface to handle these
  4. The job driver may handle deleting the job output directory, while Cascading handles this automatically
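To make these four areas concrete, here is a minimal, hypothetical "before" job driver in the style of typical Hadoop examples (the class and path names are illustrative, not from any real codebase). All four concerns are tangled together in main(), which is exactly what we want to pull apart:

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/**
 * A typical "before" job driver: preconditions, JobConf creation,
 * output cleanup, and post-conditions all live in one method.
 * The numbers in the comments match the list above.
 */
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);  // (2) JobConf creation
    conf.setJobName("wordcount");

    FileSystem fs = FileSystem.get(conf);
    Path input = new Path(args[0]);
    Path output = new Path(args[1]);

    if (!fs.exists(input)) {                            // (1) precondition check
      throw new IllegalArgumentException("Missing input: " + input);
    }
    fs.delete(output, true);                            // (4) output directory cleanup

    FileInputFormat.setInputPaths(conf, input);
    FileOutputFormat.setOutputPath(conf, output);

    JobClient.runJob(conf);

    System.out.println("Output bytes: "                 // (3) post-condition / reporting
        + fs.getContentSummary(output).getLength());
  }
}
```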

Approach

The overall approach below preserves as much backward compatibility as possible while leveraging existing unit tests.

  1. Create a JobHelper class to provide the separation of concerns required by Cascading.  The JobHelper plays the same role as the View Helper in an MVC application, as it provides clean separation between the view (job execution context) and the model (job business logic).
  2. Move all preconditions, post-conditions, and job configuration to the JobHelper.
  3. Re-test the existing job driver with the refactored code and the existing unit test cases.  Once you are sure the existing job driver works with the JobHelper, you are ready to move to Cascading.
  4. Implement cascading.flow.FlowListener and call the appropriate JobHelper pre- and post-condition methods in the various event handlers provided (I named the methods the same, so this should be obvious!).
  5. Create a new unit test extending CascadingTestCase.
    1. Create a new Cascading MapReduceFlow, passing the JobConf from the JobHelper.  Use the deleteSinkOnInit constructor argument to indicate whether you want to delete the output directory automatically.
    2. Add your FlowListener implementation to the MapReduceFlow using the addListener method.
    3. Call Flow.complete() to execute the flow.
  6. Optionally, refactor the existing job driver to execute the Cascading flow.

You should now have two versions of the same job: one using a job driver and one using Cascading.  You can still call the existing job driver from the command line as before, while also having the flexibility to add the job to a Cascading flow!
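Step 5 above can be sketched as a short unit test. This is a sketch only: MyJobHelper and MyFlowListener are hypothetical implementations of the interfaces discussed in this post, while MapReduceFlow, addListener, and complete() are Cascading's own API:

```java
import org.apache.hadoop.conf.Configuration;

import cascading.CascadingTestCase;
import cascading.flow.Flow;
import cascading.flow.MapReduceFlow;

/**
 * Runs the existing M/R job as a Cascading flow (step 5).
 */
public class MyJobFlowTest extends CascadingTestCase {
  public void testFlow() throws Exception {
    JobHelper helper = new MyJobHelper();   // hypothetical implementation
    helper.initialize(new Configuration());

    // Step 5.1: wrap the existing JobConf; true = deleteSinkOnInit
    Flow flow = new MapReduceFlow("myJob", helper.getJobConf(), true);

    // Step 5.2: wire in the FlowListener that delegates to the helper
    flow.addListener(new MyFlowListener(helper));

    // Step 5.3: run the flow to completion
    flow.complete();
  }
}
```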

Code Example

Here’s an example interface for a JobHelper:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

/**
 * This interface provides a bridge between a Hadoop job driver and Cascading.
 * Serves a role very similar to a View Helper in MVC.
 * @author mpouttuclarke
 */
@SuppressWarnings("deprecation")
public interface JobHelper {
  /**
   * Logic to execute to initialize the job flow.  Should create the JobConf
   * here for later use, but may also create DistributedCache entries, etc.
   * @param config the base Hadoop configuration
   * @param parms optional job-specific parameters
   * @throws IOException
   */
  public void initialize(Configuration config, Object... parms) throws IOException;

  /**
   * Logic to execute after job initialization but before job execution.
   * @throws IOException
   */
  public void onStarting() throws IOException;

  /**
   * Gets the JobConf created earlier, used to run the job in
   * either raw Hadoop or Cascading.
   * @return the configured JobConf
   */
  public JobConf getJobConf();

  /**
   * Logic to execute if the job is stopped.
   * @throws IOException
   */
  public void onStopping() throws IOException;

  /**
   * Logic to execute when the job completes.
   * @throws IOException
   */
  public void onCompleted() throws IOException;

  /**
   * What to do if a fatal error occurs in the job.
   * @param throwable the error raised by the job
   * @return true if the error was handled, false otherwise
   */
  public boolean onThrowable(Throwable throwable);
}
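The FlowListener from step 4 might then look like the following sketch. The class name MyFlowListener is hypothetical; the method signatures are Cascading's FlowListener interface. Since FlowListener methods do not declare checked exceptions, the JobHelper's IOExceptions are wrapped in RuntimeException:

```java
import java.io.IOException;

import cascading.flow.Flow;
import cascading.flow.FlowListener;

/**
 * Delegates Cascading flow events to the like-named JobHelper methods.
 */
public class MyFlowListener implements FlowListener {
  private final JobHelper helper;

  public MyFlowListener(JobHelper helper) {
    this.helper = helper;
  }

  public void onStarting(Flow flow) {
    try {
      helper.onStarting();
    } catch (IOException e) {
      throw new RuntimeException(e);  // FlowListener declares no checked exceptions
    }
  }

  public void onStopping(Flow flow) {
    try {
      helper.onStopping();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public void onCompleted(Flow flow) {
    try {
      helper.onCompleted();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public boolean onThrowable(Flow flow, Throwable throwable) {
    return helper.onThrowable(throwable);
  }
}
```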

References

Here’s an example of how to run a raw MapReduce job in a Cascade, as background. If we added a JobHelper and FlowListener to that example, we could move all the JobConf code out of the job driver and make any M/R job a first-class participant in a Cascade.
