Archive for the ‘Map Reduce’ Category

Monitoring EMR Spend Using the AWS Java SDK

Elastic MapReduce makes it so easy to spin up a cluster that it is also easy to waste money on unused, partially used, or downright unauthorized clusters. Obviously, as a business, Amazon doesn’t put a whole lot of effort into keeping its customers from spending too much money. Amazon does impose an instance count limit on the entire account; however, managing these costs effectively requires much more granular and detailed information.

That’s why I created this program, which estimates charges for current and historic EMR clusters. It first obtains the normalized instance hours for each cluster running under the current credentials, then divides them by the Normalized Compute Time for the cluster’s instance type, as listed in the Amazon EMR FAQ. Finally, it multiplies the result by the EMR hourly rate to get the charge for each current and historic job flow (cluster). Historic job flows come from the Amazon job flow history, which covers only the past 20 days or so.

The job flow ID is the primary key for this data set. Output is tab-delimited and streamed to stdout. The last column contains a complete dump of the job flow in JSON format. Here is some example output:

JOB_FLOW_ID    STATE    STARTED    ENDED    INSTANCE_COUNT    INSTANCE_TYPE    INSTANCE_HOURS    EMR_INSTANCE_RATE    CHARGE    DETAIL_JSON
j-DFASFWGRWRG    RUNNING    2011-09-21 10:52:17    null    12    m1.xlarge    36    0.59    21.24    {your job flow JSON}
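The CHARGE column can be checked by hand. The sketch below redoes the arithmetic described above without the AWS SDK. The 288 normalized instance hours are an assumption inferred backwards from the sample row (36 hours at the m1.xlarge factor of 8), and the class and method names are hypothetical.

```java
/** Hypothetical, SDK-free reproduction of the EMRMonitor charge arithmetic. */
class EmrChargeExample {
    // Convert normalized instance hours back into hours of one instance type.
    // Dividing by a double avoids integer truncation (300 / 8 would give 37, not 37.5).
    static double instanceHours(int normalizedInstanceHours, int normalizationFactor) {
        return normalizedInstanceHours / (double) normalizationFactor;
    }

    // Estimated charge = instance hours x hourly rate (EC2 price + EMR surcharge).
    static double charge(double instanceHours, double hourlyRate) {
        return instanceHours * hourlyRate;
    }

    public static void main(String[] args) {
        // Assumed input: 288 normalized hours = 36 hours x m1.xlarge factor 8.
        double hours = instanceHours(288, 8);
        double estimate = charge(hours, 0.50 + 0.09); // m1.xlarge rate from the rate table
        System.out.printf("INSTANCE_HOURS=%.0f CHARGE=%.2f%n", hours, estimate);
    }
}
```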

So now you can keep track of estimated EMR spend in near real time, set alerts, and estimate monthly charges based on current workloads.  Enjoy!

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;

/**
 * Monitor EMR spending using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class EMRMonitor
{
    public static class InstanceRate
    {
        private int normalizedHours;
        private double emrRate;

        public InstanceRate(int normalizedHours, double emrRate)
        {
            super();
            this.normalizedHours = normalizedHours;
            this.emrRate = emrRate;
        }

        /**
         * @return the normalizedHours
         */
        public int getNormalizedHours()
        {
            return normalizedHours;
        }

        /**
         * @return the emrRate
         */
        public double getEmrRate()
        {
            return emrRate;
        }
    }

    static final Map<InstanceType, InstanceRate> rateMap =
        new HashMap<InstanceType, EMRMonitor.InstanceRate>();
    static AmazonElasticMapReduce emr;

    static
    {
        rateMap.put(InstanceType.M1Small, new InstanceRate(1, 0.085 + 0.015));
        rateMap.put(InstanceType.C1Medium, new InstanceRate(2, 0.17 + 0.03));
        rateMap.put(InstanceType.M1Large, new InstanceRate(4, 0.34 + 0.06));
        rateMap.put(InstanceType.M1Xlarge, new InstanceRate(8, 0.50 + 0.09));
        rateMap.put(InstanceType.C1Xlarge, new InstanceRate(8, 0.68 + 0.12));
        rateMap.put(InstanceType.M22xlarge, new InstanceRate(14, 1.00 + 0.21));
        rateMap.put(InstanceType.M24xlarge, new InstanceRate(28, 2.00 + 0.42));
        rateMap.put(InstanceType.Cc14xlarge, new InstanceRate(19, 1.60 + 0.33));
        rateMap.put(InstanceType.Cg14xlarge, new InstanceRate(25, 2.10 + 0.42));
    }

    /**
     * The only information needed to create a client are security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init()
        throws Exception
    {
        // Load AwsCredentials.properties from the same classpath location as this class
        AWSCredentials credentials =
            new PropertiesCredentials(
                                      EMRMonitor.class
                                          .getResourceAsStream("AwsCredentials.properties"));
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args)
        throws Exception
    {
        System.out
            .println("JOB_FLOW_ID\tSTATE\tSTARTED\tENDED\tINSTANCE_COUNT\tINSTANCE_TYPE\tINSTANCE_HOURS\tEMR_INSTANCE_RATE\tCHARGE\tDETAIL_JSON");
        // Turn off request status messages
        Logger.getLogger("com.amazonaws").setLevel(Level.WARNING);
        init();
        DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest();
        DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
        for (JobFlowDetail detail : descResult.getJobFlows())
        {
            String slaveInstanceType = detail.getInstances().getSlaveInstanceType();
            String masterInstanceType = detail.getInstances().getMasterInstanceType();
            if (slaveInstanceType == null)
            {
                slaveInstanceType = masterInstanceType;
            }
            double instanceHours = getInstanceHours(detail, slaveInstanceType);
            double charge = getInstanceCharge(slaveInstanceType, instanceHours);
            System.out
                .println(String.format("%1$s\t%2$s\t%3$tF %3$tT\t%4$tF %4$tT\t%5$d\t%6$s\t%7$.0f\t%8$.2f\t%9$.2f\t%10$s\t",
                                       detail.getJobFlowId(),
                                       detail.getExecutionStatusDetail().getState(),
                                       detail.getExecutionStatusDetail().getCreationDateTime(),
                                       detail.getExecutionStatusDetail().getEndDateTime(),
                                       detail.getInstances().getInstanceCount(),
                                       slaveInstanceType,
                                       instanceHours,
                                       rateMap.get(InstanceType.fromValue(slaveInstanceType)).getEmrRate(),
                                       charge,
                                       detail.toString().replaceAll("\\s+", " ")));
        }
    }

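    /**
     * Estimates the charge for the given number of instance hours.
     *
     * @param instanceType the EC2 instance type, e.g. "m1.xlarge"
     * @param instanceHours the instance hours, already converted from normalized hours
     * @return the estimated charge
     */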
    public static double getInstanceCharge(String instanceType, double instanceHours)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        return instanceHours * rate.getEmrRate();
    }

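    /**
     * Converts the job flow's normalized instance hours back into hours of the given instance type.
     *
     * @param detail the job flow detail returned by DescribeJobFlows
     * @param instanceType the EC2 instance type used to look up the normalization factor
     * @return the instance hours, or 0 if no normalized hours have been reported
     */
    public static double getInstanceHours(JobFlowDetail detail, String instanceType)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        Integer normalizedHours = detail.getInstances().getNormalizedInstanceHours();
        if (normalizedHours == null)
        {
            return 0;
        }
        // Divide as double to avoid integer truncation
        return normalizedHours.doubleValue() / rate.getNormalizedHours();
    }
}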
}
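EMRMonitor stops at per-job-flow estimates. One simple way to get from there to the monthly estimates and alerts mentioned in the introduction is to project a running cluster's hourly burn over an average month. The sketch below assumes a 730-hour month (24 × 365 / 12), round-the-clock usage, and a hypothetical $5,000 monthly budget; none of these values come from EMR, and the class is illustrative only.

```java
/** Hypothetical monthly projection and budget alert built on EMRMonitor-style numbers. */
class EmrSpendProjection {
    // Average hours in a month: 24 * 365 / 12 = 730.
    static final double HOURS_PER_MONTH = 24.0 * 365.0 / 12.0;

    // Hourly burn of a running cluster: instance count x per-instance hourly rate.
    static double hourlyBurn(int instanceCount, double hourlyRate) {
        return instanceCount * hourlyRate;
    }

    // Projected monthly charge if the cluster keeps running around the clock.
    static double projectMonthly(double hourlyBurn) {
        return hourlyBurn * HOURS_PER_MONTH;
    }

    // True when the projection exceeds the budget and an alert should fire.
    static boolean overBudget(double projectedMonthly, double monthlyBudget) {
        return projectedMonthly > monthlyBudget;
    }

    public static void main(String[] args) {
        // The running 12 x m1.xlarge cluster from the sample output.
        double burn = hourlyBurn(12, 0.59);
        double monthly = projectMonthly(burn);
        System.out.printf("burn=%.2f/h monthly=%.2f alert=%b%n",
                burn, monthly, overBudget(monthly, 5000.0));
    }
}
```

Because the projection assumes the cluster never shuts down, it is an upper bound for clusters that terminate when their steps finish.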

How to Run an Elastic MapReduce Job Using the Java SDK

The current Amazon EMR documentation covers in detail how to run a custom Hadoop jar using the AWS Management Console. This post covers how to do the same thing directly with the AWS SDK for Java in Eclipse.

Prerequisites

To run the sample code, you need to set up and/or have knowledge of the following:

Running the Example Code

In order to run the example code below, please follow these setup steps:

  1. In Eclipse, select File -> New -> Other, and then search for AWS Java Project.
  2. Click Next, select a project name, select any other examples you want (S3, etc.) and enter your AWS Credentials.
  3. Click Next, then Finish.
  4. When the new project opens, right-click the (default package) and select New -> Class.
  5. Enter the class name “ElasticMapReduceApp” and click Finish.
  6. Copy and paste the sample code below into the new class.
  7. Replace the string “your-bucket-name” with your S3 bucket name.
  8. Run the class.  It will report status as it runs.

After the run, the job output and job logs should appear as sub-directories under your S3 bucket.

Sample Code

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;
import com.amazonaws.services.elasticmapreduce.model.JobFlowExecutionState;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

/**
 * Run the Amazon Cloudburst example directly using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class ElasticMapReduceApp
{

    private static final String HADOOP_VERSION = "0.20";
    private static final int INSTANCE_COUNT = 1;
    private static final String INSTANCE_TYPE = InstanceType.M1Large.toString();
    private static final UUID RANDOM_UUID = UUID.randomUUID();
    private static final String FLOW_NAME = "cloudburst-" + RANDOM_UUID.toString();
    private static final String BUCKET_NAME = "your-bucket-name";
    private static final String S3N_HADOOP_JAR =
        "s3n://elasticmapreduce/samples/cloudburst/cloudburst.jar";
    private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/";
    // Two input files, the output path, then CloudBurst's numeric parameters
    private static final String[] JOB_ARGS =
        new String[] { "s3n://elasticmapreduce/samples/cloudburst/input/s_suis.br",
                      "s3n://elasticmapreduce/samples/cloudburst/input/100k.br",
                      "s3n://" + BUCKET_NAME + "/" + FLOW_NAME,
                      "36", "3", "0", "1", "240", "48", "24", "24", "128", "16" };
    private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
    private static final List<JobFlowExecutionState> DONE_STATES = Arrays
        .asList(new JobFlowExecutionState[] { JobFlowExecutionState.COMPLETED,
                                             JobFlowExecutionState.FAILED,
                                             JobFlowExecutionState.TERMINATED });
    static AmazonElasticMapReduce emr;

    /**
     * The only information needed to create a client are security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init() throws Exception {
        // Load AwsCredentials.properties from the same classpath location as this class
        AWSCredentials credentials = new PropertiesCredentials(
                                      ElasticMapReduceApp.class
                                          .getResourceAsStream("AwsCredentials.properties"));

        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args) throws Exception {

        System.out.println("===========================================");
        System.out.println("Welcome to the Elastic Map Reduce!");
        System.out.println("===========================================");

        init();

        try {
            // Configure instances to use
            JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
            System.out.println("Using EMR Hadoop v" + HADOOP_VERSION);
            instances.setHadoopVersion(HADOOP_VERSION);
            System.out.println("Using instance count: " + INSTANCE_COUNT);
            instances.setInstanceCount(INSTANCE_COUNT);
            System.out.println("Using master instance type: " + INSTANCE_TYPE);
            instances.setMasterInstanceType(INSTANCE_TYPE);
            System.out.println("Using slave instance type: " + INSTANCE_TYPE);
            instances.setSlaveInstanceType(INSTANCE_TYPE);

            // Configure the job flow
            System.out.println("Configuring flow: " + FLOW_NAME);
            RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);
            System.out.println("\tusing log URI: " + S3N_LOG_URI);
            request.setLogUri(S3N_LOG_URI);

            // Configure the Hadoop jar to use
            System.out.println("\tusing jar URI: " + S3N_HADOOP_JAR);
            HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
            System.out.println("\tusing args: " + ARGS_AS_LIST);
            jarConfig.setArgs(ARGS_AS_LIST);
            // Name the step after the jar file, e.g. "cloudburst.jar"
            StepConfig stepConfig =
                new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.lastIndexOf('/') + 1),
                               jarConfig);
            request.setSteps(Arrays.asList(new StepConfig[] { stepConfig }));

            //Run the job flow
            RunJobFlowResult result = emr.runJobFlow(request);

            //Check the status of the running job
            String lastState = "";
            STATUS_LOOP: while (true)
            {
                DescribeJobFlowsRequest desc =
                    new DescribeJobFlowsRequest(
                                                Arrays.asList(new String[] { result.getJobFlowId() }));
                DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
                for (JobFlowDetail detail : descResult.getJobFlows())
                {
                    String state = detail.getExecutionStatusDetail().getState();
                    if (isDone(state))
                    {
                        System.out.println("Job " + state + ": " + detail.toString());
                        break STATUS_LOOP;
                    }
                    else if (!lastState.equals(state))
                    {
                        lastState = state;
                        System.out.println("Job " + state + " at " + new Date().toString());
                    }
                }
                Thread.sleep(10000);
            }
        } catch (AmazonServiceException ase) {
            System.out.println("Caught Exception: " + ase.getMessage());
            System.out.println("Response Status Code: " + ase.getStatusCode());
            System.out.println("Error Code: " + ase.getErrorCode());
            System.out.println("Request ID: " + ase.getRequestId());
        }
    }

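    /**
     * @param value the job flow state reported by EMR, e.g. "RUNNING"
     * @return true if the job flow is COMPLETED, FAILED, or TERMINATED
     */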
    public static boolean isDone(String value)
    {
        JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
        return DONE_STATES.contains(state);
    }
}
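The status loop in main polls every 10 seconds until the job flow is COMPLETED, FAILED, or TERMINATED, with no upper bound on how long it waits. One possible refinement, sketched here without the AWS SDK, is to move the loop into a helper that takes the state lookup as a function and gives up after a fixed number of polls. The helper name, the Supplier-based lookup, and the poll limit are assumptions for illustration; in the real program the supplier would wrap the emr.describeJobFlows(...) call.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical bounded polling helper mirroring the STATUS_LOOP logic. */
class JobFlowPoller {
    static final List<String> DONE_STATES = Arrays.asList("COMPLETED", "FAILED", "TERMINATED");

    // Polls stateSource until it reports a done state or maxPolls is used up,
    // printing each state change. Returns the last state seen.
    static String waitForCompletion(Supplier<String> stateSource, long sleepMillis, int maxPolls) {
        String lastState = "";
        for (int poll = 0; poll < maxPolls; poll++) {
            String state = stateSource.get();
            if (!lastState.equals(state)) {
                lastState = state;
                System.out.println("Job " + state);
            }
            if (DONE_STATES.contains(state)) {
                return state;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep interrupt status and stop polling
                return lastState;
            }
        }
        return lastState; // gave up without reaching a done state
    }

    public static void main(String[] args) {
        // Simulated states standing in for successive describeJobFlows calls.
        Iterator<String> states =
                Arrays.asList("STARTING", "RUNNING", "RUNNING", "COMPLETED").iterator();
        System.out.println("Final: " + waitForCompletion(states::next, 0L, 10));
    }
}
```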

unionOf for Cascading.Avro

Previous posts noted how Cascading provides greater flexibility and testability than relational databases for ETL, and benchmarked Avro versus text formats for large-scale information processing. Recently I released a patch to Cascading.Avro that provides even more power and flexibility compared to traditional RDBMS-based data processing. The new AvroScheme.unionOf utility method improves support for Avro schema evolution without needing a centralized meta data store and without having to re-format all of your historical data to the new format. Unlike a traditional SQL UNION statement, AvroScheme.unionOf dynamically adds columns and converts data types as necessary. There is no need to specify field names or data types up front, since every Avro file contains self-describing meta data.

How it Works

Say that we have two Avro files: one representing an older format, and another representing the newer data format.

Old format:

ID (int)   Amt (float)
1          22000.22
2          33000.33

New format:

ID (long)   Amt (float)   Currency (String)
3           11000.11      EUR

Using AvroScheme.unionOf with the directory containing the two Avro files as input, we can create a tap that reads all the data in the specified folder(s):

  Tap avroSource = new Hfs(AvroScheme.unionOf(input), input);

And when we read the tap, we get the following:

ID (long)   Amt (float)   Currency (String)
1           22000.22      <null>
2           33000.33      <null>
3           11000.11      EUR

As you can see above, the Scheme standardized the data types and added null values for fields not present in the old Avro Schema, allowing Cascading GroupBy or CoGroup to function normally with the input data.
Without AvroScheme.unionOf, we would need to either convert all existing data to the new format, or store the set of names and data types for each Schema and use a custom flow to coerce the types and add default values. In my experience, using AvroScheme.unionOf typically cuts the code needed to read existing Avro files by around 80%, in addition to supporting evolving schemata.
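To illustrate the idea (this is not the Cascading.Avro implementation), the sketch below unions two field-to-type maps standing in for Avro schemas, widening Integer to Long when a field appears with both types, and then conforms each record to the union by filling missing fields with null. It handles only the int-to-long widening from the example above, not the toString() conversions; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a schema union with type widening. */
class SchemaUnionSketch {
    // Widen two field types to a common type; only Integer -> Long is handled here.
    static Class<?> widen(Class<?> a, Class<?> b) {
        if (a.equals(b)) return a;
        if ((a == Integer.class && b == Long.class) || (a == Long.class && b == Integer.class)) {
            return Long.class;
        }
        throw new IllegalArgumentException("No widening from " + a + " to " + b);
    }

    // Union of all fields in first-seen order, with widened types.
    @SafeVarargs
    static Map<String, Class<?>> union(Map<String, Class<?>>... schemas) {
        Map<String, Class<?>> result = new LinkedHashMap<>();
        for (Map<String, Class<?>> schema : schemas) {
            for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
                result.merge(field.getKey(), field.getValue(), SchemaUnionSketch::widen);
            }
        }
        return result;
    }

    // Conform a record to the union: widen Integer values to Long, null-fill missing fields.
    static Map<String, Object> conform(Map<String, Object> record, Map<String, Class<?>> schema) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
            Object value = record.get(field.getKey());
            if (value instanceof Integer && field.getValue() == Long.class) {
                value = ((Integer) value).longValue();
            }
            out.put(field.getKey(), value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> oldSchema = new LinkedHashMap<>();
        oldSchema.put("ID", Integer.class);
        oldSchema.put("Amt", Float.class);
        Map<String, Class<?>> newSchema = new LinkedHashMap<>();
        newSchema.put("ID", Long.class);
        newSchema.put("Amt", Float.class);
        newSchema.put("Currency", String.class);

        Map<String, Class<?>> unioned = union(oldSchema, newSchema);
        Map<String, Object> oldRecord = new HashMap<>();
        oldRecord.put("ID", 1);
        oldRecord.put("Amt", 22000.22f);
        System.out.println(unioned.keySet() + " " + conform(oldRecord, unioned));
    }
}
```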

Support for MultiSourceTap

If we need to read multiple directories containing Avro files, or multiple Avro files in different directories, we need to use the Cascading MultiSourceTap to read them as one data flow. AvroScheme.unionOf supports this when passed an array of directory or file names:

  AvroScheme scheme = AvroScheme.unionOf(inputs);
  Tap[] avroSources = new Tap[inputs.length];
  for(int x = 0; x < inputs.length; x++) {
    avroSources[x] = new Hfs(scheme, inputs[x]);
  }
  MultiSourceTap multiSourceTap = new MultiSourceTap(avroSources);

Support for Avro Projection

Suppose we only need the ID from each record, regardless of its data type in the underlying schema. We can use the following call to produce the desired output:

  Tap avroSource = new Hfs(
        AvroScheme.unionOf(new Fields("ID"), out), out);

And the output looks like:

ID (long)
1
2
3

Availability and Limitations

I have released a patch to BixoLabs adding support for unionOf, as well as simple type conversion support. Please check the Cascading.Avro site for release details. Currently, only default toString() conversions to String are supported, and nested Map and List fields are excluded from the union field set.

Top 10 Things I Love About Cascading + Hadoop on the Cloud

My recent experience with Map-Reduce on the Cloud really makes me happy I never got away from the tech… here’s why:

  1. No fixed infrastructure investments means that performance optimizations and tuning directly affect the bottom line.
  2. No longer have to estimate hardware for a system that isn’t built yet.
  3. Everything is open source so I don’t have to deal with software vendors.  I can fix things myself if I have to, rely on the community for help, and third party paid support is available if needed.
  4. Leverage the same software stack used by millions of other cloud developers. No vaporware survives here…
  5. The completely open MapReduce implementation in Hadoop allows cool solutions like using an embedded Java database for looking up dimension data.
  6. Everything is testable using automated unit testing in the local JVM, leading to code robustness and quality far exceeding any data processing stack I’ve worked on.  To be able to debug the entire Hadoop technology stack on the local JVM is priceless.
  7. Any Java developer can pick up Cascading in a couple of months and be productive.
  8. Cascading joins offer more flexibility than SQL joins.  For example, Cascading OuterJoin supports empty data on either side of the join condition.
  9. Everything is streaming and INSERT-only, so data processing best practices are self-evident and mandatory in MapReduce, whereas the same best practices are optional in SQL-based systems. For example, a true INSERT-only database model is fairly rare in the Oracle world; in MapReduce, however, an INSERT-only model is the only option available.
  10. I can draw any Cascading data processing design using a simple flow chart, and have direct correlation between the design and the finished product every time.

To top it all off, Cascading is a great path for entry-level college grads to get into big data processing. Admittedly, you need to know a lot to keep an entire distributed data processing system working day in and day out. But you only need to know the fundamentals of any JVM-based language (Scala, Jython, JRuby, Groovy, Clojure, etc.) to learn how to process big data with Cascading. This is the first time in a very long time that any of my employers has talked about bringing on interns or new grads.

Hadoop Jobs Grow 175,000% since 2006

According to Indeed, Hadoop job listings have grown by a stellar 175,000% since 2006. The trend curve shows consistent, long-term growth prospects:
Hadoop Relative Job Growth circa 2006

Given the comparatively lackluster job growth of back-end technology mainstays like Oracle (30%), IBM (29%), and Microsoft (49%), it looks like back-end engineers may need to start catching the wave…

Oracle Relative Job Growth circa 2006

IBM Relative Job Growth circa 2006

Microsoft Relative Job Growth circa 2006

Cascading TapsMap and SubAssembly: Best Practices

Cascading Pipes give the experienced user a lot of power because of everything they take care of automatically; the downside is that we can easily do things the hard way without knowing it. This can affect the performance, maintainability, and re-usability of flows, and can trigger the need for significant re-factoring later on. One best practice is to use the taps map and SubAssembly together to separate concerns between the taps (storing the data) and the pipes (processing the data). Used properly, the taps map and SubAssembly produce fully re-usable pipes that are completely independent of the input and output data formats.

Flow Before Taps Map and SubAssembly

So as we can see from the flow above, we are selecting employees with salary < 1000 and counting them by department. The code might look something like this:

//Define source tap
Tap source =
    new Hfs(new TextDelimited(new Fields("emp_name",
                                         "salary",
                                         "dept_id"),
                              "\t"),
            "emp.tsv"
    );

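//Wire the pipe: ExpressionFilter removes tuples for which the expression is true,
//so removing salary >= 1000 keeps the employees with salary < 1000
Pipe pipe = new Each("emp",
                     new Fields("salary"),
                     new ExpressionFilter("salary >= 1000",
                                          Double.class));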
pipe = new GroupBy(pipe, new Fields("dept_id"));
pipe = new Every(pipe, new Count());

//Define the sink tap
Tap sink =
    new Hfs(new TextDelimited(new Fields("dept_id",
                                         "count"),
                              "\t"),
            "output",
            true
    );

//Create the flow
Flow flow = new FlowConnector().connect(source,
                                        sink,
                                        pipe);

//Run the flow
flow.complete();

While the code is simple enough, it suffers from several issues with performance, re-usability, and maintainability:

  1. Supports only TSV as the input and output format
  2. It assumes the input URL is “emp.tsv”
  3. Output directory is assumed to be “output”
  4. Field names are hard-coded
  5. There is no way to wire the pipe into other pipes except through intermediate input and output files (via a Cascade)

Passing parameters or configuration to the job could resolve the first four issues; however, the last issue imposes large performance penalties and has no easy solution without some major re-factoring.
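Whatever the I/O details, the logic of this flow is small: keep employees earning less than 1000, then count them per dept_id. As a plain-Java sketch (no Cascading), with a hypothetical Emp class and sample rows, the intended computation looks like this. Keep in mind that Cascading's ExpressionFilter removes the tuples for which its expression is true, so the filter expression must describe the rows to drop.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory equivalent of the flow: filter on salary, count by dept_id. */
class EmpCountSketch {
    static final class Emp {
        final String name;
        final double salary;
        final String deptId;

        Emp(String name, double salary, String deptId) {
            this.name = name;
            this.salary = salary;
            this.deptId = deptId;
        }
    }

    // Keep employees with salary < 1000 and count them per department.
    // TreeMap keeps dept_id sorted, similar to the ordering a GroupBy produces.
    static Map<String, Integer> countLowSalaryByDept(List<Emp> emps) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Emp e : emps) {
            if (e.salary < 1000) {
                counts.merge(e.deptId, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Emp> emps = Arrays.asList(
                new Emp("alice", 900, "d1"),
                new Emp("bob", 1500, "d1"),
                new Emp("carol", 500, "d2"),
                new Emp("dave", 700, "d1"));
        // Each entry corresponds to one dept_id<TAB>count line in the sink.
        countLowSalaryByDept(emps).forEach((dept, count) -> System.out.println(dept + "\t" + count));
    }
}
```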

Suggested Re-factoring

By implementing a SubAssembly and utilizing a Taps Map, we can dramatically improve both the maintainability and performance of the code.

Implementing a SubAssembly

Cascading SubAssembly Pipe

Please note the following when creating the SubAssembly:

  1. Provide a default constructor with default tap names.
  2. Create String constants for all head and tail pipes for use in the TapsMap.
  3. Provide a secondary constructor to allow linking and re-using multiple SubAssemblies.

Example SubAssembly code:

import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * Abstracts Pipe from Taps.
 * @author mpouttuclarke
 *
 */
public class EmpSubAssembly
 extends SubAssembly
{
 public static final String INPUT_PIPE_NAME = "input";
 public static final String OUTPUT_PIPE_NAME = "output";
 private static final long serialVersionUID = 1L;

 /**
 * Default constructor.
 */
 public EmpSubAssembly()
 {
   this(new Pipe(INPUT_PIPE_NAME), OUTPUT_PIPE_NAME);
 }

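 /**
 * Provides a way to rename the head and tail pipes.
 * @param input the head pipe to read from
 * @param tailName the name to give the tail pipe
 */
 public EmpSubAssembly(Pipe input, String tailName)
 {
   super();

   // Wire the pipe: ExpressionFilter removes tuples for which the expression is true,
   // so removing salary >= 1000 keeps the employees with salary < 1000
   Pipe pipe = new Each(input,
                        new Fields("salary"),
                        new ExpressionFilter("salary >= 1000",
                                             Double.class));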
   pipe = new GroupBy(pipe, new Fields("dept_id"));
   pipe = new Every(pipe, new Count());
   pipe = new Pipe(tailName, pipe); //Name each tail

   setTails(pipe);
 }

}

Binding Taps with a TapsMap

Now we can bind the taps with a TapsMap, separating the data locations and formats from the pipe. This means we can re-wire the SubAssembly with any other pipes or SubAssemblies we wish, without affecting the performance of the application as a whole.

Example of a TapsMap:

 //Define source tap
 Tap source =
   new Hfs(new TextDelimited(new Fields("emp_name",
                                        "salary",
                                        "dept_id"),
           "\t"),
           "emp.tsv"
 );

 //Define the sink
 Tap sink =
   new Hfs(new TextDelimited(new Fields("dept_id",
                                        "count"),
           "\t"),
           "output",
           true
 );

 //Create input taps map (same for output if multiple tail pipes)
 Map<String, Tap> tapsMap = new HashMap<String, Tap>();
 tapsMap.put(EmpSubAssembly.INPUT_PIPE_NAME, source);

 //SubAssembly
 Pipe pipe = new EmpSubAssembly();

 //Create the flow
 Flow flow =
   new FlowConnector().connect(tapsMap,
                               sink,
                               pipe);

 //Run the flow
 flow.complete();
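The heart of this approach is that sources are bound to the SubAssembly purely by head-pipe name (EmpSubAssembly.INPUT_PIPE_NAME), so the processing logic never refers to a location or format. The plain-Java analogue below (hypothetical names, no Cascading classes) shows the same separation: the assembly only knows its head name, and the caller decides which source is bound to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical analogue of binding sources to an assembly by head-pipe name. */
class TapsMapSketch {
    // An "assembly" declares the head name it reads from and a transformation of its rows.
    static final class Assembly {
        final String headName;
        final Function<List<String>, List<String>> logic;

        Assembly(String headName, Function<List<String>, List<String>> logic) {
            this.headName = headName;
            this.logic = logic;
        }
    }

    // "Connect": look up the source bound to the assembly's head name, then run the logic.
    static List<String> connectAndRun(Map<String, List<String>> sources, Assembly assembly) {
        List<String> rows = sources.get(assembly.headName);
        if (rows == null) {
            throw new IllegalStateException("No source bound to head pipe: " + assembly.headName);
        }
        return assembly.logic.apply(rows);
    }

    public static void main(String[] args) {
        // Reusable logic that knows nothing about where its rows come from.
        Assembly upper = new Assembly("input", rows -> {
            List<String> out = new ArrayList<>();
            for (String r : rows) out.add(r.toUpperCase());
            return out;
        });
        Map<String, List<String>> sources = new HashMap<>();
        sources.put("input", Arrays.asList("a", "b")); // swap this source freely
        System.out.println(connectAndRun(sources, upper));
    }
}
```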

Results of the Re-factoring

After the changes, we can take full advantage of the Flow auto-wiring capabilities of Cascading:

  1. We can re-wire and re-use any number of pipes and SubAssemblies in a single Flow.
  2. We can change the data locations and formats at will without affecting the functionality of the pipes.
  3. Cascading takes care of mapping the taps, no matter how deeply nested within multiple SubAssemblies the heads and tails are.
  4. We can re-name the pipes at will to prevent naming conflicts across a given Flow.

Cascading.Avro

Cascading.Avro provides a Cascading Scheme implementation for Apache Avro.  This allows Cascading flows to read and write Avro encoded data with ease.  This post covers the basics of how to use Cascading.Avro and some of the gotchas involved in reading, writing, and transforming data using Avro in Cascading.  We will also walk through a benchmark using Avro versus CSV format to give an idea of the performance benefits and cost savings related to storing and processing data in Avro format.

Writing Avro Data With Cascading

First we need to define some Fields and data types to provide some meta data context.  Let’s assume that we are dealing with IP location data:

Fields fields =
 new Fields(
 "ipFrom",
 "ipTo",
 "countryCode",
 "countryName",
 "region",
 "city",
 "latitude",
 "longitude"
 );
Class<?>[] types =
 new Class<?>[] {
 Long.class,
 Long.class,
 String.class,
 String.class,
 String.class,
 String.class,
 Double.class,
 Double.class
 };

Then we can initialize a sink Tap using AvroScheme, allowing us to write Cascading Tuples in Avro format:

Tap sink = new Lfs(new AvroScheme(fields, types), "test-avro");

Behind the scenes, AvroScheme automatically creates an Avro Schema from the defined fields and data types. This saves the time and effort of writing the Avro schema file by hand. Now we can write some Tuples to the Tap:

 TupleEntryCollector out = sink.openForWrite(new JobConf());

 TupleEntry t = new TupleEntry(fields, new Tuple(new Object[types.length]));
 t.set("ipFrom", 329807419L);
 t.set("ipTo", 329807422L);
 t.set("countryCode", "UC");
 t.set("countryName", "Undefined Country");
 t.set("region", "Undefined Region");
 t.set("city", "Undefined City");
 t.set("latitude", -10.88377366D);
 t.set("longitude", 40.38827661D);
 out.add(t);

 t.set("ipFrom", 245235454L);
 t.set("ipTo", 455322234L);
 t.set("countryCode", "UC2");
 t.set("countryName", "Undefined Country2");
 t.set("region", "Undefined Region2");
 t.set("city", "Undefined City2");
 t.set("latitude", 20.7372661D);
 t.set("longitude", -5.8483992D);
 out.add(t);

 out.close();

A few gotchas here:

  • If we do not pass a Tuple to the constructor for TupleEntry then we get a NullPointerException.
  • If we initialize the Tuple with the default constructor, or initialize with the wrong number of objects in the Object[] then we get the following exception: cascading.tuple.TupleException: failed to set a value, tuple may not be initialized with values, is zero length.
  • Avro is very strict with type conversions, so if we forget the L at the end of the ipFrom value constants, we get the error: org.apache.avro.AvroRuntimeException: Not in union ["null","long"]: 329807419
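The last gotcha comes from Java autoboxing as much as from Avro: an integer literal without the L suffix is boxed as java.lang.Integer, while a strict ["null","long"] union accepts only null or a Long. The sketch below reproduces that check in plain Java; acceptsNullableLong is a hypothetical stand-in for Avro's union resolution, not its actual code.

```java
/** Hypothetical strict type check mimicking a ["null","long"] union. */
class NullableLongCheck {
    // Accept only null or a boxed Long, as a strict nullable-long union would.
    static boolean acceptsNullableLong(Object value) {
        return value == null || value instanceof Long;
    }

    public static void main(String[] args) {
        Object withoutSuffix = 329807419;  // autoboxed to java.lang.Integer
        Object withSuffix = 329807419L;    // autoboxed to java.lang.Long
        System.out.println(withoutSuffix.getClass().getSimpleName() + " accepted: "
                + acceptsNullableLong(withoutSuffix));
        System.out.println(withSuffix.getClass().getSimpleName() + " accepted: "
                + acceptsNullableLong(withSuffix));
        // One workaround: widen explicitly before setting the value.
        Object widened = ((Number) withoutSuffix).longValue();
        System.out.println("widened accepted: " + acceptsNullableLong(widened));
    }
}
```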

I chalk these up to the APIs being fairly new: while they work as designed, “nice to haves” like automatic type conversions aren’t quite there yet. On the plus side, all the APIs encourage re-using value objects, which really helps performance in streaming frameworks like Hadoop and Cascading.

After this, Cascading has written an Avro binary file named part-00000.avro to the test-avro directory. The file is in the standard, platform-independent Avro format, and we can read it using any Avro client (Java, Scala, C, C++, others?). This is a marked advantage over sequence files, since we do not have to use the Hadoop file APIs to read it:

 File file = new File("test-avro/part-00000.avro");
 DataFileReader<GenericRecord> in =
   new DataFileReader<GenericRecord>(
     file, new GenericDatumReader<GenericRecord>());
 System.out.println(in.getSchema());  //Print the schema
 GenericRecord rec = new GenericData.Record(in.getSchema());
 while(in.hasNext()) {
   in.next(rec);  //re-use the same record instead of creating each time
   System.out.println(rec);  //Print the record
   System.out.println("Extracted IP From: " + rec.get("ipFrom"));
 }
 in.close();

And here is the program output:

{"type":"record","name":"CascadingAvroSchema","namespace":"","fields":[{"name":"ipFrom","type":["null","long"],"doc":""},{"name":"ipTo","type":["null","long"],"doc":""},{"name":"countryCode","type":["null","string"],"doc":""},{"name":"countryName","type":["null","string"],"doc":""},{"name":"region","type":["null","string"],"doc":""},{"name":"city","type":["null","string"],"doc":""},{"name":"latitude","type":["null","double"],"doc":""},{"name":"longitude","type":["null","double"],"doc":""}]}
{"ipFrom": 329807419, "ipTo": 329807422, "countryCode": "UC", "countryName": "Undefined Country", "region": "Undefined Region", "city": "Undefined City", "latitude": -10.88377366, "longitude": 40.38827661}
Extracted IP From: 329807419
{"ipFrom": 245235454, "ipTo": 455322234, "countryCode": "UC2", "countryName": "Undefined Country2", "region": "Undefined Region2", "city": "Undefined City2", "latitude": 20.7372661, "longitude": -5.8483992}
Extracted IP From: 245235454

Note that even though the data is in binary format, if you call toString() on a GenericRecord you get a valid JSON string!

Reading Avro Data With Cascading

As you might imagine, reading Avro data with Cascading.Avro is easier than writing it, since the JSON meta data precedes the data in every Avro file.  For this example, I'll also demonstrate a useful Avro feature called "projection", which lets us parse only a subset of the fields instead of the whole record.  Cascading.Avro supports projection: we simply specify a subset of the fields when defining the Tap.  Here's how it works:

 Fields projectedFields =
   new Fields(
     "ipTo"
   );
 Class<?>[] projectedTypes =
   new Class<?>[] {
     Long.class
   };
 Tap source = new Lfs(new AvroScheme(projectedFields, projectedTypes), "test-avro");
 TupleEntryIterator tuplesIn = source.openForRead(new JobConf());
 while(tuplesIn.hasNext()) {
   System.out.println(tuplesIn.next());
 }
 tuplesIn.close();

And here’s the output:

fields: ['ipTo'] tuple: ['329807422']
fields: ['ipTo'] tuple: ['455322234']

There are two useful optimizations built into the Cascading.Avro projection process:

  1. Avro only parses the one field, saving memory and processor time.
  2. The fields are in native Cascading format, ready for use in later Sub-Assemblies or Flows.

Parsing Performance CSV vs. Avro

I also ran a simple parsing-performance test comparing a CSV file, parsed with a BufferedReader and a regex, against an Avro file.  I mainly wanted to see how Avro would perform against CSV using plain old Java.  The data set was IP2Location DB5, which has ~3 million rows in CSV format.

Parsing Speed

CSV file with buffered reader and regex : 22150 ms
Avro file (no compression): 1644 ms
Avro is ~13.5 times faster than CSV (22150 ms / 1644 ms)

Output Size

Data size (CSV)  : 292,767,737 bytes
Data size (Avro) : 187,551,110 bytes
Space reduction : ~36% (no compression)
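The ratios above follow directly from the raw measurements.  A tiny stdlib-only Java sketch (class and method names are illustrative) that recomputes them from the numbers reported in this post:

```java
import java.util.Locale;

// Illustrative helper: recompute the benchmark ratios from the raw measurements
class BenchmarkRatios {
    // How many times faster Avro parsing was than CSV parsing
    static double speedup(long csvMillis, long avroMillis) {
        return (double) csvMillis / avroMillis;
    }

    // Fraction of the CSV size saved by the (uncompressed) Avro file
    static double spaceReduction(long csvBytes, long avroBytes) {
        return 1.0 - (double) avroBytes / csvBytes;
    }

    public static void main(String[] args) {
        // Prints "Speedup: 13.5x"
        System.out.println(String.format(Locale.ROOT, "Speedup: %.1fx", speedup(22150, 1644)));
        // Prints "Space reduction: 35.9%"
        System.out.println(String.format(Locale.ROOT, "Space reduction: %.1f%%",
                100 * spaceReduction(292767737L, 187551110L)));
    }
}
```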

Code Used for Benchmark

Code for CSV parsing:

 import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.InputStreamReader;

 //Input setup
 BufferedReader read =
   new BufferedReader
     (new InputStreamReader
     (new FileInputStream("./target/test.csv")));
 long counter = 0L;
 String line = null;
 long start = System.currentTimeMillis();
 while((line = read.readLine()) != null) {
   String[] splits = line.split("^\"|\"\\,\"|\"\\n\"|\"$|\\n");
   if("BALI".equals(splits[5])) {
     counter++;
   }
 }
 read.close();
 System.out.println("Time (ms) : " + (System.currentTimeMillis() - start));
 System.out.println("Count with Region : \"BALI\" = " + counter);
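The index 5 in the benchmark loop can look off by one, since Region is only the fifth CSV field.  A small stdlib-only sketch reusing the benchmark's regex shows why: the match on the opening quote produces an empty first element.  The sample row is assembled from this post's test values, not real IP2Location data, and the class name is illustrative:

```java
// Illustrative check of the benchmark regex on a fully quoted CSV line
class CsvRegionSplit {
    // Same regex as the benchmark: the opening quote, the "," separators, the closing quote
    static final String SPLIT_REGEX = "^\"|\"\\,\"|\"\\n\"|\"$|\\n";

    static String[] fields(String line) {
        return line.split(SPLIT_REGEX);
    }

    public static void main(String[] args) {
        String line = "\"329807419\",\"329807422\",\"UC\",\"Undefined Country\","
                + "\"Undefined Region\",\"Undefined City\",\"-10.88377366\",\"40.38827661\"";
        String[] splits = fields(line);
        System.out.println(splits.length);         // 9: an empty leading element plus 8 fields
        System.out.println("[" + splits[0] + "]"); // []: produced by the opening-quote match
        System.out.println(splits[5]);             // Undefined Region
    }
}
```

String.split keeps the empty leading element because the match at the start of the line has positive width, and drops the trailing empty element left by the closing-quote match.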

Code used for Avro parsing (with projection):

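 import java.io.File;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.util.Utf8;

 //Schema setup (originally written as this schema)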
 Schema schema =
   Schema.parse(AvroLoader.class.
     getResourceAsStream("ipinfo.avsc"));
 //Projection (only decode Region field)
 Schema newSchema =
   Schema.parse(AvroLoader.class.
     getResourceAsStream("ipinfo-region-only.avsc"));

 //Set up input
 File file = new File("./target/test.avro");
 DatumReader<GenericRecord> decoder =
   new GenericDatumReader<GenericRecord>(schema, newSchema);
 DataFileReader<GenericRecord> reader =
   new DataFileReader<GenericRecord>(file, decoder);

 //Prepare for scan
 GenericRecord rec = new GenericData.Record(newSchema);
 long counter = 0L;
 //The generic reader returns strings as Utf8, so compare against a Utf8, not a String
 Utf8 bali = new Utf8("BALI");
 long start = System.currentTimeMillis();
 while(reader.hasNext()) {
   reader.next(rec);
   if(rec.get("Region").equals(bali)) {
     counter++;
   }
 }
 reader.close();
 System.out.println("Time (ms) : " + (System.currentTimeMillis() - start));
 System.out.println("Projection : " + rec.toString());
 System.out.println("Count with Region : \"BALI\" = " + counter);

Schema for all data: ipinfo.avsc:

{
 "type"   : "record",
 "name"   : "IpInfo",
 "doc"    : "The geographic information related to an IP address",
 "fields" : [
 { "name" : "IpFrom",        "type" : "long" },
 { "name" : "IpTo",            "type" : "long" },
 { "name" : "CountryCode",    "type" : "string" },
 { "name" : "CountryName",    "type" : "string" },
 { "name" : "Region",        "type" : "string" },
 { "name" : "City",            "type" : "string" },
 { "name" : "Latitude",        "type" : "double" },
 { "name" : "Longitude",        "type" : "double" }
 ]
}

Schema for Region projection:  ipinfo-region-only.avsc:

{
 "type"   : "record",
 "name"   : "IpInfo",
 "doc"    : "The Region information related to an IP address",
 "fields" : [
 { "name" : "Region",        "type" : "string" }
 ]
}

Conclusion

So from our examples we can see the following about Cascading.Avro:

  1. The Avro encoding provides superior encoding and decoding speed vs. text formats.
  2. Avro files take up less space, will use less memory, and will read and write faster from an IO perspective vs. text formats.
  3. Avro files provide the benefits of Sequence Files without requiring Hadoop to create and read the files.  This could be critical for data collection, since we could produce the Avro files while collecting data, providing an additional speed and scalability boost to data collectors.
  4. Cascading.Avro eliminates the tedious and error prone process of associating fields and data types with text files during a write.  We are encouraged to access fields by name rather than index, aiding maintainability and extensibility.
  5. Cascading.Avro automatically populates Cascading Fields and data types from the meta data of existing Avro files, so all we do is read the file and the meta data comes with it.
  6. We apply projection simply by specifying a sub-set of fields to select.  Cascading.Avro takes care of creating a projection schema which saves time and improves maintainability.
  7. We never even have to deal with Avro schema files, since Cascading.Avro automatically generates them from Cascading fields.

So in conclusion, it looks like using Cascading.Avro is better, stronger, and faster than using either text files or sequence files in Cascading, and the built-in schema evolution features of Avro make this a future-proof data storage/processing model for Hadoop.

CouchDB and Hadoop

CouchDB, a top-level Apache project, uses a RESTful interface to serve dynamic JSON data at Internet scale.  CouchDB provides ACID guarantees without locking by using Multi-Version Concurrency Control (MVCC), and it scales via shared-nothing deployment using multi-master replication.  Data access leverages a map/reduce query model expressed in JavaScript, as detailed below.  This page provides a brief overview of CouchDB and its potential use in a Hadoop Amazon Elastic MapReduce deployment.

CouchDB by Example

The Apache pages provide some useful examples of how to use CouchDB.  However, I found this cool Mu Dynamics simulator which shows how to query CouchDB by example.  The simulator quickly clarifies how Map/Reduce query processing works with concrete problems.  A much faster way to kick the tires than building and installing from scratch…

Scalability and Performance

The Apache wiki provides some pointers on CouchDB performance.  In the benchmarks I've seen, striping (RAID 0) provides the best performance improvement, especially in environments with many users and a lot of random reads (see below).  CouchDB doesn't support parallel queries out of the box, but opening multiple sockets to the same server would allow parallel access to data for a single client.

The map/reduce query capability automatically produces materialized views with incremental refresh.  Therefore, if the data behind a view doesn't change, each query does not re-query the base data.  By default, the first query after a committed change to the base data triggers the incremental refresh of the view (this is configurable).

In terms of scalability, multi-master replication keeps multiple copies of the data across machines and/or data centers.  With a stateless RESTful interface, load balancing doesn't get any easier…  Also, sharding is not supported out of the box, but solutions like Lounge and Pillow look promising for supporting larger data sets.

Cloud Deployment

Check out this blog post on how to spin up a CouchDB cluster on Amazon EC2 using Puppet.  CouchDB seems an ideal candidate for Elastic Load Balancing: the ability to automatically spin up instances as demand increases should really make CouchDB live up to its name!

Potential Use Cases With Hadoop

Using CouchDB to serve key/value reference data in an Inverse REST architecture seems an ideal use case.  Since we could maintain the CouchDB data using a separate GUI, we could correct reference data or meta data dynamically while the job is running.  No need to wait until a large job has crashed and burned to fix reference data problems.  We would probably also want an LRU cache on the client side (in the Hadoop job) to minimize network hits.
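For the client-side LRU cache, the JDK's LinkedHashMap already supports access ordering and an eviction hook.  A minimal sketch, assuming the cached values are documents already fetched from CouchDB (the class name and capacity are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU cache for reference data fetched from CouchDB
class LruCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int maxEntries;

    LruCache(int maxEntries) {
        // accessOrder = true: get() and put() move an entry to the most recently used end
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // Called by LinkedHashMap after each insertion; evicting the eldest entry
    // (the least recently used one in access order) keeps the size bounded
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

On a miss, the mapper would fetch the document over HTTP (a hypothetical fetchFromCouchDb(key) call) and put it in the cache.  LinkedHashMap is not thread safe, so a cache shared across threads would need wrapping, for example with Collections.synchronizedMap.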

Conclusion

Although it may seem counter-intuitive, leveraging the multi-layered REST caching and load balancing architecture to move small data close to big data could make a lot of sense, especially in a cloud environment designed to adapt dynamically to large spikes.

Java Embedded DB for IP2Location in Hadoop

Since the IP geographic data is keyed on ranges of IP addresses, we cannot simply join the log data to the IP data (using an RDBMS or MapReduce), and we cannot use a simple key/value cache to look up the IP data.  Currently the entire IP2Location DB-5 data set consumes 1+ GB of memory when loaded into a Java object array.  While we can fit this in memory with the current infrastructure, we will have issues if this data grows significantly beyond its current size.  Therefore we have to look at alternatives for looking up the data without loading it all into memory.

We cannot use a middleware or shared database solution, since these will quickly become overwhelmed by requests from our cluster.  Likewise, we cannot afford to take a network hit for every look-up, and we currently cannot batch look-up requests to reduce network hits.  We need a shared-nothing architecture, so copying a Java embedded database locally, pulling our small data close to our big data, seems the best approach.  This page evaluates several Java embedded databases as a fit for the use case.

This test is Hadoop/Cascading specific in that it uses the standard set of jar files included in the class path of Apache Hadoop 0.20.2 and Cascading 1.2.  For example, we used the hsqldb-1.8.0.10.jar bundled with Hadoop.  An embedded DB that is not only thread safe but also supports concurrent access will let us re-use its in-process memory cache across multiple threads, and across tasks via the Hadoop Task JVM Reuse feature.

Databases Evaluated

The following databases were evaluated:

  1. Baseline load into in-memory array.
  2. Apache Derby 10.7.1.1
  3. HSQLDB 1.8.0.10
  4. H2 1.2.147

Each of the databases was set up, configured, and queried using identical SQL scripts and JDBC code, except where specifically noted in the test results section.  We also required exclusive use of a JDBC interface to the database, so we can easily swap DBs based on performance and stability.

Test Results

Test results for the in-memory baseline and each of the embedded DBs follow:

DB                Load Time       Load Rows   Load Rate          IP Lookup Rate
Derby             1 min, 51 sec   3,237,642   29,168 / sec       82,486 / sec ****
H2                1 min, 10 sec   3,237,642   46,252 / sec ***   61,527 / sec
In-memory array   32 sec          3,237,642   101,176 / sec      133 / sec
HSQLDB            3 min, 28 sec   3,237,642   15,565 / sec *     1 / 22 sec **

* = -Xmx3G JVM setting was required to load the data

** = -Xmx1G JVM setting was required to query the data

*** = -Xmx1G JVM setting was required to load the data

**** = Only Derby supported concurrent thread access, and 4 threads were used in the IP Lookup test

Test Cases

Create Table

The table was created with the following DDL:

CREATE TABLE IP_INFO (
  IP_FROM BIGINT,
  IP_TO BIGINT,
  COUNTRY_CODE VARCHAR(2),
  COUNTRY_NAME VARCHAR(64),
  REGION VARCHAR(64),
  CITY VARCHAR(64),
  LATITUDE DOUBLE,
  LONGITUDE DOUBLE
)

Load Data

The following load script was used to load up to the maximum of 3,237,642 rows, based on the IP2Location DB-5 data set.  Some of the Java databases tested could not adequately handle the data size, so for those databases a smaller number of rows was loaded, as noted in the test results section:

INSERT INTO IP_INFO (
  IP_FROM,
  IP_TO,
  COUNTRY_CODE,
  COUNTRY_NAME,
  REGION,
  CITY,
  LATITUDE,
  LONGITUDE
) VALUES (
  ?,
  ?,
  ?,
  ?,
  ?,
  ?,
  ?,
  ?
)

In addition, the following JDBC code to load the data utilizes prepared statements and JDBC batch processing:

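import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

Connection conn = DriverManager.getConnection(<db url>);
// disable auto-commit so the batches are committed once, at the end of the load
conn.setAutoCommit(false);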

PreparedStatement ps = conn.prepareStatement(insertString);

// read data file and get fields for each line ...

// for each field, bind values to the PreparedStatement
ps.setLong(1, g.getIpFrom());

ps.addBatch();

// add batch for every line, and execute batch every 100 lines
if(lineNumber % 100 == 0) {
  ps.executeBatch();
}
// at the end, execute final batch and commit ...
ps.executeBatch();
conn.commit();
ps.close();
conn.close();

Create Index

A b-tree index on IP_TO is required to select the matching IP range:

CREATE INDEX IP_INFO_I01 ON IP_INFO ( IP_TO )

Select IP Number in Range

The first row returned by the following SELECT query contains the geographic information for the matching IP address range.  Any additional rows are never read and are ignored:

SELECT *
FROM IP_INFO
WHERE IP_TO >= ?
ORDER BY IP_TO

The following JDBC code utilizes prepared statements and bind variables to select the first matching row:

PreparedStatement ps = conn.prepareStatement(<sql>);
ps.setFetchSize(1);
ps.setMaxRows(1); 

// the following code iterates for each look up test execution
long random = (long)(Math.random() * Integer.MAX_VALUE);
ps.setLong(1, random);
ResultSet rs = ps.executeQuery();
if(rs.next()) {
  // test getting fields, and verify result
}

Test System Configuration

The test was run on the following platform configuration:

Hardware Overview:

  Model Name:    MacBook Pro
  Model Identifier:    MacBookPro6,2
  Processor Name:    Intel Core i7
  Processor Speed:    2.66 GHz
  Number Of Processors:    1
  Total Number Of Cores:    2
  L2 Cache (per core):    256 KB
  L3 Cache:    4 MB
  Memory:    4 GB
  Processor Interconnect Speed:    4.8 GT/s
  Boot ROM Version:    MBP61.0057.B0C

  Serial-ATA:
    Intel 5 Series Chipset:

      Vendor:    Intel
      Product:    5 Series Chipset
      Link Speed:    3 Gigabit
      Negotiated Link Speed:    1.5 Gigabit
      Description:    AHCI Version 1.30 Supported

  Disk:
    Hitachi HTS545050B9SA02:

      Capacity:    500.11 GB (500,107,862,016 bytes)
      Model:    Hitachi HTS545050B9SA02
      Revision:    PB4AC60W

System Software Overview:

  System Version:    Mac OS X 10.6.5 (10H574)
  Kernel Version:    Darwin 10.5.0

JVM Overview:

  java version "1.6.0_22"
  Java(TM) SE Runtime Environment (build 1.6.0_22-b04-307-10M3261)
  Java HotSpot(TM) 64-Bit Server VM (build 17.1-b03-307, mixed mode)

Modifications to Tests

Some tests had to be modified, either due to time constraints (test could not run in a reasonable time) or because the test crashed or hung.

In Memory Array

We had to run the test with JVM parameter -Xmx2G to ensure all the data could be loaded into memory.  Since we are not using a DB, the query logic is simply to iterate through the array to find the first row with IP_TO >= target IP.

Please note: we cannot use a binary search algorithm since we do not know the specific value we are searching for a priori.  We considered writing a binary search variant to find the first element >= the target IP, however we could not find a library which does this and we don’t have the time to write and test one ourselves.
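For what it's worth, java.util.Arrays.binarySearch documents its return value for a missing key as -(insertion point) - 1, where the insertion point is the index of the first element greater than the key, which is exactly the "first element >= target" search.  A minimal sketch, not part of the original benchmark and assuming the IP_TO values have been copied into a sorted long[] with no duplicates (a hypothetical layout; the test above used an object array):

```java
import java.util.Arrays;

// Hypothetical in-memory lookup over a sorted array of IP_TO values
class IpRangeLookup {
    // Returns the index of the first element >= target, or -1 if every
    // element is smaller (the target lies past the last range).
    // Assumes sortedIpTo is sorted ascending with no duplicate values.
    static int firstAtLeast(long[] sortedIpTo, long target) {
        int i = Arrays.binarySearch(sortedIpTo, target);
        if (i < 0) {
            i = -i - 1;  // decode the insertion point for an absent key
        }
        return i < sortedIpTo.length ? i : -1;
    }
}
```

The caller would still need to check that the matching row's IP_FROM is <= the target IP, in case the target falls in a gap between ranges.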

Derby

We had to raise Derby's page cache size (derby.storage.pageCacheSize, a count of pages) above the default to improve cache hits for multi-threaded access:

System.getProperties().setProperty("derby.storage.pageCacheSize", "524288");

We set the JVM to -Xmx1G for the IP lookups and used the JVM default of 128M for the data load.

HSQLDB

We had to make many modifications to the code, the DML, and SQL as well as the JVM memory settings to make the data load work. We could not find a way to force HSql to use the b-tree index so the query performance represents the default plan selected by HSQLDB.

Here is a description of the changes and why they were made:

JVM Memory Settings

We started out with the 128M default JVM heap and received an out of memory error before we completely loaded 200,000 rows.  We had to conclude that HSql only saves to disk after a commit or checkpoint, and keeps the entire set of data buffered in memory.  This was confirmed when we added a CHECKPOINT after each 100,000 rows and still got the same “java.lang.OutOfMemoryError: Java heap space”, but we did see data on disk after each checkpoint.

So we had to set memory to -Xmx3G to load the data set, and we had to take the CHECKPOINT out, because it looks like HSql writes all the data to one big file, so every CHECKPOINT carries a larger and larger write overhead.  It is also possible that it rewrites the entire file with every checkpoint.  Definitely never use CHECKPOINT or incremental commits when writing data sets of GB+ size, and make sure your available memory is 3 times the size of the data on disk after the load.

When querying the data, we again hit the problem of HSql loading the entire data set into memory; that is, we could not query the data at all with the default 128MB of JVM memory allocated.  Setting -Xmx1G works around this problem.

Code and SQL Changes

We could not create the index due to this error:

java.sql.SQLException: java.io.IOException: S1000 Data file size limit is reached in statement [CREATE INDEX IP_INFO_I01 ON IP_INFO(IP_TO)]

We tried loading only 100,000 rows to see if the index is actually used, but the best query rate we could get was 17 / sec, so even if the index was used, the performance definitely would not scale to the full data set, roughly 32 times that size.

We had to specify CREATE CACHED TABLE in the table DDL to get the data load to work.

H2

H2 also required changes to the JVM settings to work properly.

JVM Memory Settings

With the default 128M JVM settings we were able to load just over 400,000 rows before running out of heap space.  Setting the heap to -Xmx1G fixed this issue.  As with HSQLDB, it looks like H2 holds the entire data set in memory for the transaction.

We had to specify CREATE CACHED TABLE in the table DDL to get the data load to work.

Conclusion

Derby provides the best fit for this use case due to:

  1. Concurrency: multiple threads can query embedded Derby within the same JVM process.
  2. The performance of Derby in multi-threaded mode outstrips H2's single-threaded performance on a 2-core machine.  We expect this to scale linearly to our target 16-core production deployment.
  3. Sharing process memory across multiple threads allows us to scale look up data size while maximizing shared cache hits.

However, H2 demonstrated superior single-threaded performance and admirable load speeds and used 40% less space on-disk than Derby.

All in all, Derby wins for this use case, and H2 gets an honorable mention.

The Elephant and the Rhino

If we use JavaScript for Code On Demand, couldn't we also use it for Inverse Code on Demand using Hadoop?  It's much easier to find people who know JavaScript than people who know Groovy or Python.  There is less of a learning curve for everyone involved if we use the most popular scripting language for our embedded business rules.  Plus, if we build them right, we could use the same business rules across our entire ultra-high-scale web presence: from the browser to Elastic MapReduce.

Mozilla Rhino doesn’t have a lot of marketing behind it, but it’s been around.  Feature rich, mature, and embeddable: Mozilla’s JavaScript engine looks like a winner. We could inject input splits and emit results with ease.

Plus if you thought the Bull in the china shop was fun, wait until you see the Elephant and the Rhino!

Make the Elephant and the Rhino your partner, and utterly destroy a china shop near you!  China Shops don’t scale anyway…

Tips for Implementing Rhino in Hadoop

Make sure to use the Java scripting API's compilation option in the setup method of your mapper or reducer, so each script is compiled once per task instead of on every call.  Please see this article on how to compile scripts.  This dramatically reduces the CPU requirements and execution time for scripts.
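A minimal sketch of the compile-once pattern using the standard javax.script API, assuming a JavaScript engine is on the classpath (JDK 6 and 7 bundled Rhino, JDK 8 through 14 bundled Nashorn, and JDK 15+ bundle none, so Rhino or Nashorn would have to be added as a dependency there).  The class name, the rule source, and the "value" binding name are all illustrative:

```java
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

// Hypothetical helper: compile a JavaScript rule once, evaluate it per record
class CompiledRule {
    private final ScriptEngine engine;
    private final CompiledScript script;

    private CompiledRule(ScriptEngine engine, CompiledScript script) {
        this.engine = engine;
        this.script = script;
    }

    // Returns null when no compilable JavaScript engine is available
    static CompiledRule compile(String source) {
        ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
        if (!(engine instanceof Compilable)) {
            return null;
        }
        try {
            return new CompiledRule(engine, ((Compilable) engine).compile(source));
        } catch (ScriptException e) {
            throw new IllegalArgumentException("Rule does not compile: " + e.getMessage(), e);
        }
    }

    // Binds the current record to "value" and runs the precompiled script
    Object apply(Object value) {
        Bindings bindings = engine.createBindings();
        bindings.put("value", value);
        try {
            return script.eval(bindings);
        } catch (ScriptException e) {
            throw new IllegalStateException("Rule failed: " + e.getMessage(), e);
        }
    }
}
```

In a mapper, CompiledRule.compile(...) would run once in setup() and rule.apply(...) once per record in map(); only the cheap Bindings object is created per call.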
