Category Archives: Map Reduce

Map reduce algorithm introduced by Google to support massively parallel processing across huge data sets, and later implemented by the open source Hadoop framework.

Game Over for VMs, what’s next?

That includes the Java VM.  Yes, you heard it.  I’ve been writing Java since 1996 and in 2016 I can officially say that all the reasons I supported Java for all these years no longer apply.  I accurately predicted the rise of Java when the technology was literally a laughing stock, and I have stuck with it for very good reasons until now.  What’s Changed?  And more importantly: What’s Next?

FYI: this post relates to mission critical enterprise software, not desktop software

What’s Changed?

  • Back in the day, there were legions of different processors, endian issues, and not even agreement on how big a “byte” was.  Now only three viable von Neumann architectures exist:  Intel, Intel, and Intel.  The von Neumann architecture itself is dead too.  We’ll get to that.
  • In yesteryear, networks were slow so you had to move the code around to avoid moving the data across the network.  Java Applets and Hadoop are good examples of this.  Java was ideal for this because of platform independence, dynamic class loading, and dynamic compilation to native code.  Now it is actually the software which is slowing the networks today, not the other way around.  We’ll get to that.
  • In the old days, operating systems vied for superiority, spreading FUD as they went.  No one knew who would win (nail-biter).  Now there are only three operating systems vying for dominance: and they are all flavors of Linux.

What’s Next?

Spinning up a Linux container has literally almost no overhead, and yet has enterprise class resource management, security, and robustness.  The industry currently focuses on microservices as a design pattern for client side applications, however this pattern applies equally to server-side applications as well.  New Linux flavors like CoreOS and Alpine build on this concept where everything except the kernel is a microservice operating in a container.  This allows very high levels of performance, security, and efficiency in the kernel that all the other services rely upon.  These new server-side microservice platforms provide all the enterprise class deployment, management, monitoring, security, and interoperability that the Java Platform delivered 21 years ago, without the need for a virtual machine of any kind.  Server-side microservices provide both resource isolation and maximum performance at the same time: at a level which neither the Java VM nor any VM can come close to matching.  And what would be the language of choice for implementing these world changing server-side microservices?

The OS itself is implemented in C, so naturally any server-side microservice not implemented in C will have a very hard time competing with those who are.  Note that the container model completely eliminates the normal native package management hell associated with C, even to the point where an “Apple Store” for containers was recently announced by Docker.

Marketplaces like the Docker Store allow purchasing an entire server cluster pre-configured with server-side microservices of your choice on any cloud platform or even in a local bare metal data center.  The same solution also solves the cloud vendor lock-in that many companies have been struggling with.  Like I said: GAME OVER

On a final note: von Neumann microprocessors no longer fit Moore’s Law and price / performance ratios been degrading for some time now.  The data volumes and low latency requirements of the Internet-of-Things will soon place unbearable pressures on the von Neumann microprocessor model.  Programmable hardware such as FPGA have traditionally required learning different languages and complete software re-write to take advantage of programmable processor architecture.  Seymour Cray founded a company 20 years ago with this exact realization, and the graphic below says it all.  If you want to leverage the power of the Saturn FPGA cartridge, you’ll be writing your code in C, thank you very much 🙂

saturn-i-comparisons-041013-2-e1422647383272

 

 

Hadoop Developer Training by Matt

100% PowerPoint free!  Learn at your own pace with 10 hours of video training and 15+ hours of in-depth exercises created by Matt Pouttu-Clarke and available exclusively through Harvard Innovation Labs on the Experfy Training Platform.

  • Learn to develop industrial strength MapReduce applications hands-on with tricks of the trade you will find nowhere else.
  • Apply advanced concepts such as Monte Carlo Simulations, Intelligent Hashing, Push Predicates, and Partition Pruning.
  • Learn to produce truly reusable User Defined Functions (UDFs) which stand the test of time and work seamlessly with multiple Hadoop tools and distributions.
  • Learn the latest industry best practices on how to utilize Hadoop ecosystem tools such as Hive, Pig, Flume, Sqoop, and Oozie in an Enterprise context.

Click here for more info!

Hyperdimensionality and Big Data

In this video training, Matt explains how hyperdimentional reasoning implicitly plays a part in all big data analyses and how today’s analytics and deep learning can utilize hyperdimensionality to improve accuracy and reduce algorithmic blind spots.

Watch on YouTube:

The sound of one hand clapping?

bubble-burstTo me this weekend wasn’t the Panthers vs. Broncos match-up for Super Bowl 50, or when we found out that Bernie Sanders won the New Hampshire primary.  Although both of these were hoooge: it WAS when these parallel but significant facts emerged:

  1. Google makes it’s historical first open source contribution to the Apache Foundation in the form of Apache Beam
  2. Apache Beam supports three run time engines: Google Cloud, Apache Spark, and Apache Flink
  3. Independent reproducible academic research shows that Apache Flink handily out-performs Apache Spark on in-memory terasort workload
  4. Google releases a rigorous point-by-point comparison showing that Apache Beam’s programming model requires less code than Apache Spark to do the same tasks

So for whoever drank the Spark cool-aid let me translate: you write more code to do things more slowly *and* now have the privilege of competing head-to-head with Google.

This is what’s called a bubble, folks.

Please VC funders: end this quickly it’s more painless that way.  And don’t put Spark on your resume because then people might notice the cool-aid stains.

Apache Beam vs Apache Spark comparison

Google recently released a detailed comparison of the programming models of Apache Beam vs. Apache Spark. FYI: Apache Beam used to be called Cloud DataFlow before it was open sourced by Google:

https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison1

Beam vs Spark
Spark requires more code than Beam for the same tasks

Here’s a link to the academic paper by Google describing the theory underpinning the Apache Beam execution model:

http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

When combined with Apache Spark’s severe tech resourcing issues caused by mandatory Scala dependencies, it seems that Apache Beam has all the bases covered to become the de facto streaming analytic API.  The cool thing is that by using Apache Beam you can switch run time engines between Google Cloud, Apache Spark, and Apache Flink.  A generic streaming API like Beam also opens up the market for others to provide better and faster run times as drop-in replacements.  Google is the perfect stakeholder because they are playing the cloud angle and don’t seem to be interested in supporting on-site deployments.  Hats off Google, and may the best Apache Beam run time win!

Feel the Beam! Google Casts a new Light on Big Data

using-lasers-to-preserve-mt-rushmoreApache Beam from Google finally provides robust unification of batch and real-time Big Data.  This framework replaced MapReduce, FlumeJava, and Millwheel at Google.  Major big data vendors already contributed Apache Beam execution engines for both Flink and Spark, before Beam even officially hit incubation.  Anyone else seeing the future of Big Data in a new light?  I know I am…

Academic underpinning: http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

Google’s comparison of Apache Beam vs. Apache Spark: https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison

Streaming Feature Extraction for Unions with statzall

unions-cSupporting unions for fields with multiple types makes for more robust and automated feature extraction.  For example, “account numbers” may contain business relevant strings or spaces due to different data stewards or external data providers.

Rather than transforming all numbers to String, statzall takes the opposite approach and packs Strings into doubles using the open source Unibit Encoding.  This allows extremely efficient feature extraction of basic data science primitives using existing hardened APIs such as CERN Colt.  With single-threaded performance of 8 mm / sec automated feature extraction on *all* measures becomes possible.

In addition, statzall provides support for one-click deployment to your Hadoop YARN cluster using CDAP.  Or if you use cloud, you can literally set up a fully automated Internet-scale feature extraction cluster in 5 minutes using the Coopr Cloud.

Capacity Planning with YARN

The YARN Application Master provides all the raw data to accurately estimate the resources required for your big data application to meet it’s SLAs when deployed to production.  By identifying crucial counters and deriving resource ratios by task and for the application as a whole we can even infer run times from a smaller test environment to a larger production footprint.

job-tracker-counters
Example of Hadoop MapReduce application counters

All YARN frameworks provide similar counters, however we will be using the popular Hadoop MapReduce framework as an example.  We can also get the same values displayed on the web interface above directly from the MapReduce API.  The following counters drive the capacity plan:

Counter Description Usage
TOTAL_LAUNCHED_MAPS Map tasks launched Used as a divisor to obtain avg map metrics
TOTAL_LAUNCHED_REDUCES Reduce tasks launched Used as a divisor to obtain avg reduce metrics
MILLIS_MAPS Total time spent by all maps (ms) Used as a numerator to obtain avg map task time
MILLIS_REDUCES Total time spent by all reduces (ms) Used as a numerator to obtain avg reduce task time
The following counters calculate twice, once for all Mappers and once for all Reducers, it’s important not to mix ratios across task types.
CPU_MILLISECONDS CPU time used Used as a numerator to obtain avg task CPU
COMMITTED_HEAP_BYTES RAM used Used as a numerator to obtain avg task RAM
FILE_READ_OPS Read Operations Used as a numerator to obtain avg task read ops
FILE_WRITE_OPS Write Operations Used as a numerator to obtain avg task write ops
FILE_BYTES_READ Read Bytes Used as a numerator to obtain avg task read bytes
FILE_BYTES_WRITTEN Write Bytes Used as a numerator to obtain avg task write bytes

The primary assumption when inferring between environments is that the data being operated on remains the same.  If the input data differs between environments then results may skew, especially for reducers.

Calculating Resource to Task Type Ratios

By calculating ratios, we can then scale the run time and other resources up and down depending on available task slots and quotas in the target environment.

Ratio Method
Time spent per map (ms) MILLIS_MAPS / TOTAL_LAUNCHED_MAPS
Time spent per reduce (ms) MILLIS_REDUCES / TOTAL_LAUNCHED_REDUCES
CPU used per map (ms) CPU_MILLISECONDS (for maps) / TOTAL_LAUNCHED_MAPS
CPU used per reduce (ms) CPU_MILLISECONDS / TOTAL_LAUNCHED_REDUCES
RAM used per map COMMITTED_HEAP_BYTES (for maps) / TOTAL_LAUNCHED_MAPS
RAM used per reduce COMMITTED_HEAP_BYTES (for reduces) / TOTAL_LAUNCHED_REDUCES
Read Operations per map FILE_READ_OPS (for maps) / TOTAL_LAUNCHED_MAPS
Read Operations per reduce FILE_READ_OPS (for reduces) / TOTAL_LAUNCHED_REDUCES
Write Operations per map FILE_WRITE_OPS (for maps) / TOTAL_LAUNCHED_MAPS
Write Operations per reduce FILE_WRITE_OPS (for reduces) / TOTAL_LAUNCHED_REDUCES
Read Bytes per map FILE_BYTES_READ (for maps) / TOTAL_LAUNCHED_MAPS
Read Bytes per reduce FILE_BYTES_READ (for reduces) / TOTAL_LAUNCHED_REDUCES
Write Bytes per map FILE_BYTES_WRITTEN (for maps) / TOTAL_LAUNCHED_MAPS
Write Bytes per reduce FILE_BYTES_WRITTEN (for reduces) / TOTAL_LAUNCHED_REDUCES

Capacity Scaling

We can now scale parallel task quotas and other resource quotas up and down to calculate the impact on the job for a particular environment. For example, wall clock time for the map phase can vary from all tasks running in parallel ( t = MILLIS_MAPS / TOTAL_LAUNCHED_MAPS ) all the way down to a single task running in parallel ( t = MILLIS_MAPS ). Similarly for all other variables.  For resource constraints, dividing by the most severe restriction governs the cost to total run time.  For example, if we enforce a quota restricting CPU time to CPU_MILLISECONDS * .5 then MILLIS_MAPS will be increased to MILLIS_MAPS / .5.  This would occur if for example the max mappers per node were increased to twice the number of cores.  Resource to Task Type Ratios come in handy for impact assessment and prediction based on any conceivable environmental constraint.

Hermetic Lambdas: a Solution for Big Data Dependency Collisions

Dependency collisions furnish plenty of headaches and late nights while implementing Lambda Architecture.  Java class loading containers tend toward complexity and many frameworks such as Hadoop, JCascalog, and Storm provide tons of transitive dependencies but no dependency isolation for third party Lambda code deployed in the same JVM.  So currently to benefit from running in the same JVM, all the transitive dependencies come along for the ride.  This impacts:

  • Portability: Lambda logic shared in the batch layer and the speed layer must cope with at least two different sets of dependencies.
  • Manageability: framework version upgrades may cause hard failures or other incompatibilities in Lambda logic due to changing dependency versions.
  • Performance: solving the problem by forcing JVM isolation incurs overhead from IPC or network chatter.  Likewise, utilizing scripting or dynamic languages merely hides the problem rather than solving it, while imposing additional performance penalties.
  • Complexity: developing Lambdas requires knowledge and accommodation of all the dependencies of all frameworks targeted for deployment.

Many developers and architects rightfully shy away from deploying bloated dependency container frameworks such as Spring or OSGI inside of a big data framework.  Big data and fast data provide enough complexity already.  So lets look at how we can use a simple pattern, along with basic core Java libraries, to avoid “Dependency Hell“.

Signs you may be encountering this problem include these types of exceptions occurring after deployment:

  • java.lang.NoSuchMethodError: when a method no longer exists which your code depends on
  • java.lang.ClassNotFoundException: when a class your code uses cannot be found in the deployed class path
  • java.lang.IllegalAccessException: when conflicting versions of an API mark methods or fields private

Please reference my github project for a complete working implementation along with unit tests.


Shows how the Lambda executes within the big data framework in an isolated dependency context
Lambda Execution Model

Hermetic Classloading

The Java Service Loader framework introduced in JDK 1.6 provides a way to dynamically bind an implementation class to an interface while specifying an alternate class loader.  Loading a Lamda implementation with ServiceLoader forces all code called within the context of the Lamda to use the same class loader.  This allows creating a service which supports parent-first class loading, child-first class loading, or child-only (or hermetic) class loading.  In this case, the hermetic loader prevents any possible dependency collisions.  To create a hermetic loader, we need simply utilize the built-in URLClassLoader in Java.  Our implementation jar can reside on the local file system, on a web server, or in HDFS: anywhere a URL can point to.  For the parent class loader, we specify the Java bootstrap class loader.  So we can implement a hermetic class loading pattern in one line of code:

ServiceLoader<Map> loader = ServiceLoader.load(Map.class, 
    new URLClassLoader(urls, Map.class.getClassLoader()));

Note that we intentionally avoid calling ClassLoader.getSystemClassLoader() in order to prevent the calling context (such as Hadoop or Hive) form polluting the Lambda class path.  Core packages such as java.lang and java.util use the bootstrap class loader, which only carries core Java dependencies shipped as part of the JDK.  The diagram above shows how the LambdaBoot framework fits within a big data framework such as Hadoop.

Mapping the World

In the example above, we use a Map interface to interact with the Lambda.  This allows us to avoid having a separate jar containing a Service Provider Interface (SPI).  Instead, we can subclass the Map and provide any behavior desired by intercepting get and put calls to specific keys.  By optionally returning a Future from a get or put call on the Map, we get asynchronous interaction as well if desired.  In addition, the non-sensitive Map keys can provide metadata facilities.  In the example of linear regression implemented as a Map, we use a conflicting version of Apache Commons Math not yet supported by Hadoop to calculate the regression.

Shows how the LambdaBoot framework can work with deployment tools such as Jenkins, Puppet, and Git
Lambda Deployment Model

Implications

Using a very simple pattern, we can deploy Hermetic Lambdas to any Java environment without fear of dependency collisions.  The ServiceLoader also acts as a service registry, allowing us to browse metadata about available Lambdas, dynamically load new Lamdas, or update existing Lambdas.

Monitoring EMR Spend Using the AWS Java SDK

Elastic Map Reduce makes it so easy to spin up a cluster that sometimes it’s also easy to waste money with unused, partially used, or downright unauthorized clusters.  Obviously, as a business, Amazon doesn’t put a whole lot of effort to keep it’s customers from spending too much money.  Amazon has an instance count limit for the entire account, however effectively managing these costs involves getting a lot more granular and providing some more detailed information.

That’s why I created this program which estimates charges for current and historic EMR clusters.  It first obtains the normalized instance hours for all clusters running under the current credentials, then divides by the Normalized Compute Time provided in the Amazon EMR FAQ.  Then we multiply by the EMR Hourly Rate to get the charge for each current and historic job flow (cluster).  Historic job flows come from the Amazon job flow history which takes only the past 20 days or so.

The job flow id is the primary key for this data set.  Output is tab delimited streamed to stdout.  The last column contains a complete dump of the job flow in JSON format.  Here is some example output:

JOB_FLOW_ID    STATE    STARTED    ENDED    INSTANCE_COUNT    INSTANCE_TYPE    INSTANCE_HOURS    EMR_INSTANCE_RATE    CHARGE    DETAIL_JSON
j-DFASFWGRWRG    RUNNING    2011-09-21 10:52:17    null    12    m1.xlarge    36    0.59    21.24    {your job flow JSON}

So now you can keep track of estimated EMR spend in near real time, set alerts, and estimate monthly charges based on current workloads.  Enjoy!

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;

/**
 * Monitor EMR spending using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class EMRMonitor
{
    public static class InstanceRate
    {
        private int normalizedHours;
        private double emrRate;

        public InstanceRate(int normalizedHours, double emrRate)
        {
            super();
            this.normalizedHours = normalizedHours;
            this.emrRate = emrRate;
        }

        /**
         * @return the normalizedHours
         */
        public int getNormalizedHours()
        {
            return normalizedHours;
        }

        /**
         * @return the emrRate
         */
        public double getEmrRate()
        {
            return emrRate;
        }
    };

    static final Map<InstanceType, InstanceRate> rateMap =
        new HashMap<InstanceType, EMRMonitor.InstanceRate>();
    static AmazonElasticMapReduce emr;

    static
    {
        rateMap.put(InstanceType.M1Small, new InstanceRate(1, 0.085 + 0.015));
        rateMap.put(InstanceType.C1Medium, new InstanceRate(2, 0.17 + 0.03));
        rateMap.put(InstanceType.M1Large, new InstanceRate(4, 0.34 + 0.06));
        rateMap.put(InstanceType.M1Xlarge, new InstanceRate(8, 0.50 + 0.09));
        rateMap.put(InstanceType.C1Xlarge, new InstanceRate(8, 0.68 + 0.12));
        rateMap.put(InstanceType.M22xlarge, new InstanceRate(14, 1.00 + 0.21));
        rateMap.put(InstanceType.M24xlarge, new InstanceRate(28, 2.00 + 0.42));
        rateMap.put(InstanceType.Cc14xlarge, new InstanceRate(19, 1.60 + 0.33));
        rateMap.put(InstanceType.Cg14xlarge, new InstanceRate(25, 2.10 + 0.42));
    }

    /**
     * The only information needed to create a client are security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init()
        throws Exception
    {
        AWSCredentials credentials =
            new PropertiesCredentials(
                                      AwsConsoleApp.class
                                          .getResourceAsStream("AwsCredentials.properties"));
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args)
        throws Exception
    {
        System.out
            .println("JOB_FLOW_ID\tSTATE\tSTARTED\tENDED\tINSTANCE_COUNT\tINSTANCE_TYPE\tINSTANCE_HOURS\tEMR_INSTANCE_RATE\tCHARGE\tDETAIL_JSON");
        Logger.getLogger("com.amazonaws").setLevel(Level.WARNING); // Turn off request status
                                                                   // messages
        init();
        DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest();
        DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
        for (JobFlowDetail detail : descResult.getJobFlows())
        {
            String slaveInstanceType = detail.getInstances().getSlaveInstanceType();
            String masterInstanceType = detail.getInstances().getMasterInstanceType();
            if (slaveInstanceType == null)
            {
                slaveInstanceType = masterInstanceType;
            }
            double instanceHours = getInstanceHours(detail, slaveInstanceType);
            double charge = getInstanceCharge(slaveInstanceType, instanceHours);
            System.out
                .println(String.format("%1$s\t%2$s\t%3$tF %3$tT\t%4$tF %4$tT\t%5$d\t%6$s\t%7$.0f\t%8$.2f\t%9$.2f\t%10$s\t",
                                       detail.getJobFlowId(),
                                       detail.getExecutionStatusDetail().getState(),
                                       detail.getExecutionStatusDetail().getCreationDateTime(),
                                       detail.getExecutionStatusDetail().getEndDateTime(),
                                       detail.getInstances().getInstanceCount(),
                                       slaveInstanceType,
                                       instanceHours,
                                       rateMap.get(InstanceType.fromValue(slaveInstanceType)).getEmrRate(),
                                       charge,
                                       detail.toString().replaceAll("\\s+", " ")));
        }
    }

    /**
     * @param rate
     * @param instanceHours
     * @return
     */
    public static double getInstanceCharge(String instanceType, double instanceHours)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        return instanceHours * rate.getEmrRate();
    }

    /**
     * @param detail
     * @param rate
     * @return
     */
    public static double getInstanceHours(JobFlowDetail detail, String instanceType)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        double instanceHours =
            detail.getInstances().getNormalizedInstanceHours() / rate.getNormalizedHours();
        return instanceHours;
    }
}