Tag Archives: Tips and Tricks

Why I tried Apache Spark, and moved on

I tried Apache Spark, and moved on.  Here’s why:

Resourcing Issues

Apache Spark, written in Scala, causes severe resourcing issues for customers due to the additional technical skill requirements:

  1. Scala ranks #30 with 0.5% market share, while Java ranks #1 with 21.5% of the market, a difference of 4300%: http://www.tiobe.com/index.php/content/paperinfo/tpci/index.html
  2. The introduction of native functional programming constructs into the Java language with release 1.8 practically eliminates the business case for Scala altogether (see the short example after this list): https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html
  3. Scala works best with the IntelliJ IDEA IDE, which carries licensing costs and is extremely unlikely to replace free Eclipse tooling at any large company
  4. Scala is among a crowd of strong contenders and faces a moving target as Java has gained 5% in market share between 2015 and 2016.  To put this in perspective, Scala has less market share than Lisp
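
For illustration of the Java 8 point, here is functional-style map/filter/reduce using only the standard library, no Scala required:

import java.util.Arrays;
import java.util.List;

public class LambdaDemo {
    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5);
        // Functional-style map/filter/reduce in plain Java 8
        int sumOfEvenSquares = nums.stream()
                                   .filter(n -> n % 2 == 0)
                                   .map(n -> n * n)
                                   .reduce(0, Integer::sum);
        System.out.println(sumOfEvenSquares); // prints 20
    }
}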

Consistency and Integrity Issues

Trying to get Spark to meet rigorous standards of data consistency and integrity proves difficult.  Apache Spark’s design originates from companies that consider data consistency and data integrity secondary concerns, while most industries consider these primary concerns.  For example, achieving exactly-once semantics from Spark Streaming requires numerous workarounds and hacks: http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

Dependency Hell with a Vengeance

Apache Spark (and Scala) import a huge number of transitive dependencies compared to alternative technologies.  Programmers must master all of those dependencies to master Spark.  No wonder so few true Spark experts exist in the market today.

What’s the Alternative to Spark?

For the real-time, in-memory processing use case: data grids, once the purview of blue-chip commercial vendors, now have very strong open source competition.  Primary contenders include Apache Ignite and Hazelcast.

For the fast SQL analytics (OLAP) use case: Apache Drill provides similar performance to Spark SQL with a much simpler, more efficient, and more robust footprint.  Apache Kylin from eBay looks set to become a major OLAP player very quickly, although I have not used it myself.

For the stream processing use case: Apache Beam from Google looks likely to become the de facto streaming workhorse, unseating both Apache Flink and Spark Streaming.  Major big data vendors have already contributed Apache Beam execution engines for both Flink and Spark, before Beam even officially hit incubation.

If you try these alternative technologies, and compare to Spark, I’m sure you’ll agree that Spark isn’t worth the headache.

 

Streaming Feature Extraction for Unions with statzall

Supporting unions for fields with multiple types makes for more robust and automated feature extraction.  For example, “account numbers” may contain business-relevant strings or spaces due to different data stewards or external data providers.

Rather than transforming all numbers to String, statzall takes the opposite approach and packs Strings into doubles using the open source Unibit Encoding.  This allows extremely efficient feature extraction of basic data science primitives using existing hardened APIs such as CERN Colt.  With single-threaded performance of 8 million values per second, automated feature extraction on *all* measures becomes possible.
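
I won’t reproduce the actual Unibit Encoding here, but a minimal sketch of the general idea, packing short ASCII strings into the payload bits of a quiet NaN so they can ride along with ordinary doubles, might look like this (limits and names are illustrative only):

public final class StringPackSketch {
    // Illustrative only, not the real Unibit Encoding: packs up to 7 ASCII
    // chars into the payload bits of a quiet NaN, a bit pattern no ordinary
    // double value ever uses.
    public static double pack(String s) {
        if (s.length() > 7) {
            throw new IllegalArgumentException("sketch supports at most 7 chars");
        }
        long bits = 0x7FF8000000000000L; // quiet-NaN prefix
        for (int i = 0; i < s.length(); i++) {
            bits |= ((long) (s.charAt(i) & 0x7F)) << (7 * i);
        }
        return Double.longBitsToDouble(bits);
    }

    public static String unpack(double d) {
        // doubleToRawLongBits preserves the NaN payload; doubleToLongBits would not
        long bits = Double.doubleToRawLongBits(d);
        StringBuilder sb = new StringBuilder(7);
        for (int i = 0; i < 7; i++) {
            char c = (char) ((bits >>> (7 * i)) & 0x7F);
            if (c == 0) {
                break;
            }
            sb.append(c);
        }
        return sb.toString();
    }
}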

In addition, statzall provides support for one-click deployment to your Hadoop YARN cluster using CDAP.  Or, if you use the cloud, you can literally set up a fully automated Internet-scale feature extraction cluster in 5 minutes using the Coopr Cloud.

Capacity Planning with YARN

The YARN Application Master provides all the raw data needed to accurately estimate the resources required for your big data application to meet its SLAs when deployed to production.  By identifying crucial counters and deriving resource ratios by task and for the application as a whole, we can even infer run times from a smaller test environment to a larger production footprint.

Example of Hadoop MapReduce application counters

All YARN frameworks provide similar counters; however, we will use the popular Hadoop MapReduce framework as an example.  We can also get the values displayed on the web interface above directly from the MapReduce API (see the sketch after the table below).  The following counters drive the capacity plan:

  • TOTAL_LAUNCHED_MAPS: Map tasks launched.  Used as a divisor to obtain average map metrics.
  • TOTAL_LAUNCHED_REDUCES: Reduce tasks launched.  Used as a divisor to obtain average reduce metrics.
  • MILLIS_MAPS: Total time spent by all maps (ms).  Used as a numerator to obtain average map task time.
  • MILLIS_REDUCES: Total time spent by all reduces (ms).  Used as a numerator to obtain average reduce task time.

The following counters are reported twice, once for all mappers and once for all reducers; it is important not to mix ratios across task types.

  • CPU_MILLISECONDS: CPU time used.  Used as a numerator to obtain average task CPU.
  • COMMITTED_HEAP_BYTES: RAM used.  Used as a numerator to obtain average task RAM.
  • FILE_READ_OPS: Read operations.  Used as a numerator to obtain average task read operations.
  • FILE_WRITE_OPS: Write operations.  Used as a numerator to obtain average task write operations.
  • FILE_BYTES_READ: Bytes read.  Used as a numerator to obtain average task read bytes.
  • FILE_BYTES_WRITTEN: Bytes written.  Used as a numerator to obtain average task write bytes.
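
As a minimal sketch, the Hadoop 2.x client API can pull these counters for a finished job (the job id argument and any error handling are left to the reader):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;

public class CounterDump {
    public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster(new Configuration());
        Job job = cluster.getJob(JobID.forName(args[0])); // e.g. job_1453999999999_0001
        Counters counters = job.getCounters();
        long maps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
        long reduces = counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue();
        long millisMaps = counters.findCounter(JobCounter.MILLIS_MAPS).getValue();
        long millisReduces = counters.findCounter(JobCounter.MILLIS_REDUCES).getValue();
        // Two of the ratios from the table below
        System.out.printf("avg map time (ms): %.1f%n", (double) millisMaps / maps);
        System.out.printf("avg reduce time (ms): %.1f%n", (double) millisReduces / reduces);
    }
}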

The primary assumption when inferring between environments is that the data being operated on remains the same.  If the input data differs between environments then results may skew, especially for reducers.

Calculating Resource to Task Type Ratios

By calculating ratios, we can then scale the run time and other resources up and down depending on available task slots and quotas in the target environment.

  • Time spent per map (ms) = MILLIS_MAPS / TOTAL_LAUNCHED_MAPS
  • Time spent per reduce (ms) = MILLIS_REDUCES / TOTAL_LAUNCHED_REDUCES
  • CPU used per map (ms) = CPU_MILLISECONDS (for maps) / TOTAL_LAUNCHED_MAPS
  • CPU used per reduce (ms) = CPU_MILLISECONDS (for reduces) / TOTAL_LAUNCHED_REDUCES
  • RAM used per map = COMMITTED_HEAP_BYTES (for maps) / TOTAL_LAUNCHED_MAPS
  • RAM used per reduce = COMMITTED_HEAP_BYTES (for reduces) / TOTAL_LAUNCHED_REDUCES
  • Read operations per map = FILE_READ_OPS (for maps) / TOTAL_LAUNCHED_MAPS
  • Read operations per reduce = FILE_READ_OPS (for reduces) / TOTAL_LAUNCHED_REDUCES
  • Write operations per map = FILE_WRITE_OPS (for maps) / TOTAL_LAUNCHED_MAPS
  • Write operations per reduce = FILE_WRITE_OPS (for reduces) / TOTAL_LAUNCHED_REDUCES
  • Read bytes per map = FILE_BYTES_READ (for maps) / TOTAL_LAUNCHED_MAPS
  • Read bytes per reduce = FILE_BYTES_READ (for reduces) / TOTAL_LAUNCHED_REDUCES
  • Write bytes per map = FILE_BYTES_WRITTEN (for maps) / TOTAL_LAUNCHED_MAPS
  • Write bytes per reduce = FILE_BYTES_WRITTEN (for reduces) / TOTAL_LAUNCHED_REDUCES

Capacity Scaling

We can now scale parallel task quotas and other resource quotas up and down to calculate the impact on the job in a particular environment.  For example, wall-clock time for the map phase can vary from all tasks running in parallel (t = MILLIS_MAPS / TOTAL_LAUNCHED_MAPS) all the way down to a single task running at a time (t = MILLIS_MAPS); the same holds for the other variables.  For resource constraints, the most severe restriction governs the cost to total run time.  For example, if we enforce a quota restricting CPU time to CPU_MILLISECONDS * 0.5, then MILLIS_MAPS increases to MILLIS_MAPS / 0.5.  This would occur if, for example, the maximum mappers per node were increased to twice the number of cores.  Resource to task type ratios come in handy for impact assessment and prediction under any conceivable environmental constraint.  A minimal sketch of this arithmetic follows.
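
All names in this sketch are illustrative; it simply applies the wave-based scaling model described above:

public class CapacityScaling {
    /**
     * Estimates wall-clock time of the map phase under a parallel task quota.
     * millisMaps = MILLIS_MAPS, launchedMaps = TOTAL_LAUNCHED_MAPS.
     */
    public static double mapPhaseMillis(long millisMaps, long launchedMaps, int taskQuota) {
        double avgTaskMillis = (double) millisMaps / launchedMaps;
        // Tasks run in ceil(launchedMaps / taskQuota) waves of one average task time each
        long waves = (launchedMaps + taskQuota - 1) / taskQuota;
        return waves * avgTaskMillis;
    }

    public static void main(String[] args) {
        // 1,000 maps totaling 10,000,000 ms of task time:
        System.out.println(mapPhaseMillis(10000000L, 1000L, 1000)); // all parallel: 10000.0
        System.out.println(mapPhaseMillis(10000000L, 1000L, 1));    // fully serial: 1.0E7
    }
}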

Large Java Heap with the G1 Collector – Part 1

This study demonstrates efficient garbage collection on JVM heap sizes in excess of 128 gigabytes.  Garbage collection behavior is analyzed via a mini-max optimization strategy: maximum throughput is measured against minimum garbage collection pause to find the optimization “sweet spot” across experimental phases.  Results are replicable via well-defined, open source experimental methods executable on Amazon EC2 hardware.

Experimental Method

Goals

  • Demonstrate maximum feasible JVM size on current cloud hardware (specific to Amazon for now) using the G1 Garbage Collector (link).
  • Vary the JVM heap size (-Xmx) exponentially to find performance profile and breaking points.
  • Vary the ratio of new versus old generation objects exponentially.
  • Use an in-memory workload to stress the JVM (avoids network or disk waits).
  • Produce replicable results on commodity hardware, open source operating systems, and open source tools.
  • Provide gap-free data for analysis, in spite of garbage collection pauses.

Not (Yet) in Scope

As follow-up to this study, subsequent efforts may:

  • Vary the number of old objects up to maximum possible memory capacity.
  • Vary the number of processing threads to starve the concurrent G1 algorithm of CPU cycles.
  • Vary the object size.
  • Vary the field size within the object.
  • Vary the G1 JVM parameters.

Tools and Versions

  1. Amazon Linux AMI (link)
  2. One cr1.8xlarge instance (link): 244 GiB RAM, 2 x Intel Xeon E5-2670 (link)
  3. Oracle JDK 1.7.0_15

JVM Parameters

This experiment varied the JVM heap size but kept other parameters constant.

  • -Xmx16g, -Xmx32g, -Xmx64g, -Xmx128g, -Xmx212g
  • -XX:+UseG1GC
  • -XX:InitiatingHeapOccupancyPercent=0
  • -XX:MaxGCPauseMillis=200
  • -XX:MaxTenuringThreshold=25
  • -XX:ParallelGCThreads=32
  • -XX:ConcGCThreads=32
  • -XX:G1ReservePercent=10
  • -XX:G1HeapRegionSize=32m

Program Parameters

These parameters were kept constant for this study.

  • statInterval = 1000: the reporting interval for measures in milliseconds.
  • numThreads = 24: number of processing threads.
  • objectPerThread = 8,000,000: number of objects to produce for each phase.
  • maxOld = 800,000: max number of old references to maintain, after which old references are overwritten with new values.

Experimental Phases

For each JVM heap size, the following phases are executed.  In each phase, once maxOld references have been created, old object references are overwritten and must be garbage collected.

  • RANDOM_WRITE_0: All of the objects produced are instant garbage; no references are kept.
  • RANDOM_WRITE_1: All object references are kept.
  • RANDOM_WRITE_2: 1/2 of object references are kept.
  • RANDOM_WRITE_4: 1/4 of object references are kept.
  • RANDOM_WRITE_8: 1/8 of object references are kept.

Experiment Results

Sum of Garbage Collection Pause

The graph above shows the sum total of *all* garbage collection pauses across each experimental phase.

Min of Object Throughput (per second)

The graph above shows the *min* throughput for each second across each experimental phase.

Min of Throughput to Max of Garbage Collection Pause (per second)

The line chart above shows log scale throughput versus standard scale garbage collection pause across all seconds in each experimental phase.

Min of Throughput to Max of Garbage Collection Pause (per second)

The radar chart above shows log scale throughput versus standard scale garbage collection pause across all seconds in each experimental phase.

Addenda

Java garbage collection test class:

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

public class GCTest {

    public static String configName = "Test";
    public static int statInterval = 1000;
    public static int numThreads = 4;
    public static int objectPerThread = 500000;
    public static int maxOld = 1000000;

    public static enum ExperimentPhase {
        SEQUENTIAL_WRITE, SEQUENTIAL_CLEAR, RANDOM_WRITE_0, RANDOM_WRITE_8, RANDOM_WRITE_4, RANDOM_WRITE_2, RANDOM_WRITE_1, RANDOM_READ
    };

    public static int phaseIdx;
    public static ExperimentPhase phase;

    public static CyclicBarrier barrier;

    public static int[] oldToYoungRatios = new int[] { 0, 8, 4, 2, 1 };

    public static volatile boolean done = false;

    public static AtomicLong totalOfInterval = new AtomicLong();

    public static Object[] oldArr;

    public static ConcurrentHashMap<Integer, Map<String, Object>> oldMap;

    /**
     * Generates maximum amount of objects with nested references. Saves some
     * references so they go to the old generation.
     * 
     * @author pouttum
     * 
     */
    public static class GarbageThread extends Thread {

        protected int threadNo;

        public GarbageThread(int threadNo) {
            super();
            this.threadNo = threadNo;
        }

        @Override
        public void run() {

            await();

            // Incremental update phase
            for (int x = 0; x < maxOld; x++) {
                if (x % numThreads == threadNo) {
                    oldArr[x] = getDoc(x);
                    totalOfInterval.incrementAndGet();
                }
            }

            await();

            // Incremental clear phase
            for (int x = 0; x < maxOld; x++) {
                if (x % numThreads == threadNo) {
                    oldArr[x] = null;
                    totalOfInterval.incrementAndGet();
                }
            }

            // Random write / update phase
            for (int r = 0; r < oldToYoungRatios.length; r++) {
                await();
                for (int x = 0; x < objectPerThread; x++) {
                    Map<String, Object> doc = getDoc(x);
                    totalOfInterval.incrementAndGet();
                    if (oldToYoungRatios[r] > 0
                            && (oldToYoungRatios[r] == 1 || (x
                                    % oldToYoungRatios[r] == 0))) {
                        int index = (int) (Math.ceil(random() * maxOld));
                        oldMap.put(index, doc);
                    }
                }
            }

            await();

            // Random read phase
            for (int x = 0; x < objectPerThread; x++) {
                totalOfInterval.incrementAndGet();
                int index = (int) (Math.ceil(random() * maxOld));
                oldMap.get(index);
            }
        }

        protected void await() {
            try {
                barrier.await();
            } catch (Exception e) {
            }
        }

        protected HashMap<String, Object> getDoc(int x) {
            HashMap<String, Object> doc = new HashMap<String, Object>();
            doc.put("value1", "value1" + String.valueOf(x));
            doc.put("value2", "value2" + String.valueOf(x));
            doc.put("value3", "value3" + String.valueOf(x));
            doc.put("value4", "value4" + String.valueOf(x));
            doc.put("value5", "value5" + String.valueOf(x));
            doc.put("value6", "value6" + String.valueOf(x));
            doc.put("value7", "value7" + String.valueOf(x));
            doc.put("value8", "value8" + String.valueOf(x));
            doc.put("value9", "value9" + String.valueOf(x));
            return doc;
        }

        protected double random() {
            return Math.random();
        }

    };

    /**
     * Calculates ongoing stats and keeps history on the stat interval.
     * 
     * @author pouttum
     * 
     */
    public static class StatThread extends Thread {

        @Override
        public void run() {
            Date previousDate = new Date();
            long adjStatInterval = statInterval;
            int intervalCount = 0;
            do {
                try {
                    Thread.sleep(adjStatInterval);
                } catch (InterruptedException e) {
                    done = true;
                }
                adjStatInterval = statInterval;
                long intervalTotal = totalOfInterval.getAndSet(0L);
                Date date = new Date();
                double intervalSeconds = (date.getTime() - previousDate
                        .getTime()) / 1000d;

                StringBuilder stats = new StringBuilder(1024);
                float statIntervalDouble = statInterval / 1000f;
                double gcPause = intervalSeconds - statIntervalDouble;
                if (intervalSeconds > statIntervalDouble * 2) {
                    double x = statIntervalDouble * 2;
                    for (; x < intervalSeconds; x += statIntervalDouble) {
                        stats.append(String.format("%s\t%s\t%d\t%d\t%.3f\n",
                                configName, phase, ++intervalCount, 0,
                                statIntervalDouble));
                        gcPause -= statIntervalDouble;
                    }
                }
                if (gcPause > 0.0d) { // Credit the next interval with some of
                                        // the count of this interval
                    adjStatInterval -= gcPause * 1000L;
                    long intervalTotalAdj = Math
                            .round((gcPause / statIntervalDouble)
                                    * intervalTotal);
                    intervalTotal -= intervalTotalAdj;
                    totalOfInterval.addAndGet(intervalTotalAdj);
                }
                stats.append(String.format("%s\t%s\t%d\t%d\t%.3f\n",
                        configName, phase, ++intervalCount, intervalTotal,
                        Math.max(gcPause, 0.0d)));
                previousDate = date;
                System.out.print(stats.toString());
            } while (!done);
        }

    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 5) {
            configName = args[0];
            statInterval = Integer.parseInt(args[1]);
            numThreads = Integer.parseInt(args[2]);
            objectPerThread = Integer.parseInt(args[3]);
            maxOld = Integer.parseInt(args[4]);
        }
        // Allocate after arg parsing so sizes reflect the configured maxOld,
        // not the compile-time default
        oldArr = new Object[maxOld];
        oldMap = new ConcurrentHashMap<Integer, Map<String, Object>>(maxOld);
        barrier = new CyclicBarrier(numThreads,
                new Runnable() {
                    @Override
                    public void run() {
                        phase = ExperimentPhase.values()[phaseIdx++];
                    }
        });
        GarbageThread[] threads = new GarbageThread[numThreads];
        for (int x = 0; x < threads.length; x++) {
            threads[x] = new GarbageThread(x);
            threads[x].start();
        }
        StatThread statThread = new StatThread();
        statThread.setPriority(Thread.MAX_PRIORITY);
        statThread.start();
        for (int x = 0; x < threads.length; x++) {
            try {
                threads[x].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        done = true;
        statThread.join();
    }

}

Compile the code:

jdk1.7.0_15/bin/javac GCTest.java

Unix script run.sh:

jdk1.7.0_15/bin/java -server -Xms16g -Xmx16g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg16 1000 24 8000000 800000 > baseline.csv
jdk1.7.0_15/bin/java -server -Xms32g -Xmx32g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg32 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms64g -Xmx64g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg64 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms128g -Xmx128g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg128 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms212g -Xmx212g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg212 1000 24 8000000 800000 >> baseline.csv

Monitoring EMR Spend Using the AWS Java SDK

Elastic MapReduce makes it so easy to spin up a cluster that it’s also easy to waste money on unused, partially used, or downright unauthorized clusters.  Obviously, as a business, Amazon doesn’t put a whole lot of effort into keeping its customers from spending too much money.  Amazon does impose an instance count limit for the entire account; however, effectively managing these costs involves getting a lot more granular and providing more detailed information.

That’s why I created this program, which estimates charges for current and historic EMR clusters.  It first obtains the normalized instance hours for all clusters running under the current credentials, then divides by the Normalized Compute Time provided in the Amazon EMR FAQ.  Then we multiply by the EMR hourly rate to get the charge for each current and historic job flow (cluster).  Historic job flows come from the Amazon job flow history, which covers only the past 20 days or so.

The job flow id is the primary key for this data set.  Output is tab-delimited and streamed to stdout.  The last column contains a complete dump of the job flow in JSON format.  Here is some example output:

JOB_FLOW_ID    STATE    STARTED    ENDED    INSTANCE_COUNT    INSTANCE_TYPE    INSTANCE_HOURS    EMR_INSTANCE_RATE    CHARGE    DETAIL_JSON
j-DFASFWGRWRG    RUNNING    2011-09-21 10:52:17    null    12    m1.xlarge    36    0.59    21.24    {your job flow JSON}

So now you can keep track of estimated EMR spend in near real time, set alerts, and estimate monthly charges based on current workloads.  Enjoy!

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;

/**
 * Monitor EMR spending using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class EMRMonitor
{
    public static class InstanceRate
    {
        private int normalizedHours;
        private double emrRate;

        public InstanceRate(int normalizedHours, double emrRate)
        {
            super();
            this.normalizedHours = normalizedHours;
            this.emrRate = emrRate;
        }

        /**
         * @return the normalizedHours
         */
        public int getNormalizedHours()
        {
            return normalizedHours;
        }

        /**
         * @return the emrRate
         */
        public double getEmrRate()
        {
            return emrRate;
        }
    };

    static final Map<InstanceType, InstanceRate> rateMap =
        new HashMap<InstanceType, EMRMonitor.InstanceRate>();
    static AmazonElasticMapReduce emr;

    static
    {
        rateMap.put(InstanceType.M1Small, new InstanceRate(1, 0.085 + 0.015));
        rateMap.put(InstanceType.C1Medium, new InstanceRate(2, 0.17 + 0.03));
        rateMap.put(InstanceType.M1Large, new InstanceRate(4, 0.34 + 0.06));
        rateMap.put(InstanceType.M1Xlarge, new InstanceRate(8, 0.50 + 0.09));
        rateMap.put(InstanceType.C1Xlarge, new InstanceRate(8, 0.68 + 0.12));
        rateMap.put(InstanceType.M22xlarge, new InstanceRate(14, 1.00 + 0.21));
        rateMap.put(InstanceType.M24xlarge, new InstanceRate(28, 2.00 + 0.42));
        rateMap.put(InstanceType.Cc14xlarge, new InstanceRate(19, 1.60 + 0.33));
        rateMap.put(InstanceType.Cg14xlarge, new InstanceRate(25, 2.10 + 0.42));
    }

    /**
     * The only information needed to create a client are security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init()
        throws Exception
    {
        AWSCredentials credentials =
            new PropertiesCredentials(
                                      EMRMonitor.class
                                          .getResourceAsStream("AwsCredentials.properties"));
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args)
        throws Exception
    {
        System.out
            .println("JOB_FLOW_ID\tSTATE\tSTARTED\tENDED\tINSTANCE_COUNT\tINSTANCE_TYPE\tINSTANCE_HOURS\tEMR_INSTANCE_RATE\tCHARGE\tDETAIL_JSON");
        Logger.getLogger("com.amazonaws").setLevel(Level.WARNING); // Turn off request status
                                                                   // messages
        init();
        DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest();
        DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
        for (JobFlowDetail detail : descResult.getJobFlows())
        {
            String slaveInstanceType = detail.getInstances().getSlaveInstanceType();
            String masterInstanceType = detail.getInstances().getMasterInstanceType();
            if (slaveInstanceType == null)
            {
                slaveInstanceType = masterInstanceType;
            }
            double instanceHours = getInstanceHours(detail, slaveInstanceType);
            double charge = getInstanceCharge(slaveInstanceType, instanceHours);
            System.out
                .println(String.format("%1$s\t%2$s\t%3$tF %3$tT\t%4$tF %4$tT\t%5$d\t%6$s\t%7$.0f\t%8$.2f\t%9$.2f\t%10$s\t",
                                       detail.getJobFlowId(),
                                       detail.getExecutionStatusDetail().getState(),
                                       detail.getExecutionStatusDetail().getCreationDateTime(),
                                       detail.getExecutionStatusDetail().getEndDateTime(),
                                       detail.getInstances().getInstanceCount(),
                                       slaveInstanceType,
                                       instanceHours,
                                       rateMap.get(InstanceType.fromValue(slaveInstanceType)).getEmrRate(),
                                       charge,
                                       detail.toString().replaceAll("\\s+", " ")));
        }
    }

    /**
     * @param instanceType the EC2 instance type of the job flow
     * @param instanceHours the estimated instance hours
     * @return the estimated charge
     */
    public static double getInstanceCharge(String instanceType, double instanceHours)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        return instanceHours * rate.getEmrRate();
    }

    /**
     * @param detail the job flow detail
     * @param instanceType the EC2 instance type of the job flow
     * @return the estimated instance hours
     */
    public static double getInstanceHours(JobFlowDetail detail, String instanceType)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        double instanceHours =
            detail.getInstances().getNormalizedInstanceHours()
                / (double) rate.getNormalizedHours();
        return instanceHours;
    }
}

Building the Organic Cloud with Amazon APIs and Java

Well, now that we have a high availability and low latency design for our RESTful services, where can we find the weakest link in maintaining our service level agreements?  The answer often lies in the crufty, hidden, and poorly tested “glue code” lying between (and below) the systems themselves.  This code rears its head when a critical notification fails to fire, or a cluster fails to scale automatically as designed.  How do we get this code cleaned up, out in the open, and rock solid?  The answer, of course, is to apply agile principles to our infrastructure.

The field of system programming desperately needs an update into the Agile Age!  And why not?  Popular cloud management APIs provide complete command and control via Agile-enabled languages such as Java…  And many efforts are under way to make cloud APIs standardized and open source.  So let’s delve into some details about how we can transform shell script spaghetti into something more workable!

Automated Build and Deployment

System programs, like application programs, should follow the same automated build and deployment process as the rest of the application portfolio.

Automated Unit Test and Mocking

I long to see the day when system programs have automated unit test coverage metrics just like the rest of the civilized world.  Let’s test that failover logic in a nice safe place where it can’t ruin lives!  Think you can’t mock all failure scenarios?  Think again.  With 100% Java APIs available from many cloud providers (not to mention other test-friendly languages) we can impersonate any type of response from the cloud services.  Friends don’t let friends code without testing.

Change Management Tied to Code Commits

Let’s all admit it: at one point or another, we have all been tempted to cheat and “tweak” things in production.  Often it doesn’t seem possible that a particular change could cause a problem.  However, in some system programming environments “tweaking” has become a way of life because people assume there is no way to replicate the production environment.  In a cloud environment this is simply not the case.  All cloud instances follow a standardized set of options and configurations commanded by a testable language.  If we create a Java program or Python script to build and maintain the production environment, re-creating that environment becomes trivial.  Tying every system program code commit directly to a Jira issue then becomes possible, as does complete end-to-end management of environment change.  All changes are code changes.  Wow, what a concept!
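
For example, here is a minimal, hypothetical sketch of environment-as-code using the AWS SDK for Java; the AMI id, instance type, and counts stand in for a real environment specification:

import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;

public class EnvironmentBuilder {
    public static void main(String[] args) throws Exception {
        AmazonEC2 ec2 = new AmazonEC2Client(new PropertiesCredentials(
                EnvironmentBuilder.class.getResourceAsStream("AwsCredentials.properties")));
        // Everything about the environment lives in version-controlled code
        RunInstancesRequest req = new RunInstancesRequest()
                .withImageId("ami-00000000") // hypothetical AMI id
                .withInstanceType("m1.large")
                .withMinCount(2)
                .withMaxCount(2);
        RunInstancesResult result = ec2.runInstances(req);
        System.out.println(result.getReservation().getInstances());
    }
}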

Testable Components

By building testable components we create layers of abstraction which hide and manage complexity.  We can now confidently build intelligent instances which dynamically configure themselves, hand in hand with the software which runs on them.  Testability makes this not only feasible, but in fact imperative as a competitive advantage.

Organic Cloud Systems

So what happens when we standardize an Agile process for system programs in the same way we do for application programs?  Repeatable results…  And we also largely take the burden of busy work off of infrastructure folks so they can focus on engineering the next generation of great systems rather than doing so much fire fighting at 3 am.  Not to say this solves all of those problems, but most repeatable IT problems have automatable solutions.  Most likely we can automate anything which we can put in a run sheet using cloud APIs.  We then have organic cloud systems which automatically configure themselves and adapt to environmental changes and failure scenarios without sacrificing reliability.

RESTful High Availability and Low Latency on Amazon EC2

For this post, I have condensed some tricks, tips, and techniques for improving availability and latency for RESTful apps on Amazon Elastic Compute Cloud.

Geo-aware Load Balancing

Since Amazon Elastic Load Balancing does not automatically route users to the best region, utilizing a geo-aware IP load balancing service on top of the Amazon ELB tier reduces latency and can also increase availability.  These services route user requests to the closest region automatically, thus reducing network latency.  Also, if an entire region is down, user requests are routed to another region.  Examples include Dyn DNS and Akamai DNS.

Leveraging HTTP Caching

Using RESTful services makes all the rich caching features of HTTP available to your services.  Browser (agent) cache takes direct load off your servers at no cost to you, so you should conduct a thorough review of your HTTP interactions to determine which could be cached.  This can dramatically improve both latency and availability by reducing round trips to the server.

Surrogate cache (such as Amazon CloudFront) operates over a content delivery network (CDN), which replicates content close to the user to reduce latency and increase availability.  You can put CloudFront in front of any HTTP server to take load off the web server, app server, and db tiers and improve response time for users.  Secure (HTTPS) content cannot be cached by a surrogate, so it is important to segregate truly secure interactions from shared resources such as images or other rich content which could be shared among users.  Also, caching normally requires managing the service URLs to ensure cacheability (i.e. don’t embed JSESSION_ID in the URL) and customizing the HTTP Cache-Control headers on app server responses for cacheable responses.
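
As a minimal sketch, a servlet can mark a shared resource as cacheable by agents and surrogates; the one-hour TTL and JSON body here are placeholders:

import java.io.IOException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class CacheableResourceServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws IOException {
        // Allow agents and surrogates (e.g. CloudFront) to cache for one hour
        resp.setHeader("Cache-Control", "public, max-age=3600");
        resp.setContentType("application/json");
        resp.getWriter().write("{\"status\":\"ok\"}");
    }
}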

Throttle Back and Fail-fast

When interacting with a load balancer, use the HTTP 503 Service Unavailable response code to immediately cause the load balancer to re-submit the request to another server instance.  This allows throttling back the load proactively before a downstream server or other component fails, and allows the application to control how much load it is willing to take on at any given moment.  This is also called “fail-fast”, since we throttle back the request immediately, without waiting for a server or connection timeout.  In order to take advantage of throttling we need to ensure that we process requests asynchronously and monitor the queue depth or request backlog.  Then we need a tunable configuration point where we can control when the application throttles back.  This technique improves availability and latency in the following areas:

  1. Under heavy load and spikes, throttling allows the architecture to respond proactively to potential overload rather than waiting around for something bad to happen.
  2. Throttling back tells the load balancer exactly when to send a request to another node, instead of having it perform guess-work based on CPU, network traffic, or request latency.  This reduces availability problems which occur when the load balancer guesses wrong or has the wrong configuration.
  3. The front end web application can control throttle back based on implicit knowledge of the end-to-end architecture which the load balancer cannot possibly have.

In order to implement throttling correctly, we need to ensure all requests are either idempotent or queued.  An idempotent request is implicitly cacheable and could be served from another layer such as a CDN.  Anything which is not idempotent must be queued.  Non-idempotent requests implicitly take more time and resources to execute and may involve stateful interactions with high overhead.  Using a reliable and scalable managed queuing mechanism such as Amazon SQS ensures that the queue itself is not a point of failure.  A minimal sketch of a fail-fast throttle follows.
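
This sketch assumes a simple in-flight request counter as the backlog measure; a real deployment would track queue depth and make the limit configurable:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.*;
import javax.servlet.http.HttpServletResponse;

public class ThrottleFilter implements Filter {
    private static final int MAX_BACKLOG = 100; // the tunable configuration point
    private final AtomicInteger inFlight = new AtomicInteger();

    public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
            throws IOException, ServletException {
        if (inFlight.incrementAndGet() > MAX_BACKLOG) {
            inFlight.decrementAndGet();
            // Fail fast: tell the load balancer to retry on another instance
            ((HttpServletResponse) resp).sendError(503);
            return;
        }
        try {
            chain.doFilter(req, resp);
        } finally {
            inFlight.decrementAndGet();
        }
    }

    public void init(FilterConfig config) {}
    public void destroy() {}
}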

Turn Off Sessioning

Most RESTful services will not require tracking or persisting HTTP session information.  In fact, for scalability purposes REST explicitly defines a “Stateless” constraint to prevent cross-request state from interfering with web server scalability.  To disable HTTP sessions for a Tomcat webapp, add a context.xml under your webapp/META-INF (not WEB-INF) with the following contents.  This will work even with Elastic Beanstalk applications, since Beanstalk uses Tomcat:

<?xml version='1.0' encoding='utf-8'?>
<Context cookies="false" cacheTTL="0">

  <!-- Default set of monitored resources -->
  <WatchedResource>WEB-INF/web.xml</WatchedResource>

  <!-- Uncomment this to disable session persistence across restarts -->
  <Manager pathname="" />

</Context>

Redirect After Post

Many applications I’ve seen still serve a response body with a POST, PUT, or DELETE request.  This violates REST principles and removes a needed separation of concerns.  It can affect latency and availability, since responses to POST, PUT, and DELETE are not cacheable.  It can also allow the user to refresh the page and unknowingly re-submit the transaction.  To avoid these problems, utilize the Redirect After Post pattern.  As mentioned under the throttling section above, the POST, PUT, or DELETE can issue a message to the queue.  This returns immediately, and should redirect to a status page or to the next page in the user interaction flow.  The status page can automatically refresh periodically until the request is done, or we can just move on with the flow.  It is counterproductive to latency and availability to keep an HTTP connection open while a (potentially long running) state change completes, and there is no reason to keep the user waiting either.
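
A minimal sketch of the pattern; the enqueue step here is hypothetical and would be the SQS send described above in a real system:

import java.io.IOException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class OrderServlet extends HttpServlet {
    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws IOException {
        String requestId = enqueue(req.getParameter("payload"));
        // No response body: redirect so a refresh cannot re-submit the transaction
        resp.sendRedirect("/status/" + requestId);
    }

    private String enqueue(String payload) {
        // Hypothetical: hand off to a queue and return a tracking id
        return java.util.UUID.randomUUID().toString();
    }
}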

Utilize Application Tier Cache

The ephemeral nature of cloud instances means that anything in memory or local disk could disappear at any moment.  However, this does not mean we should avoid in-memory caching in the application tier.  If the instance goes down, ELB guarantees that in-process requests go to another instance.  Therefore, it is no problem to use application tier caches just like you would in any JEE deployment.  Remember, any single server is by nature ephemeral.  The cloud does nothing to change this.  Someone could spill coffee on the server at any time.  The only thing that changes with the cloud is that the person spilling the coffee works for Amazon, so you have the potential to get your money back!

Shard

If possible, shard your data in a way that evenly distributes load.  Not all businesses can shard their data; however, if you provide a user-centric service which doesn’t often mix data between users (like Facebook), then sharding may be your availability and latency coup de grâce.  Set up a series of sub-domains, or RDS databases, etc.  Then, in the user profile, store the assigned resource name.  If you assign the resources intelligently you can service any number of users with a continuous level of latency and availability, as long as users don’t cross geo regions.  If users cross regions then there will be some overhead for that user due to cross-region communication.  However, at a later point if this becomes a problem you can create a routine to detect when a user changes regions and migrate their data to the new region.
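
A minimal sketch of recording an assigned shard in the user profile; the host names and hash-based assignment are illustrative, and a real system would persist the profile:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ShardDirectory {
    private static final String[] SHARDS =
        { "users-east-1.example.com", "users-east-2.example.com" }; // hypothetical
    private final Map<String, String> profileShard = new ConcurrentHashMap<String, String>();

    /** Assign once at signup, record in the profile, and look up thereafter. */
    public String shardFor(String userId) {
        String assigned = profileShard.get(userId);
        if (assigned == null) {
            assigned = SHARDS[(userId.hashCode() & 0x7fffffff) % SHARDS.length];
            profileShard.put(userId, assigned);
        }
        return assigned;
    }
}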

Using RDS without Sharding?  Try Read Replicas

If you do not need absolute read consistency and need to scale out reads, Amazon RDS allows creation of read replicas.  Read away!

PS: make sure to create a multi-AZ deployment because then the replication will run off the Standby Database and not place load on the transactional instance.

Batch SQS Messages

For many applications, the latency associated with Simple Queue Service is unacceptable.  In this case, try batching multiple incoming requests into one SQS message.  SQS scales out to larger messages, so this should reduce the latency of your service end point if you can find the “sweet spot” for SQS message size.  I recently benchmarked SQS on an m1.large instance and found that queue throughput with 8KB messages was 393 messages per second.  That single-threaded benchmark should scale linearly across multiple threads.  So, by batching 1024-byte requests eight to an 8KB SQS message, a REST service can sustain over 3,000 requests per second with a single thread processing the SQS enqueues.
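
A minimal sketch of packing several application requests into one SQS message with the AWS SDK for Java; the newline-delimited body format is an assumption for the demo:

import java.util.List;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class BatchingEnqueuer {
    private final AmazonSQS sqs;
    private final String queueUrl;

    public BatchingEnqueuer(AmazonSQS sqs, String queueUrl) {
        this.sqs = sqs;
        this.queueUrl = queueUrl;
    }

    /** Packs several pending requests into one SQS message body. */
    public void enqueue(List<String> pendingRequests) {
        StringBuilder body = new StringBuilder();
        for (String request : pendingRequests) {
            body.append(request).append('\n'); // assumes requests contain no newlines
        }
        sqs.sendMessage(new SendMessageRequest(queueUrl, body.toString()));
    }
}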

Convert Piecemeal Changes to Bulk Changes

If you followed the advice above about queuing changes in SQS, then you will find a queue full of changes to process instead of a slew of piecemeal changes.  If these changes need to be applied to an RDS MySQL instance, then considerable performance gains can be made by applying them with batch JDBC.  Overall, database latency and availability improve with batch operations because batches incur less frequent network hits and transaction commits than piecemeal changes.
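
A minimal sketch of draining queued changes into a single JDBC batch; the table and columns are hypothetical:

import java.sql.Connection;
import java.sql.PreparedStatement;

public class BulkApplier {
    /** Applies queued (amount, accountId) changes in one batch and one commit. */
    public static void applyChanges(Connection conn, Iterable<String[]> changes)
            throws Exception {
        conn.setAutoCommit(false); // one commit for the whole batch
        PreparedStatement ps =
            conn.prepareStatement("UPDATE account SET balance = ? WHERE id = ?");
        for (String[] change : changes) {
            ps.setBigDecimal(1, new java.math.BigDecimal(change[0]));
            ps.setLong(2, Long.parseLong(change[1]));
            ps.addBatch();
        }
        ps.executeBatch(); // far fewer network round trips than piecemeal updates
        conn.commit();
        ps.close();
    }
}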

Closing Comment

Hopefully the techniques above will help you build faster, more available RESTful applications on the Amazon EC2 platform.  If you have any questions, comments, or feedback please feel free to comment on the post!

Hadoop Jobs Grow 175,000% since 2006

According to Indeed, we have seen stellar growth of 175,000% in Hadoop job listings since 2006. The trend curve shows consistent, long term growth prospects:
Hadoop Relative Job Growth circa 2006

Given the comparatively lackluster job growth performance of back end technology mainstays like Oracle (30%), IBM (29%), and Microsoft (49%), it looks like back end engineers may need to start catching the wave…

Oracle Relative Job Growth circa 2006

IBM Relative Job Growth circa 2006

Microsoft Relative Job Growth circa 2006

Cascading TapsMap and SubAssembly: Best Practices

Cascading Pipes provide a lot of power to the experienced user because of all the things they take care of automatically; however, the downside is that we can easily do things the hard way without knowing it.  This can affect the performance, maintainability, and re-usability of flows, and can trigger the need for significant re-factoring later on.  One best practice to look at is how to use the taps map and SubAssembly together to provide separation of concerns between the taps (storing the data) and the pipes (processing the data).  Using the taps map and SubAssembly properly produces fully re-usable pipes completely independent of the input and output data formats.

Flow Before Taps Map and SubAssembly

So as we can see from the flow above, we are selecting employees with salary < 1000 and counting them by department. The code might look something like this:

//Define source tap
Tap source =
    new Hfs(new TextDelimited(new Fields("emp_name",
                                         "salary",
                                         "dept_id"),
                              "\t"),
            "emp.tsv"
    );

//Wire the pipe: ExpressionFilter discards tuples matching the expression,
//so filtering out salary >= 1000 keeps employees with salary < 1000
Pipe pipe = new Each("emp",
                     new Fields("salary"),
                     new ExpressionFilter("salary >= 1000",
                                          Double.class));
pipe = new GroupBy(pipe, new Fields("dept_id"));
pipe = new Every(pipe, new Count());

//Define the sink tap
Tap sink =
    new Hfs(new TextDelimited(new Fields("dept_id",
                                         "count"),
                              "\t"),
            "output",
            true
    );

//Create the flow
Flow flow = new FlowConnector().connect(source,
                                        sink,
                                        pipe);

//Run the flow
flow.complete();

While the code is simple enough, it suffers from several issues with performance, re-usability, and maintainability:

  1. Supports only TSV as the input and output format
  2. Hard-codes the input path as “emp.tsv”
  3. Assumes the output directory is “output”
  4. Hard-codes the field names
  5. Provides no way to wire the pipe into other pipes except by serializing intermediate files (via a Cascade)

Passing parameters or configuration to the job could resolve the first four issues; however, the last issue carries a large performance penalty and has no easy solution without some major re-factoring.

Suggested Re-factoring

By implementing a SubAssembly and utilizing a Taps Map, we can dramatically improve both the maintainability and performance of the code.

Implementing a SubAssembly

Please note the following when creating the SubAssembly:

  1. Provide a default constructor with default tap names.
  2. Create String constants for all head and tail pipes for use in the TapsMap.
  3. Provide a secondary constructor to allow linking and re-using multiple SubAssemblies.

Example SubAssembly code:

import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * Abstracts Pipe from Taps.
 * @author mpouttuclarke
 *
 */
public class EmpSubAssembly
 extends SubAssembly
{
 public static final String INPUT_PIPE_NAME = "input";
 public static final String OUTPUT_PIPE_NAME = "output";
 private static final long serialVersionUID = 1L;

 /**
 * Default constructor.
 */
 public EmpSubAssembly()
 {
   this(new Pipe(INPUT_PIPE_NAME), OUTPUT_PIPE_NAME);
 }

 /**
 * Provides a way to rename the head and tail pipes.
 * @param input the head pipe
 * @param tailName the name to assign to the tail pipe
 */
 public EmpSubAssembly(Pipe input, String tailName)
 {
   super();

   // Wire the pipe: ExpressionFilter discards tuples matching the expression,
   // so filtering out salary >= 1000 keeps employees with salary < 1000
   Pipe pipe = new Each(input,
                        new Fields("salary"),
                        new ExpressionFilter("salary >= 1000",
                                             Double.class));
   pipe = new GroupBy(pipe, new Fields("dept_id"));
   pipe = new Every(pipe, new Count());
   pipe = new Pipe(tailName, pipe); //Name each tail

   setTails(pipe);
 }

}

Binding Taps with a TapsMap

Now we can bind the taps with a TapsMap, allowing separation of concerns between the data locations and formats and the pipe.  This means we can re-wire the pipe SubAssembly with any other pipes or SubAssemblies we wish, without affecting performance of the application as a whole.

Example of a TapsMap:

 //Define source tap
 Tap source =
   new Hfs(new TextDelimited(new Fields("emp_name",
                                        "salary",
                                        "dept_id"),
           "\t"),
           "emp.tsv"
 );

 //Define the sink
 Tap sink =
   new Hfs(new TextDelimited(new Fields("dept_id",
                                        "count"),
           "\t"),
           "output",
           true
 );

 //Create input taps map (same for output if multiple tail pipes)
 Map<String, Tap> tapsMap = new HashMap<String, Tap>();
 tapsMap.put(EmpSubAssembly.INPUT_PIPE_NAME, source);

 //SubAssembly
 Pipe pipe = new EmpSubAssembly();

 //Create the flow
 Flow flow =
   new FlowConnector().connect(tapsMap,
                               sink,
                               pipe);

 //Run the flow
 flow.complete();

Results of the Re-factoring

After the changes, we can take full advantage of the Flow auto-wiring capabilities of Cascading:

  1. We can re-wire and re-use any number of pipes and SubAssemblies in a single Flow.
  2. We can change the data locations and formats at will without affecting the functionality of the pipes.
  3. Cascading takes care of mapping the taps, no matter how deeply nested within multiple SubAssemblies the heads and tails are.
  4. We can re-name the pipes at will to prevent naming conflicts across a given Flow.

Solution to Tag Soup

iCrossing has open sourced a JavaScript tag container designed to centralize management of multiple browser tracking vendors (such as Omniture, among others).  Most sites have multiple tracking vendors, creating a “tag soup” on their pages and affecting load times.  ConnecTag addresses this by switching vendor tags on and off via a centralized configuration file.  Check it out…