Tag Archives: Cloud Computing

Hadoop Developer Training by Matt

100% PowerPoint free!  Learn at your own pace with 10 hours of video training and 15+ hours of in-depth exercises created by Matt Pouttu-Clarke and available exclusively through Harvard Innovation Labs on the Experfy Training Platform.

  • Learn to develop industrial strength MapReduce applications hands-on with tricks of the trade you will find nowhere else.
  • Apply advanced concepts such as Monte Carlo Simulations, Intelligent Hashing, Push Predicates, and Partition Pruning.
  • Learn to produce truly reusable User Defined Functions (UDFs) which stand the test of time and work seamlessly with multiple Hadoop tools and distributions.
  • Learn the latest industry best practices on how to utilize Hadoop ecosystem tools such as Hive, Pig, Flume, Sqoop, and Oozie in an Enterprise context.

Click here for more info!

Hyperdimensionality and Big Data

In this video training, Matt explains how hyperdimensional reasoning implicitly plays a part in all big data analyses and how today’s analytics and deep learning can utilize hyperdimensionality to improve accuracy and reduce algorithmic blind spots.

Watch on YouTube:

Oracle SWiS DAX APIs: instant classic

The Software In Silicon Data Analytic Accelerator (SWiS DAX) APIs released by Oracle this week signify a sea change for big data and fast data analytic processing.  Natively accelerated common analytic functions usable in C, Python, and Java have already shown a 6x lift for a Spark cube building application.  Apache Flink and Apache Drill completely eclipse Spark performance, so it will be very interesting to see upcoming benchmarks of these higher performing frameworks on SWiS DAX.  There is nothing to keep any vendor or group from benchmarking with these APIs, as they will work with any C, Python, or Java application.

I’m also looking forward to testing performance of SWiS DAX on non-partitionable data sets in a big memory SMP architecture as well.  The easy problems are partitionable, and true data discovery should allow any-to-any relations without injecting a-priori partitioning assumptions.

It seems that Oracle’s long standing commitment to developing Sun’s Sparc processors is about to pay off in a very big way for big data and fast data analysts.

The sound of one hand clapping?

To me this weekend wasn’t the Panthers vs. Broncos match-up for Super Bowl 50, or the news that Bernie Sanders won the New Hampshire primary.  Although both of these were hoooge, it WAS when these parallel but significant facts emerged:

  1. Google makes its historic first open source contribution to the Apache Foundation in the form of Apache Beam
  2. Apache Beam supports three run time engines: Google Cloud, Apache Spark, and Apache Flink
  3. Independent reproducible academic research shows that Apache Flink handily outperforms Apache Spark on an in-memory terasort workload
  4. Google releases a rigorous point-by-point comparison showing that Apache Beam’s programming model requires less code than Apache Spark to do the same tasks

So for whoever drank the Spark Kool-Aid, let me translate: you write more code to do things more slowly *and* now have the privilege of competing head-to-head with Google.

This is what’s called a bubble, folks.

Please, VC funders: end this quickly; it’s less painful that way.  And don’t put Spark on your resume, because then people might notice the Kool-Aid stains.

Apache Beam vs Apache Spark comparison

Google recently released a detailed comparison of the programming models of Apache Beam vs. Apache Spark. FYI: Apache Beam used to be called Cloud DataFlow before it was open sourced by Google:


Spark requires more code than Beam for the same tasks

Here’s a link to the academic paper by Google describing the theory underpinning the Apache Beam execution model:


When combined with Apache Spark’s severe tech resourcing issues caused by mandatory Scala dependencies, it seems that Apache Beam has all the bases covered to become the de facto streaming analytic API.  The cool thing is that by using Apache Beam you can switch run time engines between Google Cloud, Apache Spark, and Apache Flink.  A generic streaming API like Beam also opens up the market for others to provide better and faster run times as drop-in replacements.  Google is the perfect stakeholder because they are playing the cloud angle and don’t seem to be interested in supporting on-site deployments.  Hats off Google, and may the best Apache Beam run time win!
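Since the execution engine is chosen at launch time rather than baked into the pipeline code, switching engines amounts to changing a runner flag.  The commands below are hypothetical (the jar name, master flags, and exact runner names vary by Beam version and which runner dependencies are on the classpath), but they sketch the idea:

```shell
# Hypothetical: the same Beam pipeline jar submitted to three different engines.
# Runner names and master flags are assumptions; check your Beam version's docs.
java -jar beam-pipeline.jar --runner=DataflowRunner --project=my-gcp-project
java -jar beam-pipeline.jar --runner=SparkRunner --sparkMaster=spark://host:7077
java -jar beam-pipeline.jar --runner=FlinkRunner --flinkMaster=host:6123
```

The pipeline code itself stays identical across all three; only the submission command changes.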

Feel the Beam! Google Casts a new Light on Big Data

Apache Beam from Google finally provides robust unification of batch and real-time Big Data.  This framework replaced MapReduce, FlumeJava, and Millwheel at Google.  Major big data vendors already contributed Apache Beam execution engines for both Flink and Spark, before Beam even officially hit incubation.  Anyone else seeing the future of Big Data in a new light?  I know I am…

Academic underpinning: http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

Google’s comparison of Apache Beam vs. Apache Spark: https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison

Streaming Feature Extraction for Unions with statzall

Supporting unions for fields with multiple types makes for more robust and automated feature extraction.  For example, “account numbers” may contain business relevant strings or spaces due to different data stewards or external data providers.

Rather than transforming all numbers to String, statzall takes the opposite approach and packs Strings into doubles using the open source Unibit Encoding.  This allows extremely efficient feature extraction of basic data science primitives using existing hardened APIs such as CERN Colt.  With single-threaded performance of 8 million values per second, automated feature extraction on *all* measures becomes possible.
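The Unibit Encoding itself isn’t shown here, but the general idea of stuffing short strings into the unused bit patterns of a double can be sketched with NaN-boxing.  Everything below (the class name, the 7-bits-per-character layout, the length-in-low-bits trick) is my own simplified illustration, not statzall’s actual encoding:

```java
public class UnibitSketch {
    // Quiet NaN: exponent bits all set plus the quiet bit (bit 51).
    // Any payload in the low mantissa bits still reads back as NaN.
    private static final long QNAN = 0x7FF8000000000000L;

    // Pack up to 6 ASCII chars (7 bits each) plus a 3-bit length
    // into the low mantissa bits of a quiet NaN. Illustrative only;
    // relies on the JVM preserving raw NaN bit patterns (true on
    // mainstream HotSpot/x86-64).
    public static double pack(String s) {
        if (s.length() > 6) throw new IllegalArgumentException("max 6 chars");
        long payload = 0L;
        for (int i = 0; i < s.length(); i++) {
            payload = (payload << 7) | (s.charAt(i) & 0x7F);
        }
        payload = (payload << 3) | s.length(); // length in low 3 bits
        return Double.longBitsToDouble(QNAN | payload);
    }

    public static String unpack(double d) {
        long bits = Double.doubleToRawLongBits(d);
        int len = (int) (bits & 0x7);
        long payload = (bits & 0x0000FFFFFFFFFFFFL) >>> 3;
        char[] out = new char[len];
        for (int i = len - 1; i >= 0; i--) {
            out[i] = (char) (payload & 0x7F);
            payload >>>= 7;
        }
        return new String(out);
    }

    public static void main(String[] args) {
        System.out.println(unpack(pack("AC421"))); // round-trips the string
    }
}
```

Because the packed value is still a double, it can flow through numeric APIs unchanged, which is the property the statzall approach exploits.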

In addition, statzall provides support for one-click deployment to your Hadoop YARN cluster using CDAP.  Or, if you use the cloud, you can literally set up a fully automated Internet-scale feature extraction cluster in 5 minutes using the Coopr Cloud.

Titan: Cassandra vs. Hazelcast persistence benchmark

A 10 node load test comparison using Amazon EC2 SSD-based instances, with 1 billion vertices and 1 billion edges processed for each test run.  The titan-loadtest project was used to run each test.


The experiment maximizes data locality by co-locating load generation, the Titan graph database, and Cassandra/Hazelcast within the same JVM instance while partitioning data across a cluster.  It also explores methods for tuning garbage collection, Titan, and Cassandra for this peer computing use case.

The following components were utilized during the experiment:

Technology          Version
RHEL x64 HVM AMI    6.4
Oracle JDK x64      1.7_45
Apache Cassandra    1.2.9
Hazelcast           3.1.1
Titan               0.3.2

Each test iteration has 6 read ratio phases starting with 0% reads (100% writes) all the way up to 90% reads and 10% writes.  For all tests, the persistence implementation executes in the same JVM as Titan to avoid unnecessary context switching and serialization overhead.  Tests were conducted using an Amazon placement group to ensure instances resided on the same subnet.  The storage was formatted with 4K blocks and used the noop scheduler to improve latency.

For each phase, new vertices were added with one edge linking back to a previous vertex.  No tests of update or delete were conducted.

Please see the titan-loadtest project above for all Cassandra and Hazelcast settings and configurations used in the test.


Titan 10 node Summary

Please note: the results are listed in rates of thousands of vertices per second and include the creation of an edge as well as a vertex. Also, the Hazelcast SSD x1 results used a custom flash storage module for Hazelcast developed privately so those results are not replicable without that module installed.


Hazelcast performed better than Cassandra for all tests and demonstrated one order of magnitude better performance on reads.  Surprisingly, Hazelcast slightly outperformed Cassandra for writes as well.

Large Java Heap with the G1 Collector – Part 1

Demonstration of efficient garbage collection on JVM heap sizes in excess of 128 gigabytes.  Garbage collection behavior analyzed via a mini-max optimization strategy.  Study measures maximum throughput versus minimum garbage collection pause to find optimization “sweet spot” across experimental phases.  Replicable results via well-defined open source experimental methods executable on Amazon EC2 hardware. 

Experimental Method


  • Demonstrate maximum feasible JVM size on current cloud hardware (specific to Amazon for now) using the G1 Garbage Collector (link).
  • Vary the JVM heap size (-Xmx) exponentially to find performance profile and breaking points.
  • Vary the ratio of new versus old generation objects exponentially.
  • Use an in-memory workload to stress the JVM (avoids network or disk waits).
  • Produce replicable results on commodity hardware, open source operating systems, and open source tools.
  • Provide gap-free data for analysis, in spite of garbage collection pauses.

Not (Yet) in Scope

As a follow-up to this study, subsequent efforts may include:

  • Vary the number of old objects up to maximum possible memory capacity.
  • Vary the number of processing threads to starve the concurrent G1 algorithm of CPU cycles.
  • Vary the object size.
  • Vary the field size within the object.
  • Vary the G1 JVM parameters.

Tools and Versions

  1. Amazon Linux AMI (link)
  2. One cr1.8xlarge instance (link): 244 GiB RAM, 2 x Intel Xeon E5-2670 (link)
  3. Oracle JDK 1.7_15

JVM Parameters

This experiment varied the JVM heap size but kept other parameters constant.

  • -Xmx16g, -Xmx32g, -Xmx64g, -Xmx128g, -Xmx212g
  • -XX:+UseG1GC
  • -XX:InitiatingHeapOccupancyPercent=0
  • -XX:MaxGCPauseMillis=200
  • -XX:MaxTenuringThreshold=25
  • -XX:ParallelGCThreads=32
  • -XX:ConcGCThreads=32
  • -XX:G1ReservePercent=10
  • -XX:G1HeapRegionSize=32m

Program Parameters

These parameters were kept constant for this study.

  • statInterval = 1000: the reporting interval for measures in milliseconds.
  • numThreads = 24: number of processing threads.
  • objectsPerThread = 8,000,000: number of objects to produce for each phase.
  • maxOld = 800,000: max number of old references to maintain, after which old references are overwritten with new values.

Experimental Phases

For each JVM heap size, the following phases are executed.  For each phase, after maxOld is reached, object references get overwritten and must be garbage collected.

  • RANDOM_WRITE_0: All of the objects produced are instant garbage; no references are kept.
  • RANDOM_WRITE_1: All object references are kept.
  • RANDOM_WRITE_2: 1/2 of object references are kept.
  • RANDOM_WRITE_4: 1/4 of object references are kept.
  • RANDOM_WRITE_8: 1/8 of object references are kept.
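The retention rule behind these phases can be expressed as a small predicate mirroring the ratio test in the GarbageThread code below (the `keep` name and standalone class are my own framing for illustration): a ratio of 0 keeps nothing, 1 keeps everything, and ratio n keeps every nth object.

```java
public class KeepRatio {
    // Returns true if object x's reference is kept in the old generation
    // for a given old-to-young ratio (0 = keep nothing, 1 = keep all,
    // n = keep every nth object, i.e. 1/n of references).
    static boolean keep(int ratio, int x) {
        return ratio > 0 && (ratio == 1 || x % ratio == 0);
    }

    public static void main(String[] args) {
        for (int ratio : new int[] { 0, 1, 2, 4, 8 }) {
            int kept = 0;
            for (int x = 0; x < 1000; x++) {
                if (keep(ratio, x)) kept++;
            }
            System.out.println("ratio " + ratio + ": " + kept + "/1000 kept");
        }
    }
}
```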

Experiment Results

Sum of Garbage Collection Pause

The graph above shows the sum total of *all* garbage collection pauses across each experimental phase.

Min of Object Throughput (per second)

The graph above shows the *min* throughput for each second across each experimental phase.

Min of Throughput to Max of Garbage Collection Pause (per second)

The line chart above shows log scale throughput versus standard scale garbage collection pause across all seconds in each experimental phase.

Min of Throughput to Max of Garbage Collection Pause (per second)

The radar chart above shows log scale throughput versus standard scale garbage collection pause across all seconds in each experimental phase.


Java garbage collection test class:

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

public class GCTest {

    public static String configName = "Test";
    public static int statInterval = 1000;
    public static int numThreads = 4;
    public static int objectPerThread = 500000;
    public static int maxOld = 1000000;

    public static enum ExperimentPhase {
        INCREMENTAL_UPDATE, INCREMENTAL_CLEAR, RANDOM_WRITE_0, RANDOM_WRITE_8,
        RANDOM_WRITE_4, RANDOM_WRITE_2, RANDOM_WRITE_1, RANDOM_READ
    };

    public static int phaseIdx;
    public static ExperimentPhase phase;

    public static CyclicBarrier barrier;

    public static int[] oldToYoungRatios = new int[] { 0, 8, 4, 2, 1 };

    public static volatile boolean done = false;

    public static AtomicLong totalOfInterval = new AtomicLong();

    public static Object[] oldArr;

    public static ConcurrentHashMap<Integer, Map<String, Object>> oldMap =
            new ConcurrentHashMap<Integer, Map<String, Object>>();

    /**
     * Generates maximum amount of objects with nested references. Saves some
     * references so they go to the old generation.
     * @author pouttum
     */
    public static class GarbageThread extends Thread {

        protected int threadNo;

        public GarbageThread(int threadNo) {
            this.threadNo = threadNo;
        }

        public void run() {

            await(); // INCREMENTAL_UPDATE

            // Incremental update phase
            for (int x = 0; x < maxOld; x++) {
                if (x % numThreads == threadNo) {
                    oldArr[x] = getDoc(x);
                    totalOfInterval.incrementAndGet();
                }
            }

            await(); // INCREMENTAL_CLEAR

            // Incremental clear phase
            for (int x = 0; x < maxOld; x++) {
                if (x % numThreads == threadNo) {
                    oldArr[x] = null;
                    totalOfInterval.incrementAndGet();
                }
            }

            // Random write / update phases
            for (int r = 0; r < oldToYoungRatios.length; r++) {
                await(); // RANDOM_WRITE_*
                for (int x = 0; x < objectPerThread; x++) {
                    Map<String, Object> doc = getDoc(x);
                    totalOfInterval.incrementAndGet();
                    if (oldToYoungRatios[r] > 0
                            && (oldToYoungRatios[r] == 1 || (x
                                    % oldToYoungRatios[r] == 0))) {
                        int index = (int) (Math.floor(random() * maxOld));
                        oldMap.put(index, doc);
                    }
                }
            }

            await(); // RANDOM_READ

            // Random read phase
            for (int x = 0; x < objectPerThread; x++) {
                int index = (int) (Math.floor(random() * maxOld));
                oldMap.get(index);
                totalOfInterval.incrementAndGet();
            }
        }

        protected void await() {
            try {
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        protected HashMap<String, Object> getDoc(int x) {
            HashMap<String, Object> doc = new HashMap<String, Object>();
            doc.put("value1", "value1" + String.valueOf(x));
            doc.put("value2", "value2" + String.valueOf(x));
            doc.put("value3", "value3" + String.valueOf(x));
            doc.put("value4", "value4" + String.valueOf(x));
            doc.put("value5", "value5" + String.valueOf(x));
            doc.put("value6", "value6" + String.valueOf(x));
            doc.put("value7", "value7" + String.valueOf(x));
            doc.put("value8", "value8" + String.valueOf(x));
            doc.put("value9", "value9" + String.valueOf(x));
            return doc;
        }

        protected double random() {
            return Math.random();
        }
    }

    /**
     * Calculates ongoing stats and keeps history on the stat interval.
     * @author pouttum
     */
    public static class StatThread extends Thread {

        public void run() {
            Date previousDate = new Date();
            long adjStatInterval = statInterval;
            int intervalCount = 0;
            do {
                try {
                    Thread.sleep(adjStatInterval);
                } catch (InterruptedException e) {
                    done = true;
                }
                adjStatInterval = statInterval;
                long intervalTotal = totalOfInterval.getAndSet(0L);
                Date date = new Date();
                double intervalSeconds = (date.getTime() - previousDate
                        .getTime()) / 1000d;

                float statIntervalDouble = statInterval / 1000f;
                double gcPause = intervalSeconds - statIntervalDouble;
                if (intervalSeconds > statIntervalDouble * 2) {
                    // Backfill zero-throughput rows for intervals missed
                    // entirely during a long pause, so the data has no gaps
                    double x = statIntervalDouble * 2;
                    for (; x < intervalSeconds; x += statIntervalDouble) {
                        System.out.println(String.format("%s\t%s\t%d\t%d\t%.3f",
                                configName, phase, ++intervalCount, 0,
                                (double) statIntervalDouble));
                        gcPause -= statIntervalDouble;
                    }
                }
                if (gcPause > 0.0d) { // Credit the next interval with some of
                                      // the count of this interval
                    adjStatInterval -= gcPause * 1000L;
                    long intervalTotalAdj = Math
                            .round((gcPause / statIntervalDouble)
                                    * intervalTotal);
                    intervalTotal -= intervalTotalAdj;
                    totalOfInterval.addAndGet(intervalTotalAdj);
                }
                // Tab-delimited: config, phase, interval, count, pause seconds
                System.out.println(String.format("%s\t%s\t%d\t%d\t%.3f",
                        configName, phase, ++intervalCount, intervalTotal,
                        Math.max(gcPause, 0.0d)));
                previousDate = date;
            } while (!done);
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 5) {
            configName = args[0];
            statInterval = Integer.parseInt(args[1]);
            numThreads = Integer.parseInt(args[2]);
            objectPerThread = Integer.parseInt(args[3]);
            maxOld = Integer.parseInt(args[4]);
        }
        oldArr = new Object[maxOld]; // size after arguments are parsed
        barrier = new CyclicBarrier(numThreads,
                new Runnable() {
                    public void run() {
                        phase = ExperimentPhase.values()[phaseIdx++];
                    }
                });
        GarbageThread[] threads = new GarbageThread[numThreads];
        for (int x = 0; x < threads.length; x++) {
            threads[x] = new GarbageThread(x);
        }
        StatThread statThread = new StatThread();
        statThread.start();
        for (int x = 0; x < threads.length; x++) {
            threads[x].start();
        }
        for (int x = 0; x < threads.length; x++) {
            try {
                threads[x].join();
            } catch (InterruptedException e) {
                // keep joining the remaining threads
            }
        }
        done = true;
        statThread.interrupt();
        statThread.join();
    }
}


Compile the code:

jdk1.7.0_15/bin/javac GCTest.java

Unix script run.sh:

jdk1.7.0_15/bin/java -server -Xms16g -Xmx16g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg16 1000 24 8000000 800000 > baseline.csv
jdk1.7.0_15/bin/java -server -Xms32g -Xmx32g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg32 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms64g -Xmx64g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg64 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms128g -Xmx128g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg128 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms212g -Xmx212g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest old_ratio_8 1000 24 8000000 800000 >> baseline.csv

Monitoring EMR Spend Using the AWS Java SDK

Elastic MapReduce makes it so easy to spin up a cluster that it’s also easy to waste money on unused, partially used, or downright unauthorized clusters.  Obviously, as a business, Amazon doesn’t put a whole lot of effort into keeping its customers from spending too much money.  Amazon has an instance count limit for the entire account, but effectively managing these costs requires getting a lot more granular and providing more detailed information.

That’s why I created this program, which estimates charges for current and historic EMR clusters.  It first obtains the normalized instance hours for all clusters running under the current credentials, then divides by the Normalized Compute Time provided in the Amazon EMR FAQ.  It then multiplies by the EMR Hourly Rate to get the charge for each current and historic job flow (cluster).  Historic job flows come from the Amazon job flow history, which covers only the past 20 days or so.
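As a concrete worked example of that arithmetic (the 288 normalized hours is a hypothetical API value; the m1.xlarge rates are the 2011-era ones hard-coded in the program below):

```java
public class EmrChargeExample {
    public static void main(String[] args) {
        // m1.xlarge: 8 normalized hours per instance hour (per the EMR FAQ),
        // $0.50/hr EC2 + $0.09/hr EMR surcharge = $0.59 per instance hour.
        int normalizedInstanceHours = 288; // hypothetical value from DescribeJobFlows
        int normalizedPerInstanceHour = 8;
        double emrHourlyRate = 0.50 + 0.09;

        // instance hours = normalized hours / normalized compute time
        double instanceHours = (double) normalizedInstanceHours / normalizedPerInstanceHour;
        // charge = instance hours * EMR hourly rate
        double charge = instanceHours * emrHourlyRate;
        System.out.printf("%.0f instance hours -> $%.2f%n", instanceHours, charge);
        // prints: 36 instance hours -> $21.24
    }
}
```

Those are the same 36 instance hours and $21.24 charge shown in the example output row below.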

The job flow id is the primary key for this data set.  Output is tab delimited streamed to stdout.  The last column contains a complete dump of the job flow in JSON format.  Here is some example output:

j-DFASFWGRWRG    RUNNING    2011-09-21 10:52:17    null    12    m1.xlarge    36    0.59    21.24    {your job flow JSON}

So now you can keep track of estimated EMR spend in near real time, set alerts, and estimate monthly charges based on current workloads.  Enjoy!

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;

/**
 * Monitor EMR spending using the AWS SDK for Java.
 * @author mpouttuclarke
 */
public class EMRMonitor
{
    public static class InstanceRate
    {
        private int normalizedHours;
        private double emrRate;

        public InstanceRate(int normalizedHours, double emrRate)
        {
            this.normalizedHours = normalizedHours;
            this.emrRate = emrRate;
        }

        /**
         * @return the normalizedHours
         */
        public int getNormalizedHours()
        {
            return normalizedHours;
        }

        /**
         * @return the emrRate
         */
        public double getEmrRate()
        {
            return emrRate;
        }
    }

    static final Map<InstanceType, InstanceRate> rateMap =
        new HashMap<InstanceType, EMRMonitor.InstanceRate>();
    static AmazonElasticMapReduce emr;

    static
    {
        // Normalized hours per instance hour, and EC2 rate + EMR surcharge (2011-era pricing)
        rateMap.put(InstanceType.M1Small, new InstanceRate(1, 0.085 + 0.015));
        rateMap.put(InstanceType.C1Medium, new InstanceRate(2, 0.17 + 0.03));
        rateMap.put(InstanceType.M1Large, new InstanceRate(4, 0.34 + 0.06));
        rateMap.put(InstanceType.M1Xlarge, new InstanceRate(8, 0.50 + 0.09));
        rateMap.put(InstanceType.C1Xlarge, new InstanceRate(8, 0.68 + 0.12));
        rateMap.put(InstanceType.M22xlarge, new InstanceRate(14, 1.00 + 0.21));
        rateMap.put(InstanceType.M24xlarge, new InstanceRate(28, 2.00 + 0.42));
        rateMap.put(InstanceType.Cc14xlarge, new InstanceRate(19, 1.60 + 0.33));
        rateMap.put(InstanceType.Cg14xlarge, new InstanceRate(25, 2.10 + 0.42));
    }

    /**
     * The only information needed to create a client are security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init()
        throws Exception
    {
        // Credentials file location is an assumption; adjust to your environment
        AWSCredentials credentials =
            new PropertiesCredentials(
                EMRMonitor.class.getResourceAsStream("AwsCredentials.properties"));
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args)
        throws Exception
    {
        init();
        Logger.getLogger("com.amazonaws").setLevel(Level.WARNING); // Turn off request status
                                                                   // messages
        DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest();
        DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
        for (JobFlowDetail detail : descResult.getJobFlows())
        {
            String slaveInstanceType = detail.getInstances().getSlaveInstanceType();
            String masterInstanceType = detail.getInstances().getMasterInstanceType();
            if (slaveInstanceType == null)
            {
                slaveInstanceType = masterInstanceType;
            }
            double instanceHours = getInstanceHours(detail, slaveInstanceType);
            double charge = getInstanceCharge(slaveInstanceType, instanceHours);
            InstanceRate rate = rateMap.get(InstanceType.fromValue(slaveInstanceType));
            System.out
                .println(String.format("%1$s\t%2$s\t%3$tF %3$tT\t%4$tF %4$tT\t%5$d\t%6$s\t%7$.0f\t%8$.2f\t%9$.2f\t%10$s\t",
                                       detail.getJobFlowId(),
                                       detail.getExecutionStatusDetail().getState(),
                                       detail.getExecutionStatusDetail().getCreationDateTime(),
                                       detail.getExecutionStatusDetail().getEndDateTime(),
                                       detail.getInstances().getInstanceCount(),
                                       slaveInstanceType,
                                       instanceHours,
                                       rate.getEmrRate(),
                                       charge,
                                       detail.toString().replaceAll("\\s+", " ")));
        }
    }

    /**
     * @param instanceType
     * @param instanceHours
     * @return the estimated charge for the given instance type and hours
     */
    public static double getInstanceCharge(String instanceType, double instanceHours)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        return instanceHours * rate.getEmrRate();
    }

    /**
     * @param detail
     * @param instanceType
     * @return the estimated instance hours for the job flow
     */
    public static double getInstanceHours(JobFlowDetail detail, String instanceType)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        double instanceHours =
            (double) detail.getInstances().getNormalizedInstanceHours() / rate.getNormalizedHours();
        return instanceHours;
    }
}