Cascading TapsMap and SubAssembly: Best Practices

Cascading Pipes give the experienced user a lot of power because of everything they take care of automatically; the downside is that we can easily do things the hard way without knowing it. This can affect the performance, maintainability, and re-usability of flows, and can force significant re-factoring later on. One best practice is to use a taps map together with a SubAssembly to separate the concerns of the taps (storing the data) from those of the pipes (processing the data). Used properly, a taps map and SubAssembly produce fully re-usable pipes that are completely independent of the input and output data formats.

Flow Before Taps Map and SubAssembly

The flow in this example selects employees with salary < 1000 and counts them by department. The code might look something like this:

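//Cascading 1.x package names
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.TextDelimited;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

//Define source tap
Tap source =
    new Hfs(new TextDelimited(new Fields("emp_name",
                                         "salary",
                                         "dept_id"),
                              "\t"),
            "emp.tsv"
    );

//Wire the pipe. ExpressionFilter removes every tuple for which its
//expression is true, so "salary >= 1000" keeps only salaries below 1000.
Pipe pipe = new Each(new Pipe("emp"),
                     new Fields("salary"),
                     new ExpressionFilter("salary >= 1000",
                                          Double.class));
pipe = new GroupBy(pipe, new Fields("dept_id"));
pipe = new Every(pipe, new Count());

//Define the sink tap (true = delete existing output on sink init)
Tap sink =
    new Hfs(new TextDelimited(new Fields("dept_id",
                                         "count"),
                              "\t"),
            "output",
            true
    );

//Create the flow
Flow flow = new FlowConnector().connect(source,
                                        sink,
                                        pipe);

//Run the flow
flow.complete();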
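To pin down what the flow is meant to compute, here is a plain-Java model of the same logic with no Cascading or Hadoop involved. It is only a sketch for checking expected results on small inputs; the class name `EmpFlowModel` and the in-memory `String[]` records laid out as (emp_name, salary, dept_id) are illustrative assumptions, not part of the original code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the example flow; no Cascading involved.
// Each record is {emp_name, salary, dept_id}, mirroring the TSV columns.
class EmpFlowModel {

    // Mirrors the Each/ExpressionFilter step: keep only salary < 1000.
    // Cascading's ExpressionFilter drops tuples whose expression is true,
    // so the Cascading version needs the negated expression.
    static boolean keep(String[] record) {
        return Double.parseDouble(record[1]) < 1000;
    }

    // Mirrors GroupBy("dept_id") followed by Every(Count()):
    // departments with no surviving employees produce no entry.
    static Map<String, Long> countByDept(List<String[]> records) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] r : records) {
            if (keep(r)) {
                counts.merge(r[2], 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> emps = List.of(
            new String[] {"alice", "900", "d1"},
            new String[] {"bob", "1500", "d1"},
            new String[] {"carol", "400", "d2"},
            new String[] {"dave", "999.5", "d2"});
        System.out.println(countByDept(emps)); // {d1=1, d2=2}
    }
}
```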

While the code is simple enough, it suffers from several issues with performance, re-usability, and maintainability:

  1. It supports only TSV as the input and output format.
  2. The input path is hard-coded as “emp.tsv”.
  3. The output directory is hard-coded as “output”.
  4. Field names are hard-coded.
  5. There is no way to wire the pipe into other pipes except by writing intermediate files and reading them back in (via a Cascade).

Passing parameters or configuration to the job could resolve the first four issues; however, the fifth issue imposes a large performance penalty and has no easy solution without major re-factoring.

Suggested Re-factoring

By implementing a SubAssembly and utilizing a Taps Map, we can dramatically improve both the maintainability and performance of the code.

Implementing a SubAssembly

Figure: Cascading SubAssembly Pipe

Please note the following when creating the SubAssembly:

  1. Provide a default constructor that uses default head and tail pipe names.
  2. Create String constants for the names of all head and tail pipes, for use as keys in the TapsMap.
  3. Provide a secondary constructor that accepts the head pipe and the tail name, so that multiple SubAssemblies can be linked and re-used.

Example SubAssembly code:

import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * Abstracts Pipe from Taps.
 * @author mpouttuclarke
 *
 */
public class EmpSubAssembly
 extends SubAssembly
{
 public static final String INPUT_PIPE_NAME = "input";
 public static final String OUTPUT_PIPE_NAME = "output";
 private static final long serialVersionUID = 1L;

 /**
 * Default constructor.
 */
 public EmpSubAssembly()
 {
   this(new Pipe(INPUT_PIPE_NAME), OUTPUT_PIPE_NAME);
 }

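 /**
 * Provides a way to rename the head and tail pipes.
 * @param input the head pipe, or the tail of an upstream pipe assembly
 * @param tailName the name to give the tail pipe
 */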
 public EmpSubAssembly(Pipe input, String tailName)
 {
   super();

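   // Wire the pipe. ExpressionFilter removes tuples for which the
   // expression is true, so this keeps only salaries below 1000.
   Pipe pipe = new Each(input,
                        new Fields("salary"),
                        new ExpressionFilter("salary >= 1000",
                                             Double.class));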
   pipe = new GroupBy(pipe, new Fields("dept_id"));
   pipe = new Every(pipe, new Count());
   pipe = new Pipe(tailName, pipe); //Name each tail

   setTails(pipe);
 }

}

Binding Taps with a TapsMap

Now we can bind the taps with a TapsMap, which separates the data locations and formats from the pipe. This means we can re-wire the SubAssembly with any other pipes or SubAssemblies we wish within the same Flow, without affecting the performance of the application as a whole.

Example of a TapsMap:

 import java.util.HashMap;
 import java.util.Map;

 //Define source tap
 Tap source =
   new Hfs(new TextDelimited(new Fields("emp_name",
                                        "salary",
                                        "dept_id"),
                             "\t"),
           "emp.tsv"
   );

 //Define the sink
 Tap sink =
   new Hfs(new TextDelimited(new Fields("dept_id",
                                        "count"),
                             "\t"),
           "output",
           true
   );

 //Bind the source tap to the SubAssembly's head pipe by name
 //(a sinks map works the same way if there are multiple tail pipes)
 Map<String, Tap> tapsMap = new HashMap<String, Tap>();
 tapsMap.put(EmpSubAssembly.INPUT_PIPE_NAME, source);

 //SubAssembly
 Pipe pipe = new EmpSubAssembly();

 //Create the flow
 Flow flow =
   new FlowConnector().connect(tapsMap,
                               sink,
                               pipe);

 //Run the flow
 flow.complete();
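The separation a taps map provides can also be modelled in plain Java. In this hypothetical sketch (the `Tap` record, `run`, and the two parsers are illustrative names, not Cascading API), a "tap" pairs raw lines with the parser for their format, and the "flow" looks up the tap bound to the head pipe's name. Swapping TSV for CSV changes only the map entry, never the pipe logic.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical plain-Java model of a taps map; not the Cascading API.
class TapsMapModel {
    // A "tap" here is raw lines plus the parser (format) needed to read them.
    record Tap(List<String> lines, Function<String, String[]> parser) {}

    static final String INPUT_PIPE_NAME = "input";

    // The "pipe": knows field positions only, never the file format or location.
    static Map<String, Long> countLowSalaryByDept(List<String[]> rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] r : rows) {
            if (Double.parseDouble(r[1]) < 1000) {
                counts.merge(r[2], 1L, Long::sum);
            }
        }
        return counts;
    }

    // The "flow": finds the tap bound to the head name and feeds the pipe.
    static Map<String, Long> run(Map<String, Tap> tapsMap) {
        Tap tap = tapsMap.get(INPUT_PIPE_NAME);
        if (tap == null) {
            throw new IllegalArgumentException("no tap bound to " + INPUT_PIPE_NAME);
        }
        List<String[]> rows = tap.lines().stream().map(tap.parser()).toList();
        return countLowSalaryByDept(rows);
    }

    public static void main(String[] args) {
        Tap tsv = new Tap(List.of("alice\t900\td1", "bob\t1500\td1", "carol\t400\td2"),
                          line -> line.split("\t"));
        Tap csv = new Tap(List.of("alice,900,d1", "bob,1500,d1", "carol,400,d2"),
                          line -> line.split(","));
        // Same pipe, different data format: only the map entry changes.
        System.out.println(run(Map.of(INPUT_PIPE_NAME, tsv))); // {d1=1, d2=1}
        System.out.println(run(Map.of(INPUT_PIPE_NAME, csv))); // {d1=1, d2=1}
    }
}
```

In the Cascading version, the equivalent swap is a different Scheme or path on the source Tap, with EmpSubAssembly left unchanged.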

Results of the Re-factoring

After the changes, we can take full advantage of the Flow auto-wiring capabilities of Cascading:

  1. We can re-wire and re-use any number of pipes and SubAssemblies in a single Flow.
  2. We can change the data locations and formats at will without affecting the functionality of the pipes.
  3. Cascading takes care of mapping the taps, no matter how deeply nested within multiple SubAssemblies the heads and tails are.
  4. We can re-name the pipes at will to prevent naming conflicts across a given Flow.
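The renaming point can be sketched the same way. In this hypothetical plain-Java model (`Stage`, `runAll`, and the names `emp1`/`counts1` are illustrative, not Cascading API), two copies of one re-usable stage run side by side because each has its own head and tail name. Sources are bound by head name, results are collected by tail name, and the model rejects a repeated tail name as a stand-in for a naming conflict.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model, not Cascading code: one re-usable stage
// can appear twice in a "flow" as long as each copy has unique names.
class RenameModel {

    // Re-usable stage with configurable head (input) and tail (output) names.
    // Logic matches the example: keep salary < 1000, count by dept_id.
    record Stage(String headName, String tailName) {
        Map<String, Long> apply(List<String[]> rows) {
            Map<String, Long> counts = new TreeMap<>();
            for (String[] r : rows) {
                if (Double.parseDouble(r[1]) < 1000) {
                    counts.merge(r[2], 1L, Long::sum);
                }
            }
            return counts;
        }
    }

    // Binds each stage's head to the source of the same name and stores the
    // result under its tail name; a repeated tail name is rejected.
    static Map<String, Map<String, Long>> runAll(List<Stage> stages,
                                                 Map<String, List<String[]>> sources) {
        Map<String, Map<String, Long>> sinks = new LinkedHashMap<>();
        for (Stage s : stages) {
            List<String[]> rows = sources.get(s.headName());
            if (rows == null) {
                throw new IllegalArgumentException("no source bound to " + s.headName());
            }
            if (sinks.containsKey(s.tailName())) {
                throw new IllegalArgumentException("duplicate tail name " + s.tailName());
            }
            sinks.put(s.tailName(), s.apply(rows));
        }
        return sinks;
    }

    public static void main(String[] args) {
        // Two copies of the same stage, renamed so heads and tails do not clash.
        List<Stage> stages = List.of(new Stage("emp1", "counts1"),
                                     new Stage("emp2", "counts2"));
        Map<String, List<String[]>> sources = Map.of(
            "emp1", List.<String[]>of(new String[] {"alice", "900", "d1"}),
            "emp2", List.<String[]>of(new String[] {"bob", "500", "d2"},
                                      new String[] {"eve", "700", "d2"}));
        System.out.println(runAll(stages, sources)); // {counts1={d1=1}, counts2={d2=2}}
    }
}
```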
