Tag Archives: Inverse Rest

Scaling across data by moving code and small data close to big data

Feel the Beam! Google Casts a new Light on Big Data

using-lasers-to-preserve-mt-rushmoreApache Beam from Google finally provides robust unification of batch and real-time Big Data.  This framework replaced MapReduce, FlumeJava, and Millwheel at Google.  Major big data vendors already contributed Apache Beam execution engines for both Flink and Spark, before Beam even officially hit incubation.  Anyone else seeing the future of Big Data in a new light?  I know I am…

Academic underpinning: http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

Google’s comparison of Apache Beam vs. Apache Spark: https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison

Why I tried Apache Spark, and moved on..

I tried Apache Spark, and moved on.  Here’s why:

Resourcing Issuesspark-do-not-enter

Apache Spark, written in Scala, causes severe resourcing issues for customers due to the additional technical skill requirements:

  1. Scala ranks #30 with 0.5 % in market saturation, while Java ranks #1 with 21.5% of the market, a difference of 4300%: http://www.tiobe.com/index.php/content/paperinfo/tpci/index.html
  2. Introduction of native Functional Programming constructs into the Java language with release 1.8 practically eliminates the business case for Scala altogether: https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html
  3. Scala works best with IntelliJ Idea IDE, which has licensing costs and is extremely unlikely to replace free Eclipse tooling at any large company
  4. Scala is among a crowd of strong contenders and faces a moving target as Java has gained 5% in market share between 2015 and 2016.  To put this in perspective, Scala has less market share than Lisp

Consistency and Integrity Issues

Trying to get Spark to meet rigorous standards of data consistency and integrity proves difficult.  Apache Spark’s design originates from companies who consider Data Consistency and Data Integrity secondary concerns, while most industries consider these primary concerns.  For example, achieving at-most-once and at-least-once consistency from Spark requires numerous workarounds and hacks: http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

Dependency Hell with a Vengeance

Apache Spark (and Scala) import a huge number of transitive dependencies compared to other alternative technologies.  Programmers must master all of those dependencies in order to master Spark.  No wonder very few true experts in Spark exist in the market today.

What’s the Alternative to Spark?

For real-time in-memory processing Use Case: data grids, once the purview of blue chip commercial vendors, now have very strong open source competition.  Primary contenders include Apache Ignite and Hazelcast.

For fast SQL analytics (OLAP) Use Case: Apache Drill provides similar performance to Spark SQL with a much simpler, more efficient, and more robust footprint.  Apache Kylin from eBay looks to become a major OLAP player very quickly, although I have not used it myself.

For stream processing Use Case: Apache Beam from Google looks likely to become the de-facto streaming workhorse, unseating both Apache Flink and Spark Streaming.  Major big data vendors have already contributed Apache Beam execution engines for both Flink and Spark, before Beam even officially hit incubation.

If you try these alternative technologies, and compare to Spark, I’m sure you’ll agree that Spark isn’t worth the headache.

 

Hermetic Lambdas: a Solution for Big Data Dependency Collisions

Dependency collisions furnish plenty of headaches and late nights while implementing Lambda Architecture.  Java class loading containers tend toward complexity and many frameworks such as Hadoop, JCascalog, and Storm provide tons of transitive dependencies but no dependency isolation for third party Lambda code deployed in the same JVM.  So currently to benefit from running in the same JVM, all the transitive dependencies come along for the ride.  This impacts:

  • Portability: Lambda logic shared in the batch layer and the speed layer must cope with at least two different sets of dependencies.
  • Manageability: framework version upgrades may cause hard failures or other incompatibilities in Lambda logic due to changing dependency versions.
  • Performance: solving the problem by forcing JVM isolation incurs overhead from IPC or network chatter.  Likewise, utilizing scripting or dynamic languages merely hides the problem rather than solving it, while imposing additional performance penalties.
  • Complexity: developing Lambdas requires knowledge and accommodation of all the dependencies of all frameworks targeted for deployment.

Many developers and architects rightfully shy away from deploying bloated dependency container frameworks such as Spring or OSGI inside of a big data framework.  Big data and fast data provide enough complexity already.  So lets look at how we can use a simple pattern, along with basic core Java libraries, to avoid “Dependency Hell“.

Signs you may be encountering this problem include these types of exceptions occurring after deployment:

  • java.lang.NoSuchMethodError: when a method no longer exists which your code depends on
  • java.lang.ClassNotFoundException: when a class your code uses cannot be found in the deployed class path
  • java.lang.IllegalAccessException: when conflicting versions of an API mark methods or fields private

Please reference my github project for a complete working implementation along with unit tests.


Shows how the Lambda executes within the big data framework in an isolated dependency context
Lambda Execution Model

Hermetic Classloading

The Java Service Loader framework introduced in JDK 1.6 provides a way to dynamically bind an implementation class to an interface while specifying an alternate class loader.  Loading a Lamda implementation with ServiceLoader forces all code called within the context of the Lamda to use the same class loader.  This allows creating a service which supports parent-first class loading, child-first class loading, or child-only (or hermetic) class loading.  In this case, the hermetic loader prevents any possible dependency collisions.  To create a hermetic loader, we need simply utilize the built-in URLClassLoader in Java.  Our implementation jar can reside on the local file system, on a web server, or in HDFS: anywhere a URL can point to.  For the parent class loader, we specify the Java bootstrap class loader.  So we can implement a hermetic class loading pattern in one line of code:

ServiceLoader<Map> loader = ServiceLoader.load(Map.class, 
    new URLClassLoader(urls, Map.class.getClassLoader()));

Note that we intentionally avoid calling ClassLoader.getSystemClassLoader() in order to prevent the calling context (such as Hadoop or Hive) form polluting the Lambda class path.  Core packages such as java.lang and java.util use the bootstrap class loader, which only carries core Java dependencies shipped as part of the JDK.  The diagram above shows how the LambdaBoot framework fits within a big data framework such as Hadoop.

Mapping the World

In the example above, we use a Map interface to interact with the Lambda.  This allows us to avoid having a separate jar containing a Service Provider Interface (SPI).  Instead, we can subclass the Map and provide any behavior desired by intercepting get and put calls to specific keys.  By optionally returning a Future from a get or put call on the Map, we get asynchronous interaction as well if desired.  In addition, the non-sensitive Map keys can provide metadata facilities.  In the example of linear regression implemented as a Map, we use a conflicting version of Apache Commons Math not yet supported by Hadoop to calculate the regression.

Shows how the LambdaBoot framework can work with deployment tools such as Jenkins, Puppet, and Git
Lambda Deployment Model

Implications

Using a very simple pattern, we can deploy Hermetic Lambdas to any Java environment without fear of dependency collisions.  The ServiceLoader also acts as a service registry, allowing us to browse metadata about available Lambdas, dynamically load new Lamdas, or update existing Lambdas.

Titan: Cassandra vs. Hazelcast persistence benchmark

10 node load test comparison using Amazon EC2 SSD-based instances.  1 billion vertices and 1 billion edges processed for each test run.  Used the titan-loadtest project to run each test.

Method

Experiment maximizes data locality by co-locating load generation, Titan graph database, and Cassandra/Hazelcast within the same JVM instance while partitioning data across a cluster. Exploration of methods for tuning garbage collection, Titan, and Cassandra for the peer computing use case.

The following components were utilized during the experiment:

Technology Version
RHEL x64 HVM AMI 6.4
Oracle JDK x64 1.7_45
Apache Cassandra 1.2.9
Hazelcast 3.1.1
Titan 0.3.2

Each test iteration has 6 read ratio phases starting with 0% reads (100% writes) all the way up to 90% reads and 10% writes.  For all tests, the persistence implementation executes in the same JVM as Titan to avoid unnecessary context switching and serialization overhead.  Tests were conducted using an Amazon placement group to ensure instances resided on the same subnet.  The storage was formatted with 4K blocks and used the noop scheduler to improve latency.

For each phase, new vertices were added with one edge linking back to a previous vertex.  No tests of update or delete were conducted.

Please see the titan-loadtest project above for all Cassandra and Hazelcast settings and configurations used in the test.

Results

Titan 10 node Summary
Titan 10 node Summary

Please note: the results are listed in rates of thousands of vertices per second and include the creation of an edge as well as a vertex. Also, the Hazelcast SSD x1 results used a custom flash storage module for Hazelcast developed privately so those results are not replicable without that module installed.

Conclusions

Hazelcast performed better than Cassandra for all tests and demonstrated one order of magnitude better performance on reads.  Surprisingly, Hazelcast slightly outperformed Cassandra for writes as well.

Large Java Heap with the G1 Collector – Part 1

Demonstration of efficient garbage collection on JVM heap sizes in excess of 128 gigabytes.  Garbage collection behavior analyzed via a mini-max optimization strategy.  Study measures maximum throughput versus minimum garbage collection pause to find optimization “sweet spot” across experimental phases.  Replicable results via well-defined open source experimental methods executable on Amazon EC2 hardware. 

Experimental Method

Goals

  • Demonstrate maximum feasible JVM size on current cloud hardware (specific to Amazon for now) using the G1 Garbage Collector (link).
  • Vary the JVM heap size (-Xmx) exponentially to find performance profile and breaking points.
  • Vary the ratio of new versus old generation objects exponentially.
  • Using in-memory workload to stress the JVM (avoids network or disk waits).
  • Produce replicable results on commodity hardware, open source operating systems, and open source tools.
  • Provide gap-free data for analysis, in spite of garbage collection pauses.

Not (Yet) in Scope

In followup to this study, subsequent efforts may include:

  • Vary the number of old objects up to maximum possible memory capacity.
  • Vary the number of processing threads to starve the concurrent G1 algorithm of CPU cycles.
  • Vary the object size.
  • Vary the field size within the object.
  • Vary the G1 JVM parameters.

Tools and Versions

  1. Amazon Linux AMI (link)
  2. One cr1.8xlarge instance (link): 244 GiB RAM, 2 x Intel Xeon E5-2670 (link)
  3. Oracle JDK 1.7_15

JVM Parameters

This experiment varied the JVM heap size but kept other parameters constant.

  • -Xmx16g, -Xmx32g, -Xmx64g, -Xmx128g, -Xmx212g
  • -XX:+UseG1GC
  • -XX:InitiatingHeapOccupancyPercent=0
  • -XX:MaxGCPauseMillis=200
  • -XX:MaxTenuringThreshold=25
  • -XX:ParallelGCThreads=32
  • -XX:ConcGCThreads=32
  • -XX:G1ReservePercent=10
  • -XX:G1HeapRegionSize=32m

Program Parameters

These parameters were kept constant for this study.

  • statInterval = 1000: the reporting interval for measures in milliseconds.
  • numThreads = 24: number of processing threads.
  • objectsPerThread = 8,000,000: number of objects to produce for each phase.
  • maxOld = 800,000: max number of old references to maintain, after which old references are overwritten with new values.

Experimental Phases

For each JVM heap size, the following phases are executed.  For each phase, after maxOld reached object references get overwritten and must be garbage collected.

  • RANDOM_WRITE_0: All of the objects produced are instant garbage, references not kept.
  • RANDOM_WRITE_1: All of the objects references kept.
  • RANDOM_WRITE_2: 1/2 of object references kept.
  • RANDOM_WRITE_4: 1/4 of object references kept.
  • RANDOM_WRITE_8: 1/8 of object references kept.

Experiment Results

Sum of Garbage Collection Pause
Sum of Garbage Collection Pause

The graph above shows the sum total of *all* garbage collection pauses across each experimental phase.

Min of Object Throughput
Min of Object Throughput (per second)

The graph above shows the *min* throughput for each second across each experimental phase.

Min of Throughput to Max of Garbage Collection Pause (per second)
Min of Throughput to Max of Garbage Collection Pause (per second)

The line chart above shows log scale throughput versus standard scale garbage collection pause across all seconds in each experimental phase.

Min of Throughput to Max of Garbage Collection Pause (per second)
Min of Throughput to Max of Garbage Collection Pause (per second)

The radar chart above shows log scale throughput versus standard scale garbage collection pause across all seconds in each experimental phase.

Addenda

Java garbage collection test class:

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

public class GCTest {

    public static String configName = "Test";
    public static int statInterval = 1000;
    public static int numThreads = 4;
    public static int objectPerThread = 500000;
    public static int maxOld = 1000000;

    public static enum ExperimentPhase {
        SEQUENTIAL_WRITE, SEQUENTIAL_CLEAR, RANDOM_WRITE_0, RANDOM_WRITE_8, RANDOM_WRITE_4, RANDOM_WRITE_2, RANDOM_WRITE_1, RANDOM_READ
    };

    public static int phaseIdx;
    public static ExperimentPhase phase;

    public static CyclicBarrier barrier;

    public static int[] oldToYoungRatios = new int[] { 0, 8, 4, 2, 1 };

    public static volatile boolean done = false;

    public static AtomicLong totalOfInterval = new AtomicLong();

    public static Object[] oldArr = new Object[maxOld];

    public static ConcurrentHashMap<Integer, Map<String, Object>> oldMap = new ConcurrentHashMap<Integer, Map<String, Object>>(
            maxOld);

    /**
     * Generates maximum amount of objects with nested references. Saves some
     * references so they go to the old generation.
     * 
     * @author pouttum
     * 
     */
    public static class GarbageThread extends Thread {

        protected int threadNo;

        public GarbageThread(int threadNo) {
            super();
            this.threadNo = threadNo;
        }

        @Override
        public void run() {

            await();

            // Incremental update phase
            for (int x = 0; x < maxOld; x++) {
                if (x % numThreads == threadNo) {
                    oldArr[x] = getDoc(x);
                    totalOfInterval.incrementAndGet();
                }
            }

            await();

            // Incremental clear phase
            for (int x = 0; x < maxOld; x++) {
                if (x % numThreads == threadNo) {
                    oldArr[x] = null;
                    totalOfInterval.incrementAndGet();
                }
            }

            // Random write / update phase
            for (int r = 0; r < oldToYoungRatios.length; r++) {
                await();
                for (int x = 0; x < objectPerThread; x++) {
                    Map<String, Object> doc = getDoc(x);
                    totalOfInterval.incrementAndGet();
                    if (oldToYoungRatios[r] > 0
                            && (oldToYoungRatios[r] == 1 || (x
                                    % oldToYoungRatios[r] == 0))) {
                        int index = (int) (Math.ceil(random() * maxOld));
                        oldMap.put(index, doc);
                    }
                }
            }

            await();

            // Random read phase
            for (int x = 0; x < objectPerThread; x++) {
                totalOfInterval.incrementAndGet();
                int index = (int) (Math.ceil(random() * maxOld));
                oldMap.get(index);
            }
        }

        protected void await() {
            try {
                barrier.await();
            } catch (Exception e) {
            }
        }

        protected HashMap<String, Object> getDoc(int x) {
            HashMap<String, Object> doc = new HashMap<String, Object>();
            doc.put("value1", "value1" + String.valueOf(x));
            doc.put("value2", "value2" + String.valueOf(x));
            doc.put("value3", "value3" + String.valueOf(x));
            doc.put("value4", "value4" + String.valueOf(x));
            doc.put("value5", "value5" + String.valueOf(x));
            doc.put("value6", "value6" + String.valueOf(x));
            doc.put("value7", "value7" + String.valueOf(x));
            doc.put("value8", "value8" + String.valueOf(x));
            doc.put("value9", "value9" + String.valueOf(x));
            return doc;
        }

        protected double random() {
            return Math.random();
        }

    };

    /**
     * Calculates ongoing stats and keeps history on the stat interval.
     * 
     * @author pouttum
     * 
     */
    public static class StatThread extends Thread {

        @Override
        public void run() {
            Date previousDate = new Date();
            long adjStatInterval = statInterval;
            int intervalCount = 0;
            do {
                try {
                    Thread.sleep(adjStatInterval);
                } catch (InterruptedException e) {
                    done = true;
                }
                adjStatInterval = statInterval;
                long intervalTotal = totalOfInterval.getAndSet(0L);
                Date date = new Date();
                double intervalSeconds = (date.getTime() - previousDate
                        .getTime()) / 1000d;

                StringBuilder stats = new StringBuilder(1024);
                float statIntervalDouble = statInterval / 1000f;
                double gcPause = intervalSeconds - statIntervalDouble;
                if (intervalSeconds > statIntervalDouble * 2) {
                    double x = statIntervalDouble * 2;
                    for (; x < intervalSeconds; x += statIntervalDouble) {
                        stats.append(String.format("%s\t%s\t%d\t%d\t%.3f\n",
                                configName, phase, ++intervalCount, 0,
                                statIntervalDouble));
                        gcPause -= statIntervalDouble;
                    }
                }
                if (gcPause > 0.0d) { // Credit the next interval with some of
                                        // the count of this interval
                    adjStatInterval -= gcPause * 1000L;
                    long intervalTotalAdj = Math
                            .round((gcPause / statIntervalDouble)
                                    * intervalTotal);
                    intervalTotal -= intervalTotalAdj;
                    totalOfInterval.addAndGet(intervalTotalAdj);
                }
                stats.append(String.format("%s\t%s\t%d\t%d\t%.3f\n",
                        configName, phase, ++intervalCount, intervalTotal,
                        Math.max(gcPause, 0.0d)));
                previousDate = date;
                System.out.print(stats.toString());
            } while (!done);
        }

    }

    /**
     * * @param args
     * 
     * @throws InterruptedException
     * */
    public static void main(String[] args) throws Exception {
        if (args.length == 5) {
            configName = args[0];
            statInterval = Integer.parseInt(args[1]);
            numThreads = Integer.parseInt(args[2]);
            objectPerThread = Integer.parseInt(args[3]);
            maxOld = Integer.parseInt(args[4]);
        }
        barrier = new CyclicBarrier(numThreads,
                new Runnable() {
                    @Override
                    public void run() {
                        phase = ExperimentPhase.values()[phaseIdx++];
                    }
        });
        GarbageThread[] threads = new GarbageThread[numThreads];
        for (int x = 0; x < threads.length; x++) {
            threads[x] = new GarbageThread(x);
            threads[x].start();
        }
        StatThread statThread = new StatThread();
        statThread.setPriority(Thread.MAX_PRIORITY);
        statThread.start();
        for (int x = 0; x < threads.length; x++) {
            try {
                threads[x].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        done = true;
        statThread.join();
    }

}

Compile the code:

jdk1.7.0_15/bin/javac GCTest.java

Unix script run.sh:

jdk1.7.0_15/bin/java -server -Xms16g -Xmx16g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg16 1000 24 8000000 800000 > baseline.csv
jdk1.7.0_15/bin/java -server -Xms32g -Xmx32g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg32 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms64g -Xmx64g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg64 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms128g -Xmx128g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest mxg128 1000 24 8000000 800000 >> baseline.csv
jdk1.7.0_15/bin/java -server -Xms212g -Xmx212g -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=0 -XX:MaxGCPauseMillis=200 -XX:MaxTenuringThreshold=25 -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1ReservePercent=10 -XX:G1HeapRegionSize=32m GCTest old_ratio_8 1000 24 8000000 800000 >> baseline.csv

CouchDB and Hadoop

CouchDB, a top level Apache project, utilizes a RESTful interface to serve up dynamic JSON data on an Internet scale.  CouchDB provides ACID guarantees with no locking using Multi-version Concurrency control, and also scales via Shared-nothing deployment using multi-master replication.  Data access leverages a map/reduce query model expressed in JavaScript, as detailed below.  This page provides a brief overview of CouchDB and potential utilization in a Hadoop Amazon Elastic MapReduce deployment.

CouchDB by Example

Looking at the Apache pages utilizes some useful examples of how to use CouchDB.  However, I found this cool Mu Dynamics simulator which shows how to query CouchDB by example.  I found that the simulator quickly clarifies how the Map/Reduce query processing works with concrete problems.  A much faster way to kick the tires than building and installing from scratch…

Scalability and Performance

The Apache wiki provides some pointers on CouchDB performance.  It seems that striping (RAID 0) provides the best performance improvements in the benchmarks I’ve seen, esp in environments with many users and a lot of random reads (see below).  CouchDB doesn’t support parallel query out of the box but opening multiple sockets to the same server would allow parallel access to data for a single client.

The map/reduce query capability produces materialized views automatically: featuring automated incremental refresh.  Therefore if the data doesn’t change in a view then each query does not re-query the base data.  By default the first query after a committed change to the base data triggers the incremental refresh of the view (this is configurable).

In terms of scalability, multi-master replication provides multiple data copies across concurrent machines and/or data centers.  Via the stateless RESTful interface load balancing doesn’t get any easier…  Also, sharding is not supported out of the box but solutions like Lounge and Pillow look promising for supporting larger data sets.

Cloud Deployment

Check out this blog post on how to spin up a CouchDB cluster on Amazon EC2 using Puppet. Seems an ideal candidate for Elastic Load Balancing: the ability to automatically spin up instances as demand increases should really make CouchDB live up to it’s name!

Potential Use Cases With Hadoop

Use of CouchDB to serve key/value reference data in an Inverse REST architecture seems an ideal use case.  Since we could maintain the CouchDB data using a separate GUI we could correct reference data or meta data dynamically as the job is running.  No need to wait around until a large job has crashed and burned to fix reference data problems.  Probably also we would want to have an LRU cache on the client side (in the Hadoop job) to minimize network hits.

Conclusion

Although it may seem counter-intuitive, leveraging the multi-layered REST caching and load balancing architecture to move small data close to big data could make a lot of sense, especially in a cloud environment designed to adapt dynamically to large spikes.

Java Embedded DB for IP2Location in Hadoop

Since the IP geographic data operates on ranges of IP addresses, we cannot simply join the log data to the IP data (using RDBMS or MapReduce), and we cannot use a simple key/value cache to look-up the IP data.  Currently the entire IP2Location DB-5 data set consumes 1+ GB of memory when loaded into a Java object array.  While we can currently fit this in memory with the current infrastructure we will have issues if this data grows significantly beyond it’s current size.  Therefore we have to look at some alternatives for looking up the data without loading it all in memory.  We cannot use a middleware or shared database solution since these will quickly become overwhelmed by requests from our cluster.  Likewise, we cannot afford to take a network hit for every look-up and we currently cannot batch look-up requests to reduce network hits.   We need a shared-nothing architecture, and therefore copying a Java embedded database locally to pull our small data close to our big data seems the best approach.  This page evaluates several Java embedded databases as a fit for the use case.

This test is Hadoop/Cascading specific in that it uses the standard set of jar files included in the class path of Apache Hadoop 20.2 and Cascading 1.2.  For example, we used the hsqldb-1.8.0.10.jar bundled with Hadoop.  Leveraging an embedded DB which is not only thread safe but also concurrent will allow re-use of in-process memory cache across multiple threads using the Hadoop Task JVM Reuse feature.

Databases Evaluated

The following databases were evaluated:

  1. Baseline load into in-memory array.
  2. Apache Derby 10.7.1.1
  3. HSQLDB 1.8.0.10
  4. H2 1.2.147

Each of the databases were set up, configured, and queried using identical SQL scripts and JDBC code, except where specifically noted in the test results section.  Also, we require the exclusive use of a JDBC interface to the database so we can easily swap DBs based on performance and stability.

Test Results

Test results for the in-memory baseline and each of the embedded DBs follows:

DB Load Time Load Rows Load Rate IP Lookup Rate
Derby 1 min, 51 sec 3,237,642 29168 / sec 82486 / sec ****
H2 1 min, 10 sec 3,237,642 46252 / sec *** 61527 / sec
in-memory array 32 sec 3,237,642 101176 / sec 133 / sec
HSQLDB 3 min, 28 sec 3,237,642 15565 / sec * 1 /  22 sec **

* = -Xmx3G JVM setting was required to load the data

** = -Xmx1G JVM setting was required to query the data

*** = -Xmx1G JVM setting was required to load the data

**** = Only Derby supported concurrent thread access, and 4 threads were used in the IP Lookup test

Test Cases

Create Table

The table was created with the following columns as:

CREATE TABLE IP_INFO (
  IP_FROM BIGINT,
  IP_TO BIGINT,
  COUNTRY_CODE VARCHAR(2),
  COUNTRY_NAME VARCHAR(64),
  REGION VARCHAR(64),
  CITY VARCHAR(64),
  LATITUDE DOUBLE,
  LONGITUDE DOUBLE
)

Load Data

The following load scripts where used to load up to the maximum 3,237,642 rows.  Data set based on IP2Location DB-5.  Some of the Java database tested could not adequately handle the data size so for those databases a smaller number of rows were loaded, as noted in the test results section:

INSERT INTO IP_INFO (
  IP_FROM,
  IP_TO,
  COUNTRY_CODE,
  COUNTRY_NAME,
  REGION,
  CITY,
  LATITUDE,
  LONGITUDE
) VALUES (
  ?,
  ?,
  ?,
  ?,
  ?,
  ?,
  ?,
  ?
)

In addition, the following JDBC code to load the data utilizes prepared statements and JDBC batch processing:

Connection conn = DriverManager.getConnection(<db url>);

PreparedStatement ps = conn.prepareStatement(insertString);

// read data file and get fields for each line ...

// for each field, bind values to the PreparedStatement
ps.setLong(1, g.getIpFrom());

ps.addBatch();

// add batch for every line, and execute batch every 100 lines
if(lineNumber % 100 == 0) {
  ps.executeBatch();
}
// at the end, execute final batch and commit ...
ps.executeBatch();
conn.commit();

Create Index

A b-tree index on IP_TO is required to select the matching IP range:

CREATE INDEX IP_INFO_I01 ON IP_INFO ( IP_TO )

Select IP Number in Range

The first row matching the following SELECT query matches the geographic information for an IP address range.  Any additional rows are not read and are ignored:

SELECT *
FROM IP_INFO
WHERE IP_TO >= ?
ORDER BY IP_TO

The following JDBC code utilizes prepared statements and bind variables to select the first matching row:

PreparedStatement ps = conn.prepareStatement(<sql>);
ps.setFetchSize(1);
ps.setMaxRows(1); 

// the following code iterates for each look up test execution
long random = (long)(Math.random() * Integer.MAX_VALUE);
ps.setLong(1, random);
ResultSet rs = ps.executeQuery();
if(rs.next()) {
  // test getting fields, and verify result
}

Test System Configuration

Test was run on the following platform configuration:

Hardware Overview:

  Model Name:    MacBook Pro
  Model Identifier:    MacBookPro6,2
  Processor Name:    Intel Core i7
  Processor Speed:    2.66 GHz
  Number Of Processors:    1
  Total Number Of Cores:    2
  L2 Cache (per core):    256 KB
  L3 Cache:    4 MB
  Memory:    4 GB
  Processor Interconnect Speed:    4.8 GT/s
  Boot ROM Version:    MBP61.0057.B0C

  Serial-ATA:
    Intel 5 Series Chipset:

      Vendor:    Intel
      Product:    5 Series Chipset
      Link Speed:    3 Gigabit
      Negotiated Link Speed:    1.5 Gigabit
      Description:    AHCI Version 1.30 Supported

  Disk:
    Hitachi HTS545050B9SA02:

      Capacity:    500.11 GB (500,107,862,016 bytes)
      Model:    Hitachi HTS545050B9SA02
      Revision:    PB4AC60W

System Software Overview:

  System Version:    Mac OS X 10.6.5 (10H574)
  Kernel Version:    Darwin 10.5.0

JVM Overview:

  java version "1.6.0_22"
  Java(TM) SE Runtime Environment (build 1.6.0_22-b04-307-10M3261)
  Java HotSpot(TM) 64-Bit Server VM (build 17.1-b03-307, mixed mode)

Modifications to Tests

Some tests had to be modified, either due to time constraints (test could not run in a reasonable time) or because the test crashed or hung.

In Memory Array

We had to run the test with JVM parameter -Xmx2G to ensure all the data could be loaded into memory.  Since we are not using a DB, the query logic is simply to iterate through the array to find the first row with IP_TO >= target IP.

Please note: we cannot use a binary search algorithm since we do not know the specific value we are searching for a priori.  We considered writing a binary search variant to find the first element >= the target IP, however we could not find a library which does this and we don’t have the time to write and test one ourselves.

Derby

We had to set the block cache size above the default to improve cache hits for multi-threaded access:

System.getProperties().setProperty("derby.storage.pageCacheSize", "524288");

We set the JVM -xmx1G for the IP Lookups and used the JVM default 128M for the data load.

HSQLDB

We had to make many modifications to the code, the DML, and SQL as well as the JVM memory settings to make the data load work. We could not find a way to force HSql to use the b-tree index so the query performance represents the default plan selected by HSQLDB.

Here is a description of the changes and why they were made:

JVM Memory Settings

We started out with the 128M default JVM heap and received an out of memory error before we completely loaded 200,000 rows.  We had to conclude that HSql only saves to disk after a commit or checkpoint, and keeps the entire set of data buffered in memory.  This was confirmed when we added a CHECKPOINT after each 100,000 rows and still got the same “java.lang.OutOfMemoryError: Java heap space”, but we did see data on disk after each checkpoint.

So we had to set memory to -Xmx3G to load the data set, and we had to take the CHECKPOINT out because it looks like HSql writes all the data to one big file.  So every time you do a CHECKPOINT the overhead becomes larger and larger to write the data.  It is also possible it’s writing the entire file with every checkpoint as well.  Definitely never use CHECKPOINT or incremental commits when writing data set on GB+ sizes, and make sure your available memory is 3 times the size of the data on disk after the load.

When querying the data, again we have the problem of HSql loading the entire data set into memory.  I.e. we could not query the data at all with the default 128MB of JVM memory allocated.  Setting -Xmx1G works around this problem.

Code and SQL Changes

We could not create the index due to this error:

java.sql.SQLException: java.io.IOException: S1000 Data file size limit is reached in statement [CREATE INDEX IP_INFO_I01 ON IP_INFO(IP_TO)]

We tried to load only 100,000 rows to see if the index is actually used but the best query rate we could get was 17 / sec so if the index was used the performance definitely would not scale to 320 times that data size.

We had to specify CREATE CACHED TABLE in the table DDL to get the data load to work.

H2

H2 also required changes to the JVM settings to work properly.

JVM Memory Settings

With the default 128M JVM settings we were able to reach just over 400,000 rows loaded before running out of heap space.  Setting the heap to -Xmx1G fixed this issue.  Similarly to HSQLDB it looks like H2 loads the entire data set into memory for the transaction.

We had to specify CREATE CACHED TABLE in the table DDL to get the data load to work.

Conclusion

Derby provides the best fit for this use case due to:

  1. Concurrency: multiple threads can query embedded Derby within the same JVM process.
  2. The performance of Derby in multi-threaded mode outstrips H2’s single threaded performance on a 2 core machine.  We expect this to scale linearly to our target 16 core production deployment.
  3. Sharing process memory across multiple threads allows us to scale look up data size while maximizing shared cache hits.

However, H2 demonstrated superior single-threaded performance and admirable load speeds and used 40% less space on-disk than Derby.

All-in-all Derby wins for this use case, and H2 gets admirable mention.

The Elephant and the Rhino

If we use JavaScript for Code On Demand couldn’t we also use it for Inverse Code on Demand using Hadoop?  It’s not as hard to find people who know JavaScript as it is to find people who know Groovy or Python.  Less of a learning curve for everyone involved if we use the most popular scripting language for our embedded business rules.   Plus if we build them right we could use the same business rule across our entire ultra high scale web presence: from the browser to the elastic map reduce

Mozilla Rhino doesn’t have a lot of marketing behind it, but it’s been around.  Feature rich, mature, and embeddable: Mozilla’s JavaScript engine looks like a winner. We could inject input splits and emit results with ease.

Plus if you thought the Bull in the china shop was fun, wait until you see the Elephant and the Rhino!

Make the Elephant and the Rhino your partner, and utterly destroy a china shop near you!  China Shops don’t scale anyway…

Tips for Implementing Rhino in Hadoop

Make sure to use the Java scripting API compilation option in the setup and cleanup method of your mapper or reducer. Please see this article on how to compile scripts.  This dramatically reduces the CPU requirements and execution time for scripts.

Inverse REST

The principles of REST allow HTTP to scale to Internet user volumes, and code-on-demand provides one of the building blocks of REST.  Code-on-demand allows rich content on the browser via JavaScript or browser plug-ins, and this technique has matured so much that it requires minimal server interaction to run even the most sophisticated applications.

In short, code-on-demand enables scaling across across users by moving the code (logic) to the consumer, instead of requiring the consumer to make a remote request to the code.

It follows logically that any successful approach to scaling across big data requires inverting REST and executing the code as close to the data as possible, rather than trying to move big data to the code.

Likewise, the web architecture scales across users by utilizing caching to provide data locality for shared data.  Scaling across big data also requires data locality for reference data. We must move our small data close to our big data to scale efficiently.

The major flaw designed into many current big data applications involves the failure to utilize the inverse of REST: SOA and integration vendors sell customers on the idea that big data problems can be solved by moving all data through a layer of middle-ware: whether this be a J2EE application server, a service bus, or an integration tool.  I have literally spent years of my career trying to tune and optimize middle-ware solutions for big data.  That’s why I can say definitively that the middle-ware concept does very well at selling a lot of consulting hours, software licenses, and hardware.  What it does not do is scale to big data.

You could code all the big data logic in stored procedures, assuming willingness to embed business logic into a closed system, and assuming that a database will scale to your data volumes.  Database vendors are only beginning to utilize Inverse REST: evaluating filters , transformations, and lookups in the storage pipeline is a new (or non-existent) feature in most DBMS systems.  Yet another opportunity for vendor lock-in.

Hadoop Map Reduce follows an open system implementation of inverse REST.

Regardless of who wins that battle between RDBMS and Map/Reduce one thing is certain: anyone not leveraging the principles of Inverse REST will be left in the dust.

Google, Yahoo, Facebook, StumbleUpon, and others have already hit the wall, and it’s only a matter of time before we all do.