Titan: Cassandra vs. Hazelcast persistence benchmark

10 node load test comparison using Amazon EC2 SSD-based instances.  1 billion vertices and 1 billion edges processed for each test run.  Used the titan-loadtest project to run each test.

Method

The experiment maximizes data locality by co-locating load generation, the Titan graph database, and Cassandra/Hazelcast within the same JVM instance while partitioning data across the cluster.  It also explores methods for tuning garbage collection, Titan, and Cassandra for this peer computing use case.

The following components were utilized during the experiment:

Technology Version
RHEL x64 HVM AMI 6.4
Oracle JDK x64 1.7_45
Apache Cassandra 1.2.9
Hazelcast 3.1.1
Titan 0.3.2

Each test iteration has 6 read ratio phases starting with 0% reads (100% writes) all the way up to 90% reads and 10% writes.  For all tests, the persistence implementation executes in the same JVM as Titan to avoid unnecessary context switching and serialization overhead.  Tests were conducted using an Amazon placement group to ensure instances resided on the same subnet.  The storage was formatted with 4K blocks and used the noop scheduler to improve latency.

For each phase, new vertices were added with one edge linking back to a previous vertex.  No tests of update or delete were conducted.
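As a rough illustration (this is not the actual titan-loadtest code), the write pattern for each phase looks something like the sketch below, assuming a TitanGraph opened against the embedded Cassandra backend; the configuration values and the "linksTo" edge label are assumptions for the example:

import org.apache.commons.configuration.BaseConfiguration;

import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Vertex;

public class WritePhaseSketch {
    public static void main(String[] args) {
        // Hypothetical configuration; the real settings live in titan-loadtest
        BaseConfiguration conf = new BaseConfiguration();
        conf.setProperty("storage.backend", "embeddedcassandra");
        conf.setProperty("storage.cassandra-config-dir", "config/cassandra.yaml");
        TitanGraph graph = TitanFactory.open(conf);

        Vertex previous = graph.addVertex(null);
        for (int i = 0; i < 1000; i++) {
            Vertex current = graph.addVertex(null);
            // Each new vertex links back to a previously created vertex
            graph.addEdge(null, current, previous, "linksTo");
            previous = current;
        }
        graph.commit();
        graph.shutdown();
    }
}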

Please see the titan-loadtest project above for all Cassandra and Hazelcast settings and configurations used in the test.

Results

Titan 10 node Summary

Please note: the results are listed in rates of thousands of vertices per second and include the creation of an edge as well as a vertex. Also, the Hazelcast SSD x1 results used a custom flash storage module for Hazelcast developed privately so those results are not replicable without that module installed.

Conclusions

Hazelcast performed better than Cassandra for all tests and demonstrated one order of magnitude better performance on reads.  Surprisingly, Hazelcast slightly outperformed Cassandra for writes as well.

Large Java Heap with the G1 Collector – Part 1

Demonstration of efficient garbage collection on JVM heap sizes in excess of 128 gigabytes.  Garbage collection behavior analyzed via a mini-max optimization strategy.  Study measures maximum throughput versus minimum garbage collection pause to find optimization “sweet spot” across experimental phases.  Replicable results via well-defined open source experimental methods executable on Amazon EC2 hardware. 

Experimental Method

Goals

  • Demonstrate maximum feasible JVM size on current cloud hardware (specific to Amazon for now) using the G1 Garbage Collector (link).
  • Vary the JVM heap size (-Xmx) exponentially to find performance profile and breaking points.
  • Vary the ratio of new versus old generation objects exponentially.
  • Use an in-memory workload to stress the JVM (avoiding network and disk waits).
  • Produce replicable results on commodity hardware, open source operating systems, and open source tools.
  • Provide gap-free data for analysis, in spite of garbage collection pauses.

Not (Yet) in Scope

As follow-up to this study, subsequent efforts may include:

  • Vary the number of old objects up to maximum possible memory capacity.
  • Vary the number of processing threads to starve the concurrent G1 algorithm of CPU cycles.
  • Vary the object size.
  • Vary the field size within the object.
  • Vary the G1 JVM parameters.

Tools and Versions

  1. Amazon Linux AMI (link)
  2. One cr1.8xlarge instance (link): 244 GiB RAM, 2 x Intel Xeon E5-2670 (link)
  3. Oracle JDK 1.7_15

JVM Parameters

This experiment varied the JVM heap size but kept other parameters constant.

  • -Xmx16g, -Xmx32g, -Xmx64g, -Xmx128g, -Xmx212g
  • -XX:+UseG1GC
  • -XX:InitiatingHeapOccupancyPercent=0
  • -XX:MaxGCPauseMillis=200
  • -XX:MaxTenuringThreshold=25
  • -XX:ParallelGCThreads=32
  • -XX:ConcGCThreads=32
  • -XX:G1ReservePercent=10
  • -XX:G1HeapRegionSize=32m

Program Parameters

These parameters were kept constant for this study.

  • statInterval = 1000: the reporting interval for measures in milliseconds.
  • numThreads = 24: number of processing threads.
  • objectsPerThread = 8,000,000: number of objects to produce for each phase.
  • maxOld = 800,000: max number of old references to maintain, after which old references are overwritten with new values.

Experimental Phases

For each JVM heap size, the following phases are executed.  Within each phase, once maxOld references have been retained, older references are overwritten and become eligible for garbage collection.

  • RANDOM_WRITE_0: no object references are kept, so all objects produced are instant garbage.
  • RANDOM_WRITE_1: all object references are kept.
  • RANDOM_WRITE_2: 1/2 of object references are kept.
  • RANDOM_WRITE_4: 1/4 of object references are kept.
  • RANDOM_WRITE_8: 1/8 of object references are kept.

Experiment Results

Sum of Garbage Collection Pause

The graph above shows the sum total of *all* garbage collection pauses across each experimental phase.

Min of Object Throughput (per second)

The graph above shows the *min* throughput for each second across each experimental phase.

Min of Throughput to Max of Garbage Collection Pause (per second)

The line chart above shows log scale throughput versus standard scale garbage collection pause across all seconds in each experimental phase.

Min of Throughput to Max of Garbage Collection Pause (per second)

The radar chart above shows log scale throughput versus standard scale garbage collection pause across all seconds in each experimental phase.

Addenda

Java garbage collection test class:

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

public class GCTest {

    public static String configName = "Test";
    public static int statInterval = 1000;
    public static int numThreads = 4;
    public static int objectPerThread = 500000;
    public static int maxOld = 1000000;

    public static enum ExperimentPhase {
        SEQUENTIAL_WRITE, SEQUENTIAL_CLEAR, RANDOM_WRITE_0, RANDOM_WRITE_8, RANDOM_WRITE_4, RANDOM_WRITE_2, RANDOM_WRITE_1, RANDOM_READ
    };

    public static int phaseIdx;
    public static ExperimentPhase phase;

    public static CyclicBarrier barrier;

    public static int[] oldToYoungRatios = new int[] { 0, 8, 4, 2, 1 };

    public static volatile boolean done = false;

    public static AtomicLong totalOfInterval = new AtomicLong();

    public static Object[] oldArr = new Object[maxOld];

    public static ConcurrentHashMap<Integer, Map<String, Object>> oldMap = new ConcurrentHashMap<Integer, Map<String, Object>>(
            maxOld);

    /**
     * Generates maximum amount of objects with nested references. Saves some
     * references so they go to the old generation.
     * 
     * @author pouttum
     * 
     */
    public static class GarbageThread extends Thread {

        protected int threadNo;

        public GarbageThread(int threadNo) {
            super();
            this.threadNo = threadNo;
        }

        @Override
        public void run() {

            await();

            // Incremental update phase
            for (int x = 0; x < maxOld; x++) {
                if (x % numThreads == threadNo) {
                    oldArr[x] = getDoc(x);
                    totalOfInterval.incrementAndGet();
                }
            }

            await();

            // Incremental clear phase
            for (int x = 0; x < maxOld; x++) {
                if (x % numThreads == threadNo) {
                    oldArr[x] = null;
                    totalOfInterval.incrementAndGet();
                }
            }

            // Random write / update phase
            for (int r = 0; r < oldToYoungRatios.length; r++) {
                await();
                for (int x = 0; x < objectPerThread; x++) {
                    Map<String, Object> doc = getDoc(x);
                    totalOfInterval.incrementAndGet();
                    if (oldToYoungRatios[r] > 0
                            && (oldToYoungRatios[r] == 1 || (x
                                    % oldToYoungRatios[r] == 0))) {
                        int index = (int) (Math.ceil(random() * maxOld));
                        oldMap.put(index, doc);
                    }
                }
            }

            await();

            // Random read phase
            for (int x = 0; x < objectPerThread; x++) {
                totalOfInterval.incrementAndGet();
                int index = (int) (Math.ceil(random() * maxOld));
                oldMap.get(index);
            }
        }

        protected void await() {
            try {
                barrier.await();
            } catch (Exception e) {
                // Ignore interrupts and broken barriers; the phase simply proceeds
            }
        }

        protected HashMap<String, Object> getDoc(int x) {
            HashMap<String, Object> doc = new HashMap<String, Object>();
            doc.put("value1", "value1" + String.valueOf(x));
            doc.put("value2", "value2" + String.valueOf(x));
            doc.put("value3", "value3" + String.valueOf(x));
            doc.put("value4", "value4" + String.valueOf(x));
            doc.put("value5", "value5" + String.valueOf(x));
            doc.put("value6", "value6" + String.valueOf(x));
            doc.put("value7", "value7" + String.valueOf(x));
            doc.put("value8", "value8" + String.valueOf(x));
            doc.put("value9", "value9" + String.valueOf(x));
            return doc;
        }

        protected double random() {
            return Math.random();
        }

    };

    /**
     * Calculates ongoing stats and keeps history on the stat interval.
     * 
     * @author pouttum
     * 
     */
    public static class StatThread extends Thread {

        @Override
        public void run() {
            Date previousDate = new Date();
            long adjStatInterval = statInterval;
            int intervalCount = 0;
            do {
                try {
                    Thread.sleep(adjStatInterval);
                } catch (InterruptedException e) {
                    done = true;
                }
                adjStatInterval = statInterval;
                long intervalTotal = totalOfInterval.getAndSet(0L);
                Date date = new Date();
                double intervalSeconds = (date.getTime() - previousDate
                        .getTime()) / 1000d;

                StringBuilder stats = new StringBuilder(1024);
                float statIntervalDouble = statInterval / 1000f;
                double gcPause = intervalSeconds - statIntervalDouble;
                if (intervalSeconds > statIntervalDouble * 2) {
                    double x = statIntervalDouble * 2;
                    for (; x < intervalSeconds; x += statIntervalDouble) {
                        stats.append(String.format("%s\t%s\t%d\t%d\t%.3f\n",
                                configName, phase, ++intervalCount, 0,
                                statIntervalDouble));
                        gcPause -= statIntervalDouble;
                    }
                }
                if (gcPause > 0.0d) { // Credit the next interval with some of
                                        // the count of this interval
                    adjStatInterval -= gcPause * 1000L;
                    long intervalTotalAdj = Math
                            .round((gcPause / statIntervalDouble)
                                    * intervalTotal);
                    intervalTotal -= intervalTotalAdj;
                    totalOfInterval.addAndGet(intervalTotalAdj);
                }
                stats.append(String.format("%s\t%s\t%d\t%d\t%.3f\n",
                        configName, phase, ++intervalCount, intervalTotal,
                        Math.max(gcPause, 0.0d)));
                previousDate = date;
                System.out.print(stats.toString());
            } while (!done);
        }

    }

    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 5) {
            configName = args[0];
            statInterval = Integer.parseInt(args[1]);
            numThreads = Integer.parseInt(args[2]);
            objectPerThread = Integer.parseInt(args[3]);
            maxOld = Integer.parseInt(args[4]);
        }
        // Re-size the structures that depend on maxOld, since the static fields
        // above were initialized with the default value before the arguments were parsed
        oldArr = new Object[maxOld];
        oldMap = new ConcurrentHashMap<Integer, Map<String, Object>>(maxOld);
        barrier = new CyclicBarrier(numThreads,
                new Runnable() {
                    @Override
                    public void run() {
                        phase = ExperimentPhase.values()[phaseIdx++];
                    }
        });
        GarbageThread[] threads = new GarbageThread[numThreads];
        for (int x = 0; x < threads.length; x++) {
            threads[x] = new GarbageThread(x);
            threads[x].start();
        }
        StatThread statThread = new StatThread();
        statThread.setPriority(Thread.MAX_PRIORITY);
        statThread.start();
        for (int x = 0; x < threads.length; x++) {
            try {
                threads[x].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        done = true;
        statThread.join();
    }

}

Compile the code:

jdk1.7.0_15/bin/javac GCTest.java

Unix script run.sh:

jdk1.7.0_15/bin/java -server -Xms16g -Xmx16g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg16 1000 24 8000000 800000 > baseline.csv
jdk1.7.0_15/bin/java -server -Xms32g -Xmx32g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg32 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms64g -Xmx64g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg64 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms128g -Xmx128g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg128 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms212g -Xmx212g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest old_ratio_8 1000 24 8000000 800000 >> baseline.csv

Monitoring EMR Spend Using the AWS Java SDK

Elastic Map Reduce makes it so easy to spin up a cluster that sometimes it’s also easy to waste money with unused, partially used, or downright unauthorized clusters.  Obviously, as a business, Amazon doesn’t put a whole lot of effort into keeping its customers from spending too much money.  Amazon does impose an instance count limit for the entire account, but effectively managing these costs requires getting a lot more granular and producing more detailed information.

That’s why I created this program, which estimates charges for current and historic EMR clusters.  It first obtains the normalized instance hours for all clusters running under the current credentials, divides by the Normalized Compute Time provided in the Amazon EMR FAQ, and then multiplies by the EMR Hourly Rate to get the charge for each current and historic job flow (cluster).  Historic job flows come from the Amazon job flow history, which covers only roughly the past 20 days.

The job flow id is the primary key for this data set.  Output is tab delimited streamed to stdout.  The last column contains a complete dump of the job flow in JSON format.  Here is some example output:

JOB_FLOW_ID    STATE    STARTED    ENDED    INSTANCE_COUNT    INSTANCE_TYPE    INSTANCE_HOURS    EMR_INSTANCE_RATE    CHARGE    DETAIL_JSON
j-DFASFWGRWRG    RUNNING    2011-09-21 10:52:17    null    12    m1.xlarge    36    0.59    21.24    {your job flow JSON}

So now you can keep track of estimated EMR spend in near real time, set alerts, and estimate monthly charges based on current workloads.  Enjoy!

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;

/**
 * Monitor EMR spending using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class EMRMonitor
{
    public static class InstanceRate
    {
        private int normalizedHours;
        private double emrRate;

        public InstanceRate(int normalizedHours, double emrRate)
        {
            super();
            this.normalizedHours = normalizedHours;
            this.emrRate = emrRate;
        }

        /**
         * @return the normalizedHours
         */
        public int getNormalizedHours()
        {
            return normalizedHours;
        }

        /**
         * @return the emrRate
         */
        public double getEmrRate()
        {
            return emrRate;
        }
    };

    static final Map<InstanceType, InstanceRate> rateMap =
        new HashMap<InstanceType, EMRMonitor.InstanceRate>();
    static AmazonElasticMapReduce emr;

    static
    {
        rateMap.put(InstanceType.M1Small, new InstanceRate(1, 0.085 + 0.015));
        rateMap.put(InstanceType.C1Medium, new InstanceRate(2, 0.17 + 0.03));
        rateMap.put(InstanceType.M1Large, new InstanceRate(4, 0.34 + 0.06));
        rateMap.put(InstanceType.M1Xlarge, new InstanceRate(8, 0.50 + 0.09));
        rateMap.put(InstanceType.C1Xlarge, new InstanceRate(8, 0.68 + 0.12));
        rateMap.put(InstanceType.M22xlarge, new InstanceRate(14, 1.00 + 0.21));
        rateMap.put(InstanceType.M24xlarge, new InstanceRate(28, 2.00 + 0.42));
        rateMap.put(InstanceType.Cc14xlarge, new InstanceRate(19, 1.60 + 0.33));
        rateMap.put(InstanceType.Cg14xlarge, new InstanceRate(25, 2.10 + 0.42));
    }

    /**
     * The only information needed to create a client is a set of security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init()
        throws Exception
    {
        AWSCredentials credentials =
            new PropertiesCredentials(
                                      EMRMonitor.class
                                          .getResourceAsStream("AwsCredentials.properties"));
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args)
        throws Exception
    {
        System.out
            .println("JOB_FLOW_ID\tSTATE\tSTARTED\tENDED\tINSTANCE_COUNT\tINSTANCE_TYPE\tINSTANCE_HOURS\tEMR_INSTANCE_RATE\tCHARGE\tDETAIL_JSON");
        Logger.getLogger("com.amazonaws").setLevel(Level.WARNING); // Turn off request status
                                                                   // messages
        init();
        DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest();
        DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
        for (JobFlowDetail detail : descResult.getJobFlows())
        {
            String slaveInstanceType = detail.getInstances().getSlaveInstanceType();
            String masterInstanceType = detail.getInstances().getMasterInstanceType();
            if (slaveInstanceType == null)
            {
                slaveInstanceType = masterInstanceType;
            }
            double instanceHours = getInstanceHours(detail, slaveInstanceType);
            double charge = getInstanceCharge(slaveInstanceType, instanceHours);
            System.out
                .println(String.format("%1$s\t%2$s\t%3$tF %3$tT\t%4$tF %4$tT\t%5$d\t%6$s\t%7$.0f\t%8$.2f\t%9$.2f\t%10$s\t",
                                       detail.getJobFlowId(),
                                       detail.getExecutionStatusDetail().getState(),
                                       detail.getExecutionStatusDetail().getCreationDateTime(),
                                       detail.getExecutionStatusDetail().getEndDateTime(),
                                       detail.getInstances().getInstanceCount(),
                                       slaveInstanceType,
                                       instanceHours,
                                       rateMap.get(InstanceType.fromValue(slaveInstanceType)).getEmrRate(),
                                       charge,
                                       detail.toString().replaceAll("\\s+", " ")));
        }
    }

    /**
     * @param instanceType
     * @param instanceHours
     * @return the estimated charge for the given instance hours
     */
    public static double getInstanceCharge(String instanceType, double instanceHours)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        return instanceHours * rate.getEmrRate();
    }

    /**
     * @param detail
     * @param instanceType
     * @return the estimated instance hours for the job flow
     */
    public static double getInstanceHours(JobFlowDetail detail, String instanceType)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        double instanceHours =
            detail.getInstances().getNormalizedInstanceHours()
                / (double) rate.getNormalizedHours();
        return instanceHours;
    }
}

Building the Organic Cloud with Amazon APIs and Java

Well, now that we have a high availability and low latency design for our RESTful services, where can we find the weakest link in maintaining our service level agreements?  The answer often lies in the crufty, hidden, and poorly tested “glue code” lying between (and below) the systems themselves.  This code rears its head when a critical notification fails to fire, or a cluster fails to scale automatically as designed.  How do we get this code cleaned up, out in the open, and rock solid?  The answer, of course, is to apply agile principles to our infrastructure.

The field of system programming desperately needs an update into the Agile Age!  And why not?  Popular cloud management APIs provide complete command and control via Agile-enabled languages such as Java…  And many efforts are under way to make cloud APIs standardized and open source.  So let’s delve into some details about how we can transform shell script spaghetti into something more workable!

Automated Build and Deployment

System programs, like application programs, should follow the same automated build and deployment process as the rest of the application portfolio.

Automated Unit Test and Mocking

I long to see the day that system programs have automated unit test coverage metrics just like the rest of the civilized world. Let’s test that fail over logic in a nice safe place where it can’t ruin lives!  Think you can’t mock all failure scenarios?  Think again.  With 100% Java APIs available from many cloud providers (not to mention other test-friendly languages) we could impersonate any type of response from the cloud services. Friends don’t let friends code without testing.
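As a minimal sketch of what such a test might look like (assuming JUnit 4 and Mockito are on the classpath, and reusing the AmazonElasticMapReduce interface that appears in the EMR code earlier on this blog), a cloud failure scenario can be impersonated without touching a real service:

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.Test;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;

public class CloudFailureScenarioTest {

    @Test(expected = AmazonServiceException.class)
    public void describeJobFlowsThrottled() {
        // Impersonate the cloud service: no real AWS call is made
        AmazonElasticMapReduce emr = mock(AmazonElasticMapReduce.class);
        AmazonServiceException throttled = new AmazonServiceException("Rate exceeded");
        throttled.setStatusCode(503);
        when(emr.describeJobFlows(any(DescribeJobFlowsRequest.class))).thenThrow(throttled);

        // The fail over logic under test would be exercised here;
        // for this sketch we simply invoke the stubbed client
        emr.describeJobFlows(new DescribeJobFlowsRequest());
    }
}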

Change Management Tied to Code Commits

Let’s all admit it: at one point or another we have all been tempted to cheat and “tweak” things in production.  Often it doesn’t seem possible that a particular change could cause a problem.  However, in some system programming environments “tweaking” has become a way of life because people assume there is no way to replicate the production environment.  In a cloud environment this is simply not the case.  All cloud instances follow a standardized set of options and configurations commanded by a testable language.  If we create a Java program or Python script to build and maintain the production environment, how trivial is it to re-create the same environment?  In this case tying all system program code commits directly to a Jira issue becomes possible, and complete end-to-end management of environment change also becomes possible.  All changes are code changes.  Wow, what a concept!
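For example, a fragment along the following lines (the AMI and instance parameters are placeholders) is enough to stand up an instance from code with the AWS SDK for Java, which means the change lives in version control and can be tied to an issue like any other commit:

import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;

public class EnvironmentBuilder {

    public static void main(String[] args) throws Exception {
        AmazonEC2 ec2 = new AmazonEC2Client(
            new PropertiesCredentials(
                EnvironmentBuilder.class.getResourceAsStream("AwsCredentials.properties")));

        // Placeholder AMI and instance type; real values belong in reviewed,
        // version-controlled configuration
        RunInstancesRequest request = new RunInstancesRequest()
            .withImageId("ami-xxxxxxxx")
            .withInstanceType("m1.large")
            .withMinCount(1)
            .withMaxCount(1);
        RunInstancesResult result = ec2.runInstances(request);
        System.out.println("Launched: "
            + result.getReservation().getInstances().get(0).getInstanceId());
    }
}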

Testable Components

By building testable components we create layers of abstraction which hide and manage complexity.  We can now confidently build intelligent instances which dynamically configure themselves, hand in hand with the software which runs on them.  Testability makes this not only feasible, but in fact imperative as a competitive advantage.

Organic Cloud Systems

So what happens when we standardize an Agile process for system programs in the same way we do for application programs?  Repeatable results…  And we also largely take the burden of busy work off of infrastructure folks so they can focus on engineering the next generation of great systems rather than doing so much fire fighting at 3 am.  Not to say this solves all of those problems, but most repeatable IT problems have automatable solutions.  Most likely we can automate anything which we can put in a run sheet using cloud APIs.  We then have organic cloud systems which automatically configure themselves and adapt to environmental changes and failure scenarios without sacrificing reliability.

How to Run an Elastic MapReduce Job Using the Java SDK

The current Amazon EMR documentation provides a lot of coverage of how to run a Hadoop custom jar using the AWS management console.  This post covers how to do the same thing directly using the AWS SDK for Java in Eclipse.

Prerequisites

To run the sample code, you need to set up and/or have knowledge of the following:

Running the Example Code

In order to run the example code below, please follow these setup steps:

  1. In Eclipse, select File -> New -> Other, and then search for AWS Java Project.
  2. Click Next, select a project name, select any other examples you want (S3, etc.) and enter your AWS Credentials.
  3. Click Next, then Finish.
  4. When the new project opens, right click on the (default package) -> New -> Class
  5. Enter the class name “ElasticMapReduceApp” and click Finish.
  6. Copy and paste the sample code below into the new class.
  7. Replace the string “your-bucket-name” with your S3 bucket name.
  8. Run the class.  It will report status as it runs.

After the run, the job output and job logs should appear as sub-directories under your S3 bucket.

Sample Code

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;
import com.amazonaws.services.elasticmapreduce.model.JobFlowExecutionState;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

/**
 * Run the Amazon Cloudburst example directly using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class ElasticMapReduceApp
{

    private static final String HADOOP_VERSION = "0.20";
    private static final int INSTANCE_COUNT = 1;
    private static final String INSTANCE_TYPE = InstanceType.M1Large.toString();
    private static final UUID RANDOM_UUID = UUID.randomUUID();
    private static final String FLOW_NAME = "cloudburst-" + RANDOM_UUID.toString();
    private static final String BUCKET_NAME = "your-bucket-name";
    private static final String S3N_HADOOP_JAR =
        "s3n://elasticmapreduce/samples/cloudburst/cloudburst.jar";
    private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/";
    private static final String[] JOB_ARGS =
        new String[] { "s3n://elasticmapreduce/samples/cloudburst/input/s_suis.br",
                      "s3n://elasticmapreduce/samples/cloudburst/input/100k.br",
                      "s3n://" + BUCKET_NAME + "/" + FLOW_NAME, "36", "3", "0",
                      "1",
                      "240",
                      "48",
                      "24", "24", "128", "16" };
    private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
    private static final List<JobFlowExecutionState> DONE_STATES = Arrays
        .asList(new JobFlowExecutionState[] { JobFlowExecutionState.COMPLETED,
                                             JobFlowExecutionState.FAILED,
                                             JobFlowExecutionState.TERMINATED });
    static AmazonElasticMapReduce emr;

    /**
     * The only information needed to create a client is a set of security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init() throws Exception {
        AWSCredentials credentials = new PropertiesCredentials(
                                      AwsConsoleApp.class
                                          .getResourceAsStream("AwsCredentials.properties"));

        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args) throws Exception {

        System.out.println("===========================================");
        System.out.println("Welcome to the Elastic Map Reduce!");
        System.out.println("===========================================");

        init();

        try {
            // Configure instances to use
            JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
            System.out.println("Using EMR Hadoop v" + HADOOP_VERSION);
            instances.setHadoopVersion(HADOOP_VERSION);
            System.out.println("Using instance count: " + INSTANCE_COUNT);
            instances.setInstanceCount(INSTANCE_COUNT);
            System.out.println("Using master instance type: " + INSTANCE_TYPE);
            instances.setMasterInstanceType(INSTANCE_TYPE);
            System.out.println("Using slave instance type: " + INSTANCE_TYPE);
            instances.setSlaveInstanceType(INSTANCE_TYPE);

            // Configure the job flow
            System.out.println("Configuring flow: " + FLOW_NAME);
            RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);
            System.out.println("\tusing log URI: " + S3N_LOG_URI);
            request.setLogUri(S3N_LOG_URI);

            // Configure the Hadoop jar to use
            System.out.println("\tusing jar URI: " + S3N_HADOOP_JAR);
            HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
            System.out.println("\tusing args: " + ARGS_AS_LIST);
            jarConfig.setArgs(ARGS_AS_LIST);
            StepConfig stepConfig =
                new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1),
                               jarConfig);
            request.setSteps(Arrays.asList(new StepConfig[] { stepConfig }));

            //Run the job flow
            RunJobFlowResult result = emr.runJobFlow(request);

            //Check the status of the running job
            String lastState = "";
            STATUS_LOOP: while (true)
            {
                DescribeJobFlowsRequest desc =
                    new DescribeJobFlowsRequest(
                                                Arrays.asList(new String[] { result.getJobFlowId() }));
                DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
                for (JobFlowDetail detail : descResult.getJobFlows())
                {
                    String state = detail.getExecutionStatusDetail().getState();
                    if (isDone(state))
                    {
                        System.out.println("Job " + state + ": " + detail.toString());
                        break STATUS_LOOP;
                    }
                    else if (!lastState.equals(state))
                    {
                        lastState = state;
                        System.out.println("Job " + state + " at " + new Date().toString());
                    }
                }
                Thread.sleep(10000);
            }
        } catch (AmazonServiceException ase) {
                System.out.println("Caught Exception: " + ase.getMessage());
                System.out.println("Reponse Status Code: " + ase.getStatusCode());
                System.out.println("Error Code: " + ase.getErrorCode());
                System.out.println("Request ID: " + ase.getRequestId());
        }
    }

    /**
     * @param value
     * @return true if the given state is a terminal job flow state
     */
    public static boolean isDone(String value)
    {
        JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
        return DONE_STATES.contains(state);
    }
}

unionOf for Cascading.Avro

Previous posts noted how Cascading provides greater flexibility and testability than relational databases for ETL, and also benchmarked Avro versus text formats for large scale information processing.  Recently I released a patch to Cascading.Avro which provides even more power and flexibility over traditional RDBMS-based data processing.  This new AvroScheme.unionOf utility method allows better support for Avro schema evolution without needing a centralized metadata store and without having to re-format all of your historical data to the new format.  Unlike a traditional SQL UNION statement, AvroScheme.unionOf dynamically adds columns and converts data types as necessary.  There is no need to specify field names or data types up front, since all Avro files contain self-describing metadata.

How it Works

Say that we have two Avro files: one representing an older format, and another representing the newer data format.

Old format:

ID (int) Amt (float)
1 22000.22
2 33000.33

New format:

ID (long) Amt (float) Currency (String)
3 11000.11 EUR

When we pass the directory containing the two Avro files as input to AvroScheme.unionOf, we can create a tap capable of reading all the data in all of the folder(s) specified:

  Tap avroSource = new Hfs(AvroScheme.unionOf(input), input);

And when we read the tap, we get the following:

ID (long) Amt (float) Currency (String)
1 22000.22 <null>
2 33000.33 <null>
3 11000.11 EUR

As you can see above, the Scheme standardized the data types and added null values for fields not present in the old Avro Schema, allowing Cascading GroupBy or CoGroup to function normally with the input data.
Without utilizing AvroScheme.unionOf we would need to either convert all existing data to the new format, or store the set of names and data types for each Schema and use a custom flow to coerce the types and add default values. Typically, by using AvroScheme.unionOf I have seen ~80% code reduction for reading existing Avro files, in addition to the support for evolving schemata.

Support for MultiSourceTap

If we need to read multiple directories containing Avro files, or multiple Avro files in different directories, we need to use the Cascading MultiSourceTap to read them as one data flow.  AvroScheme.unionOf supports this by passing an array of directory or file names:

  AvroScheme scheme = AvroScheme.unionOf(inputs);
  Tap[] avroSources = new Tap[inputs.length];
  for(int x = 0; x < inputs.length; x++) {
    avroSources[x] = new Hfs(scheme, inputs[x]);
  }
  MultiSourceTap multiSourceTap = new MultiSourceTap(avroSources);

Support for Avro Projection

Let’s say we only need to see the ID from each record, regardless of the underlying schema data type.  Then we can use the following call to produce the desired output:

  Tap avroSource = new Hfs(
        AvroScheme.unionOf(new Fields("ID"), out), out);

And the output looks like:

ID (long)
1
2
3

Availability and Limitations

I have released a patch to BixoLabs adding support for unionOf as well as simple type conversion support.  Please check the Cascading.Avro site for release details. Currently only default toString() conversions to String are supported and nested Map and List fields are excluded from the union field set.

RESTful High Availability and Low Latency on Amazon EC2

For this post, I have condensed some tricks, tips, and techniques for improving availability and latency for RESTful apps on Amazon Elastic Compute Cloud.

Geo-aware Load Balancing

Since Amazon Elastic Load Balancing does not automatically route users to the best region, utilizing a geo-aware IP load balancing service on top of the Amazon ELB tier reduces latency and can also increase availability.  These services route user requests to the closest region automatically, thus reducing network latency.  Also, if an entire region is down, user requests are routed to another region.  Examples include Dyn DNS and Akamai DNS.

Leveraging HTTP Caching

Using RESTful services makes all the rich caching features of HTTP available to your services.  Browser (agent) cache takes direct load off your servers at no cost to you, so you should conduct a thorough review of your HTTP interactions to determine which could be cached.  This can dramatically improve both latency and availability by reducing round trips to the server.  Surrogate cache (such as Amazon Cloudfront) operates over a content delivery network (CDN), which replicates content close to the user to reduce latency and increase availability.  You can put Cloudfront in front of any HTTP server to take load off the web server, app server, and db tiers and improve response time for users.  Secure (HTTPS) content cannot be cached by a surrogate, so it is important to segregate truly secure interactions from shared resources such as images or other rich content which could be shared among users.  Also, caching normally requires managing the service URLs to ensure cacheability (i.e. don’t embed JSESSION_ID in the URL) and customizing the HTTP cache-control headers on app server responses for cacheable responses.
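A minimal servlet filter along these lines is one way to customize the cache-control headers (the one-hour max-age and the GET-only rule are example choices, not a universal policy):

import java.io.IOException;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Marks idempotent GET responses as cacheable so browser and surrogate (CDN)
 * caches can serve them without a round trip to the origin.
 */
public class CacheControlFilter implements Filter {

    public void init(FilterConfig config) throws ServletException {
    }

    public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
        throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) req;
        HttpServletResponse response = (HttpServletResponse) res;
        if ("GET".equals(request.getMethod())) {
            // Shared, non-secure resources only; map this filter away from HTTPS-only paths
            response.setHeader("Cache-Control", "public, max-age=3600");
        } else {
            response.setHeader("Cache-Control", "no-cache, no-store");
        }
        chain.doFilter(req, res);
    }

    public void destroy() {
    }
}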

Throttle Back and Fail-fast

When interacting with a load balancer, use the HTTP 503 Service Unavailable response code to immediately cause the load balancer to re-submit the request to another server instance.  This allows throttling back the load proactively before a downstream server or other component fails, and allows the application to control how much load it is willing to take on at any given moment.  This is also called “fail-fast”, since we throttle back the request immediately, without waiting for a server or connection timeout.  In order to take advantage of throttling we need to ensure that we process requests asynchronously and monitor the queue depth or request backlog.  Then we have to have a tunable configuration point where we can control when the application throttles back.  This technique improves availability and latency in the following areas:

  1. Under heavy load and spikes, throttling allows the architecture to respond proactively to potential overload rather than waiting around for something bad to happen.
  2. Throttling back tells the load balancer exactly when to send a request to another node, instead of having it perform guess-work based on CPU, network traffic, or request latency.  This reduces availability problems which occur when the load balancer guesses wrong or has the wrong configuration.
  3. The front end web application can control throttle back based on implicit knowledge of the end-to-end architecture which the load balancer cannot possibly have.

In order to implement throttling correctly we need to ensure all requests are either idempotent or queued.  An idempotent request is implicitly cacheable and could be served from another layer such as a CDN.  Anything which is not idempotent must be queued.  Non-idempotent requests implicitly take more time and resources to execute and may involve stateful interactions with a high overhead.  Using a reliable and scalable managed queuing mechanism such as Amazon SQS ensures that the queue itself is not a point of failure.
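A bare-bones sketch of the throttling idea (the backlog counter and threshold are stand-ins for whatever queue depth or request backlog measure the application actually monitors) looks like this:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Fails fast with HTTP 503 when the in-flight backlog exceeds a tunable
 * threshold, so the load balancer immediately retries another instance.
 */
public class ThrottlingServlet extends HttpServlet {

    // Tunable configuration point; a real deployment would externalize this
    private static final int MAX_BACKLOG = 100;
    private static final AtomicInteger backlog = new AtomicInteger();

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws IOException {
        if (backlog.incrementAndGet() > MAX_BACKLOG) {
            backlog.decrementAndGet();
            resp.setHeader("Retry-After", "1");
            resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
            return;
        }
        try {
            // ... hand the request off to the asynchronous processing path ...
            resp.setStatus(HttpServletResponse.SC_OK);
        } finally {
            backlog.decrementAndGet();
        }
    }
}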

Turn Off Sessioning

Most RESTful services will not require tracking or persisting HTTP session information.  In fact, for scalability purposes REST explicitly defines a “Stateless” constraint to prevent cross-request state from interfering with web server scalability.  To disable HTTP sessions for a Tomcat webapp, add a context.xml under your webapp/META-INF (not WEB-INF) with the following contents.  This will work even with Elastic Beanstalk applications, since Beanstalk uses Tomcat:

<?xml version='1.0' encoding='utf-8'?>
<Context cookies="false" cacheTTL="0">

  <!-- Default set of monitored resources -->
  <WatchedResource>WEB-INF/web.xml</WatchedResource>

  <!-- Uncomment this to disable session persistence across restarts -->
  <Manager pathname="" />

</Context>

Redirect After Post

Many applications I’ve seen still serve a response body with a POST, PUT, or DELETE request.  This violates REST principles, and removes a needed separation of concerns.  It can affect latency and availability, since responses to POST, PUT, and DELETE are not cacheable.  It can also allow the user to refresh the page and unknowingly re-submit the transaction.  To avoid these problems, utilize the Redirect After Post pattern.  As mentioned under the throttling section above, the POST, PUT, or DELETE can issue a message to the queue.  This returns immediately, and should redirect to a status page, or redirect to the next page in the user interaction flow.  The status page can automatically refresh periodically until the request is done, or we can just move on with the flow.  It is counterproductive to latency and availability to keep an HTTP connection open while a (potentially long running) state change completes, and there is no reason to keep the user waiting either.
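Here is a hedged sketch of the pattern (the status URL layout and the queueing step are placeholders for whatever the application actually uses, e.g. an SQS enqueue):

import java.io.IOException;
import java.util.UUID;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Redirect After Post: the state change is queued and the client is redirected
 * to a status resource instead of holding the connection open.
 */
public class ChangeRequestServlet extends HttpServlet {

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
        throws IOException {
        String requestId = UUID.randomUUID().toString();

        // Enqueue the change here (for example as an SQS message keyed by requestId)
        // rather than performing the long-running work on this thread.

        // 303 See Other tells the agent to GET the status page; refreshing that page
        // re-reads status instead of re-submitting the transaction
        resp.setStatus(HttpServletResponse.SC_SEE_OTHER);
        resp.setHeader("Location", req.getContextPath() + "/status/" + requestId);
    }
}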

Utilize Application Tier Cache

The ephemeral nature of cloud instances means that anything in memory or on local disk could disappear at any moment.  However, this does not mean we should avoid in-memory caching in the application tier.  If an instance goes down, ELB routes requests to another instance.  Therefore, it is no problem to use application tier caches just like you would in any JEE deployment.  Remember, any single server is by nature ephemeral.  The cloud does nothing to change this.  Someone could spill coffee on the server at any time.  The only thing that changes with the cloud is that the person spilling the coffee works for Amazon, so you have the potential to get your money back!
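Something as simple as the following is often enough (the loader method is a placeholder for the real data access call); if the instance vanishes, the cache vanishes with it and is rebuilt on whichever instance takes over:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * A deliberately simple in-memory application tier cache. Losing it on
 * instance failure is acceptable because it is only a performance aid.
 */
public class ProfileCache {

    private final ConcurrentMap<String, String> cache =
        new ConcurrentHashMap<String, String>();

    public String getDisplayName(String userId) {
        String name = cache.get(userId);
        if (name == null) {
            name = loadDisplayNameFromDatabase(userId);
            cache.putIfAbsent(userId, name);
        }
        return name;
    }

    private String loadDisplayNameFromDatabase(String userId) {
        // Placeholder for the real lookup against the persistent store
        return "user-" + userId;
    }
}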

Shard

If possible, shard your data in a way that evenly distributes load.  Not all businesses can shard their data; however, if you provide a user-centric service which doesn’t often mix data between users (like Facebook), then sharding may be your availability and latency coup de grâce.  Set up a series of sub-domains, or RDS databases, etc.  Then, in the user profile, store the assigned resource name.  If you assign the resources intelligently you can service any number of users with a continuous level of latency and availability, as long as users don’t cross geo regions.  If users cross regions then there will be some overhead for that user due to cross-region communication.  However, at a later point if this becomes a problem you can create a routine to detect when a user changes regions and migrate their data to the new region.
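A sketch of the assignment step might look like the following (the shard names are placeholders; the point is that the chosen resource gets stored in the user profile so every later request routes directly to it):

/**
 * Assigns each new user to a shard (for example a regional sub-domain or an
 * RDS instance) and records the assignment in the user profile.
 */
public class ShardAssigner {

    // Hypothetical resource names; a real deployment would configure these
    private static final String[] SHARDS = {
        "users-1.db.example.com", "users-2.db.example.com", "users-3.db.example.com"
    };

    public String assignShard(String userId) {
        // Stable hash keeps the user on the same shard for the life of the account
        int index = (userId.hashCode() & 0x7fffffff) % SHARDS.length;
        return SHARDS[index];
    }
}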

Using RDS without Sharding?  Try Read Replicas

If you do not need absolute read consistency and need to scale out reads, Amazon RDS allows you to create as many read replicas as you are likely to need.  Read away!

PS: make sure to create a multi-AZ deployment because then the replication will run off the Standby Database and not place load on the transactional instance.

Batch SQS Messages

For many applications, the latency associated with Simple Queue Service is unacceptable.  In this case, try batching multiple incoming requests into one SQS message.  SQS handles larger messages well, so this should reduce the latency of your service end point if you can find the “sweet spot” for SQS message size.  I recently benchmarked SQS on an m1.large instance and found that queue throughput with 8KB messages was 393 messages per second, and that single-threaded benchmark should scale roughly linearly across multiple threads.  So by batching roughly eight 1,024-byte requests into each 8KB message, a single thread performing the SQS enqueues can service over 3,000 REST requests per second.
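A rough sketch of the batching enqueuer (newline-delimited payloads and the 8KB cap are example choices, and character length is used as a stand-in for byte size):

import java.util.List;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.SendMessageRequest;

/**
 * Packs several small request payloads into a single ~8KB SQS message to
 * amortize the per-message latency of the queue.
 */
public class BatchingEnqueuer {

    private static final int MAX_BATCH_CHARS = 8 * 1024;

    private final AmazonSQS sqs;
    private final String queueUrl;

    public BatchingEnqueuer(AmazonSQS sqs, String queueUrl) {
        this.sqs = sqs;
        this.queueUrl = queueUrl;
    }

    public void enqueueAll(List<String> payloads) {
        StringBuilder batch = new StringBuilder(MAX_BATCH_CHARS);
        for (String payload : payloads) {
            if (batch.length() > 0
                && batch.length() + payload.length() + 1 > MAX_BATCH_CHARS) {
                // Current batch is full; flush it as one SQS message
                sqs.sendMessage(new SendMessageRequest(queueUrl, batch.toString()));
                batch.setLength(0);
            }
            if (batch.length() > 0) {
                batch.append('\n');
            }
            batch.append(payload);
        }
        if (batch.length() > 0) {
            sqs.sendMessage(new SendMessageRequest(queueUrl, batch.toString()));
        }
    }
}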

Convert Piecemeal Changes to Bulk Changes

If you followed the advice above about queuing changes in SQS, then you will find a queue full of changes to process instead of a slew of piecemeal changes.  If these changes need to be applied to an RDS MySQL instance, then considerable performance gains can be made by applying them using batch JDBC.  Overall, database latency and availability improve with batch operations because batches incur fewer network round trips and transaction commits than piecemeal changes.
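A minimal sketch of the bulk write (the account table and its columns are assumptions for the example):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/**
 * Applies a drained batch of queued changes to MySQL in one round trip and one
 * commit, instead of a network hit and a commit per piecemeal change.
 */
public class BulkChangeWriter {

    public void applyBalanceChanges(Connection conn, List<String> userIds,
                                    List<Double> amounts) throws SQLException {
        String sql = "UPDATE account SET balance = balance + ? WHERE user_id = ?";
        conn.setAutoCommit(false);
        PreparedStatement stmt = conn.prepareStatement(sql);
        try {
            for (int i = 0; i < userIds.size(); i++) {
                stmt.setDouble(1, amounts.get(i));
                stmt.setString(2, userIds.get(i));
                stmt.addBatch();
            }
            stmt.executeBatch();  // one batched round trip
            conn.commit();        // one transaction commit
        } finally {
            stmt.close();
        }
    }
}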

Closing Comment

Hopefully the techniques above will help you build faster, more available RESTful applications on the Amazon EC2 platform.  If you have any questions, comments, or feedback please feel free to comment on the post!
