Tag Archives: Map Reduce

Hadoop Developer Training by Matt

100% PowerPoint free!  Learn at your own pace with 10 hours of video training and 15+ hours of in-depth exercises created by Matt Pouttu-Clarke and available exclusively through Harvard Innovation Labs on the Experfy Training Platform.

  • Learn to develop industrial strength MapReduce applications hands-on with tricks of the trade you will find nowhere else.
  • Apply advanced concepts such as Monte Carlo Simulations, Intelligent Hashing, Push Predicates, and Partition Pruning.
  • Learn to produce truly reusable User Defined Functions (UDFs) which stand the test of time and work seamlessly with multiple Hadoop tools and distributions.
  • Learn the latest industry best practices on how to utilize Hadoop ecosystem tools such as Hive, Pig, Flume, Sqoop, and Oozie in an Enterprise context.

Click here for more info!

Hyperdimensionality and Big Data

In this video training, Matt explains how hyperdimensional reasoning implicitly plays a part in all big data analyses, and how today’s analytics and deep learning can use hyperdimensionality to improve accuracy and reduce algorithmic blind spots.

Watch on YouTube:

Apache Beam vs Apache Spark comparison

Google recently released a detailed comparison of the programming models of Apache Beam and Apache Spark. FYI: Apache Beam was known as Cloud Dataflow before Google open sourced it:

https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison

Beam vs Spark
Spark requires more code than Beam for the same tasks

Here’s a link to the academic paper by Google describing the theory underpinning the Apache Beam execution model:

http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

When combined with Apache Spark’s severe tech resourcing issues caused by its mandatory Scala dependencies, it seems that Apache Beam has all the bases covered to become the de facto streaming analytics API.  The cool thing is that with Apache Beam you can switch runtime engines between Google Cloud Dataflow, Apache Spark, and Apache Flink.  A generic streaming API like Beam also opens up the market for others to provide better and faster runtimes as drop-in replacements.  Google is the perfect stakeholder because it is playing the cloud angle and doesn’t seem interested in supporting on-site deployments.  Hats off to Google, and may the best Apache Beam runtime win!
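
To make the portability concrete, here is a minimal sketch of a runner-agnostic pipeline (a sketch only, assuming the Apache Beam 2.x Java SDK; the paths and class name are hypothetical). The same code runs on the Direct, Spark, Flink, or Dataflow runner simply by changing the --runner option at launch time:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class RunnerAgnosticCount
{
    public static void main(String[] args)
    {
        // Pass --runner=SparkRunner, --runner=FlinkRunner, or --runner=DataflowRunner
        // (plus that runner's own options) to pick the execution engine at launch time.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.read().from("/tmp/beam/input/*"))            // hypothetical input path
         .apply(Count.<String>perElement())                         // count identical lines
         .apply(MapElements.into(TypeDescriptors.strings())
                           .via((KV<String, Long> kv) -> kv.getKey() + "\t" + kv.getValue()))
         .apply(TextIO.write().to("/tmp/beam/output/counts"));      // hypothetical output path
        p.run().waitUntilFinish();
    }
}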

Feel the Beam! Google Casts a new Light on Big Data

Apache Beam from Google finally provides robust unification of batch and real-time Big Data.  This framework replaced MapReduce, FlumeJava, and MillWheel at Google.  Major big data vendors had already contributed Apache Beam execution engines for both Flink and Spark before Beam even officially entered incubation.  Anyone else seeing the future of Big Data in a new light?  I know I am…

Academic underpinning: http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

Google’s comparison of Apache Beam vs. Apache Spark: https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison

Streaming Feature Extraction for Unions with statzall

Supporting unions for fields with multiple types makes for more robust and automated feature extraction.  For example, “account numbers” may contain business-relevant strings or spaces due to different data stewards or external data providers.

Rather than transforming all numbers to String, statzall takes the opposite approach and packs Strings into doubles using the open source Unibit Encoding.  This allows extremely efficient feature extraction of basic data science primitives using existing hardened APIs such as CERN Colt.  With single-threaded throughput of roughly 8 MM (million) values per second, automated feature extraction on *all* measures becomes possible.
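
As a rough illustration of the kind of primitive statistics involved (a sketch of the general approach, not statzall's actual internals), here is a minimal example using CERN Colt's Descriptive API over a packed array of doubles; the values and class name are made up:

import cern.colt.list.DoubleArrayList;
import cern.jet.stat.Descriptive;

public class ColtPrimitivesSketch
{
    public static void main(String[] args)
    {
        // In statzall these doubles would include unibit-encoded Strings; here they are
        // simply hypothetical measure values.
        DoubleArrayList values = new DoubleArrayList(new double[] { 1.0, 2.5, 2.5, 4.0, 7.25 });
        double mean = Descriptive.mean(values);
        double variance = Descriptive.sampleVariance(values, mean);
        double stdDev = Descriptive.standardDeviation(variance);
        System.out.println("n=" + values.size() + " mean=" + mean + " stddev=" + stdDev);
    }
}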

In addition, statzall provides one-click deployment to your Hadoop YARN cluster using CDAP.  Or, if you use the cloud, you can set up a fully automated Internet-scale feature extraction cluster in about 5 minutes using the Coopr Cloud.

Capacity Planning with YARN

The YARN Application Master provides all the raw data needed to accurately estimate the resources your big data application requires to meet its SLAs when deployed to production.  By identifying the crucial counters and deriving resource ratios per task and for the application as a whole, we can even infer run times when moving from a smaller test environment to a larger production footprint.

Example of Hadoop MapReduce application counters

All YARN frameworks provide similar counters; here we use the popular Hadoop MapReduce framework as an example.  We can also get the same values displayed in the web interface above directly from the MapReduce API.  The following counters drive the capacity plan:

Counter | Description | Usage
TOTAL_LAUNCHED_MAPS | Map tasks launched | Divisor to obtain average map metrics
TOTAL_LAUNCHED_REDUCES | Reduce tasks launched | Divisor to obtain average reduce metrics
MILLIS_MAPS | Total time spent by all maps (ms) | Numerator to obtain average map task time
MILLIS_REDUCES | Total time spent by all reduces (ms) | Numerator to obtain average reduce task time

The following counters are tallied twice, once across all mappers and once across all reducers; it is important not to mix ratios across task types.

CPU_MILLISECONDS | CPU time used | Numerator to obtain average task CPU
COMMITTED_HEAP_BYTES | RAM used | Numerator to obtain average task RAM
FILE_READ_OPS | Read operations | Numerator to obtain average task read ops
FILE_WRITE_OPS | Write operations | Numerator to obtain average task write ops
FILE_BYTES_READ | Bytes read | Numerator to obtain average task read bytes
FILE_BYTES_WRITTEN | Bytes written | Numerator to obtain average task write bytes
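
For reference, here is a minimal sketch of pulling these counters programmatically instead of reading them off the web interface (assuming the Hadoop 2.x MapReduce client API; the job ID and class name are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskCounter;

public class CounterDump
{
    public static void main(String[] args) throws Exception
    {
        Cluster cluster = new Cluster(new Configuration());
        Job job = cluster.getJob(JobID.forName("job_1411450300000_0001")); // hypothetical job ID
        Counters counters = job.getCounters();
        long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
        long launchedReduces = counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue();
        long millisMaps = counters.findCounter(JobCounter.MILLIS_MAPS).getValue();
        long millisReduces = counters.findCounter(JobCounter.MILLIS_REDUCES).getValue();
        long cpuMillis = counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue();
        // Assumes the job launched at least one map and one reduce task.
        System.out.println("avg map time (ms):    " + (millisMaps / launchedMaps));
        System.out.println("avg reduce time (ms): " + (millisReduces / launchedReduces));
        System.out.println("total task CPU (ms):  " + cpuMillis);
    }
}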

The primary assumption when inferring between environments is that the data being operated on remains the same.  If the input data differs between environments then results may skew, especially for reducers.

Calculating Resource to Task Type Ratios

By calculating ratios, we can then scale the run time and other resources up and down depending on available task slots and quotas in the target environment.

Ratio | Method
Time spent per map (ms) | MILLIS_MAPS / TOTAL_LAUNCHED_MAPS
Time spent per reduce (ms) | MILLIS_REDUCES / TOTAL_LAUNCHED_REDUCES
CPU used per map (ms) | CPU_MILLISECONDS (for maps) / TOTAL_LAUNCHED_MAPS
CPU used per reduce (ms) | CPU_MILLISECONDS (for reduces) / TOTAL_LAUNCHED_REDUCES
RAM used per map | COMMITTED_HEAP_BYTES (for maps) / TOTAL_LAUNCHED_MAPS
RAM used per reduce | COMMITTED_HEAP_BYTES (for reduces) / TOTAL_LAUNCHED_REDUCES
Read operations per map | FILE_READ_OPS (for maps) / TOTAL_LAUNCHED_MAPS
Read operations per reduce | FILE_READ_OPS (for reduces) / TOTAL_LAUNCHED_REDUCES
Write operations per map | FILE_WRITE_OPS (for maps) / TOTAL_LAUNCHED_MAPS
Write operations per reduce | FILE_WRITE_OPS (for reduces) / TOTAL_LAUNCHED_REDUCES
Read bytes per map | FILE_BYTES_READ (for maps) / TOTAL_LAUNCHED_MAPS
Read bytes per reduce | FILE_BYTES_READ (for reduces) / TOTAL_LAUNCHED_REDUCES
Write bytes per map | FILE_BYTES_WRITTEN (for maps) / TOTAL_LAUNCHED_MAPS
Write bytes per reduce | FILE_BYTES_WRITTEN (for reduces) / TOTAL_LAUNCHED_REDUCES

Capacity Scaling

We can now scale parallel task quotas and other resource quotas up and down to calculate the impact on the job in a particular environment.  For example, wall clock time for the map phase can vary from all tasks running in parallel (t = MILLIS_MAPS / TOTAL_LAUNCHED_MAPS) all the way down to a single task running at a time (t = MILLIS_MAPS), and similarly for all other variables.  For resource constraints, dividing by the most severe restriction governs the cost to total run time.  For example, if we enforce a quota restricting CPU time to CPU_MILLISECONDS * 0.5, then MILLIS_MAPS increases to MILLIS_MAPS / 0.5.  This would occur, for example, if the maximum mappers per node were raised to twice the number of cores.  The resource to task type ratios come in handy for impact assessment and prediction against any conceivable environmental constraint.
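
To make the arithmetic concrete, the sketch below estimates map phase wall clock time under a task slot quota and a CPU restriction (a sketch only; the counter values are hypothetical and it assumes uniform task times with no scheduling overhead):

public class MapPhaseEstimate
{
    public static void main(String[] args)
    {
        long millisMaps = 7200000L;      // MILLIS_MAPS measured in the test environment
        long launchedMaps = 400L;        // TOTAL_LAUNCHED_MAPS measured in the test environment
        double avgMapMillis = (double) millisMaps / launchedMaps;

        int parallelMapSlots = 50;       // map task quota in the target environment
        double cpuQuotaFactor = 0.5;     // e.g. twice as many mappers per node as cores

        // Number of task "waves" times the average task time, stretched by the CPU restriction.
        double waves = Math.ceil((double) launchedMaps / parallelMapSlots);
        double wallClockMillis = (waves * avgMapMillis) / cpuQuotaFactor;
        System.out.println("estimated map phase wall clock (ms): " + Math.round(wallClockMillis));
    }
}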

Monitoring EMR Spend Using the AWS Java SDK

Elastic MapReduce makes it so easy to spin up a cluster that it’s also easy to waste money on unused, partially used, or downright unauthorized clusters.  Obviously, as a business, Amazon doesn’t put a whole lot of effort into keeping its customers from spending too much money.  Amazon does impose an instance count limit for the entire account; however, effectively managing these costs requires getting a lot more granular and providing more detailed information.

That’s why I created this program, which estimates charges for current and historic EMR clusters.  It first obtains the normalized instance hours for all clusters running under the current credentials, then divides by the Normalized Compute Time given in the Amazon EMR FAQ.  It then multiplies by the EMR hourly rate to get the charge for each current and historic job flow (cluster).  Historic job flows come from the Amazon job flow history, which only goes back about 20 days.

The job flow id is the primary key for this data set.  Output is tab delimited and streamed to stdout.  The last column contains a complete dump of the job flow in JSON format.  Here is some example output:

JOB_FLOW_ID    STATE    STARTED    ENDED    INSTANCE_COUNT    INSTANCE_TYPE    INSTANCE_HOURS    EMR_INSTANCE_RATE    CHARGE    DETAIL_JSON
j-DFASFWGRWRG    RUNNING    2011-09-21 10:52:17    null    12    m1.xlarge    36    0.59    21.24    {your job flow JSON}

So now you can keep track of estimated EMR spend in near real time, set alerts, and estimate monthly charges based on current workloads.  Enjoy!

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;

/**
 * Monitor EMR spending using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class EMRMonitor
{
    public static class InstanceRate
    {
        private int normalizedHours;
        private double emrRate;

        public InstanceRate(int normalizedHours, double emrRate)
        {
            super();
            this.normalizedHours = normalizedHours;
            this.emrRate = emrRate;
        }

        /**
         * @return the normalizedHours
         */
        public int getNormalizedHours()
        {
            return normalizedHours;
        }

        /**
         * @return the emrRate
         */
        public double getEmrRate()
        {
            return emrRate;
        }
    };

    static final Map<InstanceType, InstanceRate> rateMap =
        new HashMap<InstanceType, EMRMonitor.InstanceRate>();
    static AmazonElasticMapReduce emr;

    static
    {
        rateMap.put(InstanceType.M1Small, new InstanceRate(1, 0.085 + 0.015));
        rateMap.put(InstanceType.C1Medium, new InstanceRate(2, 0.17 + 0.03));
        rateMap.put(InstanceType.M1Large, new InstanceRate(4, 0.34 + 0.06));
        rateMap.put(InstanceType.M1Xlarge, new InstanceRate(8, 0.50 + 0.09));
        rateMap.put(InstanceType.C1Xlarge, new InstanceRate(8, 0.68 + 0.12));
        rateMap.put(InstanceType.M22xlarge, new InstanceRate(14, 1.00 + 0.21));
        rateMap.put(InstanceType.M24xlarge, new InstanceRate(28, 2.00 + 0.42));
        rateMap.put(InstanceType.Cc14xlarge, new InstanceRate(19, 1.60 + 0.33));
        rateMap.put(InstanceType.Cg14xlarge, new InstanceRate(25, 2.10 + 0.42));
    }

    /**
     * The only information needed to create a client are security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init()
        throws Exception
    {
        AWSCredentials credentials =
            new PropertiesCredentials(
                                      EMRMonitor.class
                                          .getResourceAsStream("AwsCredentials.properties"));
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args)
        throws Exception
    {
        System.out
            .println("JOB_FLOW_ID\tSTATE\tSTARTED\tENDED\tINSTANCE_COUNT\tINSTANCE_TYPE\tINSTANCE_HOURS\tEMR_INSTANCE_RATE\tCHARGE\tDETAIL_JSON");
        Logger.getLogger("com.amazonaws").setLevel(Level.WARNING); // Turn off request status
                                                                   // messages
        init();
        DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest();
        DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
        for (JobFlowDetail detail : descResult.getJobFlows())
        {
            String slaveInstanceType = detail.getInstances().getSlaveInstanceType();
            String masterInstanceType = detail.getInstances().getMasterInstanceType();
            if (slaveInstanceType == null)
            {
                slaveInstanceType = masterInstanceType;
            }
            double instanceHours = getInstanceHours(detail, slaveInstanceType);
            double charge = getInstanceCharge(slaveInstanceType, instanceHours);
            System.out
                .println(String.format("%1$s\t%2$s\t%3$tF %3$tT\t%4$tF %4$tT\t%5$d\t%6$s\t%7$.0f\t%8$.2f\t%9$.2f\t%10$s\t",
                                       detail.getJobFlowId(),
                                       detail.getExecutionStatusDetail().getState(),
                                       detail.getExecutionStatusDetail().getCreationDateTime(),
                                       detail.getExecutionStatusDetail().getEndDateTime(),
                                       detail.getInstances().getInstanceCount(),
                                       slaveInstanceType,
                                       instanceHours,
                                       rateMap.get(InstanceType.fromValue(slaveInstanceType)).getEmrRate(),
                                       charge,
                                       detail.toString().replaceAll("\\s+", " ")));
        }
    }

    /**
     * @param instanceType the EC2 instance type name
     * @param instanceHours the estimated instance hours used
     * @return the estimated EMR charge
     */
    public static double getInstanceCharge(String instanceType, double instanceHours)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        return instanceHours * rate.getEmrRate();
    }

    /**
     * @param detail the job flow detail returned by the EMR API
     * @param instanceType the EC2 instance type name
     * @return the estimated instance hours for the job flow
     */
    public static double getInstanceHours(JobFlowDetail detail, String instanceType)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        double instanceHours =
            detail.getInstances().getNormalizedInstanceHours() / (double) rate.getNormalizedHours();
        return instanceHours;
    }
}

How to Run an Elastic MapReduce Job Using the Java SDK

The current Amazon EMR documentation provides a lot of coverage of how to run a Hadoop custom jar using the AWS Management Console. This post covers how to do the same thing directly using the AWS SDK for Java in Eclipse.

Prerequisites

To run the sample code, you need to set up and/or have knowledge of the following:

  • An AWS account, along with its Access Key ID and Secret Access Key
  • Eclipse with the AWS Toolkit for Eclipse installed (it provides the AWS Java Project wizard used below)
  • An S3 bucket you own, for the job output and logs

Running the Example Code

In order to run the example code below, please follow these setup steps:

  1. In Eclipse, select File -> New -> Other, and then search for AWS Java Project.
  2. Click Next, select a project name, select any other examples you want (S3, etc.) and enter your AWS Credentials.
  3. Click Next, then Finish.
  4. When the new project opens, right click on the (default package) -> New -> Class
  5. Enter the class name “ElasticMapReduceApp” and click Finish.
  6. Copy and paste the sample code below into the new class.
  7. Replace the string “your-bucket-name” with your S3 bucket name.
  8. Run the class.  It will report status as it runs.

After the run, the job output and job logs should appear as sub-directories under your S3 bucket.

Sample Code

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;
import com.amazonaws.services.elasticmapreduce.model.JobFlowExecutionState;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

/**
 * Run the Amazon Cloudburst example directly using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class ElasticMapReduceApp
{

    private static final String HADOOP_VERSION = "0.20";
    private static final int INSTANCE_COUNT = 1;
    private static final String INSTANCE_TYPE = InstanceType.M1Large.toString();
    private static final UUID RANDOM_UUID = UUID.randomUUID();
    private static final String FLOW_NAME = "cloudburst-" + RANDOM_UUID.toString();
    private static final String BUCKET_NAME = "your-bucket-name";
    private static final String S3N_HADOOP_JAR =
        "s3n://elasticmapreduce/samples/cloudburst/cloudburst.jar";
    private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/";
    private static final String[] JOB_ARGS =
        new String[] { "s3n://elasticmapreduce/samples/cloudburst/input/s_suis.br",
                      "s3n://elasticmapreduce/samples/cloudburst/input/100k.br",
                      "s3n://" + BUCKET_NAME + "/" + FLOW_NAME, "36", "3", "0",
                      "1",
                      "240",
                      "48",
                      "24", "24", "128", "16" };
    private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
    private static final List<JobFlowExecutionState> DONE_STATES = Arrays
        .asList(new JobFlowExecutionState[] { JobFlowExecutionState.COMPLETED,
                                             JobFlowExecutionState.FAILED,
                                             JobFlowExecutionState.TERMINATED });
    static AmazonElasticMapReduce emr;

    /**
     * The only information needed to create a client are security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init() throws Exception {
        AWSCredentials credentials = new PropertiesCredentials(
                                      AwsConsoleApp.class
                                          .getResourceAsStream("AwsCredentials.properties"));

        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args) throws Exception {

        System.out.println("===========================================");
        System.out.println("Welcome to Amazon Elastic MapReduce!");
        System.out.println("===========================================");

        init();

        try {
            // Configure instances to use
            JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
            System.out.println("Using EMR Hadoop v" + HADOOP_VERSION);
            instances.setHadoopVersion(HADOOP_VERSION);
            System.out.println("Using instance count: " + INSTANCE_COUNT);
            instances.setInstanceCount(INSTANCE_COUNT);
            System.out.println("Using master instance type: " + INSTANCE_TYPE);
            instances.setMasterInstanceType(INSTANCE_TYPE);
            System.out.println("Using slave instance type: " + INSTANCE_TYPE);
            instances.setSlaveInstanceType(INSTANCE_TYPE);

            // Configure the job flow
            System.out.println("Configuring flow: " + FLOW_NAME);
            RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);
            System.out.println("\tusing log URI: " + S3N_LOG_URI);
            request.setLogUri(S3N_LOG_URI);

            // Configure the Hadoop jar to use
            System.out.println("\tusing jar URI: " + S3N_HADOOP_JAR);
            HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
            System.out.println("\tusing args: " + ARGS_AS_LIST);
            jarConfig.setArgs(ARGS_AS_LIST);
            StepConfig stepConfig =
                new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.lastIndexOf('/') + 1),
                               jarConfig); // name the step after the jar file name
            request.setSteps(Arrays.asList(new StepConfig[] { stepConfig }));

            //Run the job flow
            RunJobFlowResult result = emr.runJobFlow(request);

            //Check the status of the running job
            String lastState = "";
            STATUS_LOOP: while (true)
            {
                DescribeJobFlowsRequest desc =
                    new DescribeJobFlowsRequest(
                                                Arrays.asList(new String[] { result.getJobFlowId() }));
                DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
                for (JobFlowDetail detail : descResult.getJobFlows())
                {
                    String state = detail.getExecutionStatusDetail().getState();
                    if (isDone(state))
                    {
                        System.out.println("Job " + state + ": " + detail.toString());
                        break STATUS_LOOP;
                    }
                    else if (!lastState.equals(state))
                    {
                        lastState = state;
                        System.out.println("Job " + state + " at " + new Date().toString());
                    }
                }
                Thread.sleep(10000);
            }
        } catch (AmazonServiceException ase) {
                System.out.println("Caught Exception: " + ase.getMessage());
                System.out.println("Response Status Code: " + ase.getStatusCode());
                System.out.println("Error Code: " + ase.getErrorCode());
                System.out.println("Request ID: " + ase.getRequestId());
        }
    }

    /**
     * @param value the job flow execution state name
     * @return true if the state is terminal (completed, failed, or terminated)
     */
    public static boolean isDone(String value)
    {
        JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
        return DONE_STATES.contains(state);
    }
}

unionOf for Cascading.Avro

Previous posts noted how Cascading provides greater flexibility and testability than relational databases for ETL, and also benchmarked Avro against text formats for large scale information processing. Recently I released a patch to Cascading.Avro which provides even more power and flexibility over traditional RDBMS-based data processing. The new AvroScheme.unionOf utility method allows better support for Avro schema evolution without needing a centralized metadata store and without having to re-format all of your historical data to the new format. Unlike a traditional SQL UNION statement, AvroScheme.unionOf dynamically adds columns and converts data types as necessary.  There is no need to specify field names or data types up front, since all Avro files contain self-describing metadata.

How it Works

Say that we have two Avro files: one representing an older format, and another representing the newer data format.

Old format:

ID (int) | Amt (float)
1 | 22000.22
2 | 33000.33

New format:

ID (long) | Amt (float) | Currency (String)
3 | 11000.11 | EUR

When we use AvroScheme.unionOf with the directory containing the two Avro files as input, we can create a tap capable of reading all the data in the folder(s) specified:

  Tap avroSource = new Hfs(AvroScheme.unionOf(input), input);

And when we read the tap, we get the following:

ID (long) | Amt (float) | Currency (String)
1 | 22000.22 | <null>
2 | 33000.33 | <null>
3 | 11000.11 | EUR

As you can see above, the Scheme standardized the data types and added null values for fields not present in the old Avro schema, allowing Cascading GroupBy or CoGroup to function normally with the input data.

Without AvroScheme.unionOf we would need to either convert all existing data to the new format, or store the set of field names and data types for each schema and use a custom flow to coerce the types and add default values. Typically, using AvroScheme.unionOf, I have seen roughly 80% code reduction for reading existing Avro files, in addition to the support for evolving schemata.
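
For context, here is a minimal sketch of wiring the union tap into an ordinary flow (assuming the Cascading 1.2-era API that Cascading.Avro targets, the usual cascading.* and java.util imports, and the input path variable from the example above; the output path and grouping are hypothetical):

  // Group the unioned Avro records by ID and write them out as text.
  Tap avroSource = new Hfs(AvroScheme.unionOf(input), input);
  Tap textSink = new Hfs(new TextLine(), "output/unioned", true); // hypothetical output path
  Pipe pipe = new Pipe("union-by-id");
  pipe = new GroupBy(pipe, new Fields("ID"));
  Flow flow = new FlowConnector(new Properties()).connect(avroSource, textSink, pipe);
  flow.complete();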

Support for MultiSourceTap

If we need to read multiple directories containing Avro files, or multiple Avro files in different directories, we can use the Cascading MultiSourceTap to read them as one data flow. AvroScheme.unionOf supports this by accepting an array of directory or file names:

  AvroScheme scheme = AvroScheme.unionOf(inputs);
  Tap[] avroSources = new Tap[inputs.length];
  for(int x = 0; x < inputs.length; x++) {
    avroSources[x] = new Hfs(scheme, inputs[x]);
  }
  MultiSourceTap multiSourceTap = new MultiSourceTap(avroSources);

Support for Avro Projection

Let’s say we only need the ID from each record, regardless of the underlying schema’s data type.  Then we can use the following call to produce the desired output:

  Tap avroSource = new Hfs(
        AvroScheme.unionOf(new Fields("ID"), input), input);

And the output looks like:

ID (long)
1
2
3

Availability and Limitations

I have released a patch to BixoLabs adding unionOf as well as simple type conversion support.  Please check the Cascading.Avro site for release details. Currently only default toString() conversions to String are supported, and nested Map and List fields are excluded from the union field set.

Top 10 Things I Love About Cascading + Hadoop on the Cloud

My recent experience with MapReduce on the cloud really makes me happy I never got away from the tech… here’s why:

  1. No fixed infrastructure investment means that performance optimizations and tuning directly affect the bottom line.
  2. No longer have to estimate hardware for a system that isn’t built yet.
  3. Everything is open source, so I don’t have to deal with software vendors.  I can fix things myself if I have to or rely on the community for help, and third-party paid support is available if needed.
  4. Leverage the same software stack used by millions of other cloud developers.  No vapor-ware survives here…
  5. Completely open MapReduce implementation within Hadoop allows cool solutions like using an embedded Java database for looking up dimension data.
  6. Everything is testable using automated unit testing in the local JVM, leading to code robustness and quality far exceeding any data processing stack I’ve worked on.  To be able to debug the entire Hadoop technology stack on the local JVM is priceless.
  7. Any Java developer can pick up Cascading in a couple of months and be productive.
  8. Cascading joins offer more flexibility than SQL joins.  For example, Cascading OuterJoin supports empty data on either side of the join condition.
  9. Everything is streaming and INSERT only, so data processing best practices are self-evident and mandatory in MapReduce, whereas the same best practices are optional in SQL-based systems.  For example, a true INSERT-only database model is fairly rare in the Oracle world; however, in MapReduce an INSERT-only model is the only option available.
  10. I can draw any Cascading data processing design using a simple flow chart, and have direct correlation between the design and the finished product every time.

To top it all off, Cascading is a great path for entry-level college grads to get into big data processing.  Admittedly, you need to know a lot to keep an entire distributed data processing system working day in and day out.  But you only need to know the fundamentals of any JVM-based language (Scala, Jython, JRuby, Groovy, Clojure, etc.) to learn how to process big data with Cascading.  This is the first time in a very long time that any of my employers has been talking about bringing on interns or new grads.