Tag Archives: Shared Nothing

Hyperdimensionality and Big Data

In this video training, Matt explains how hyperdimensional reasoning implicitly plays a part in all big data analyses and how today’s analytics and deep learning can utilize hyperdimensionality to improve accuracy and reduce algorithmic blind spots.


Feel the Beam! Google Casts a new Light on Big Data

Apache Beam from Google finally provides robust unification of batch and real-time Big Data.  This framework replaced MapReduce, FlumeJava, and MillWheel at Google.  Major big data vendors already contributed Apache Beam execution engines for both Flink and Spark, before Beam even officially hit incubation.  Anyone else seeing the future of Big Data in a new light?  I know I am…

Academic underpinning: http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

Google’s comparison of Apache Beam vs. Apache Spark: https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison

Why I tried Apache Spark, and moved on..

I tried Apache Spark, and moved on.  Here’s why:

Resourcing Issues

Apache Spark, written in Scala, causes severe resourcing issues for customers due to the additional technical skill requirements:

  1. Scala ranks #30 with 0.5% market share, while Java ranks #1 with 21.5% of the market, roughly 43 times Scala's share: http://www.tiobe.com/index.php/content/paperinfo/tpci/index.html
  2. Introduction of native Functional Programming constructs into the Java language with release 1.8 practically eliminates the business case for Scala altogether: https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html
  3. Scala works best with the IntelliJ IDEA IDE, which has licensing costs and is extremely unlikely to replace free Eclipse tooling at any large company
  4. Scala is among a crowd of strong contenders and faces a moving target as Java has gained 5% in market share between 2015 and 2016.  To put this in perspective, Scala has less market share than Lisp

Consistency and Integrity Issues

Trying to get Spark to meet rigorous standards of data consistency and integrity proves difficult.  Apache Spark’s design originates from companies that consider Data Consistency and Data Integrity secondary concerns, while most industries consider these primary concerns.  For example, achieving exactly-once semantics (at-most-once and at-least-once together) from Spark requires numerous workarounds and hacks: http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

Dependency Hell with a Vengeance

Apache Spark (and Scala) import a huge number of transitive dependencies compared to alternative technologies.  Programmers must master all of those dependencies in order to master Spark.  No wonder very few true experts in Spark exist in the market today.

What’s the Alternative to Spark?

For the real-time in-memory processing use case: data grids, once the purview of blue chip commercial vendors, now have very strong open source competition.  Primary contenders include Apache Ignite and Hazelcast.

For the fast SQL analytics (OLAP) use case: Apache Drill provides similar performance to Spark SQL with a much simpler, more efficient, and more robust footprint.  Apache Kylin from eBay looks to become a major OLAP player very quickly, although I have not used it myself.

For the stream processing use case: Apache Beam from Google looks likely to become the de facto streaming workhorse, unseating both Apache Flink and Spark Streaming.  Major big data vendors have already contributed Apache Beam execution engines for both Flink and Spark, before Beam even officially hit incubation.

If you try these alternative technologies, and compare to Spark, I’m sure you’ll agree that Spark isn’t worth the headache.


Streaming Feature Extraction for Unions with statzall

Supporting unions for fields with multiple types makes for more robust and automated feature extraction.  For example, “account numbers” may contain business relevant strings or spaces due to different data stewards or external data providers.

Rather than transforming all numbers to String, statzall takes the opposite approach and packs Strings into doubles using the open source Unibit Encoding.  This allows extremely efficient feature extraction of basic data science primitives using existing hardened APIs such as CERN Colt.  With single-threaded performance of 8 million / sec, automated feature extraction on *all* measures becomes possible.

In addition, statzall provides support for one-click deployment to your Hadoop YARN cluster using CDAP.  Or if you use cloud, you can literally set up a fully automated Internet-scale feature extraction cluster in 5 minutes using the Coopr Cloud.

Titan: Cassandra vs. Hazelcast persistence benchmark

10 node load test comparison using Amazon EC2 SSD-based instances.  1 billion vertices and 1 billion edges processed for each test run.  Used the titan-loadtest project to run each test.


Experiment maximizes data locality by co-locating load generation, Titan graph database, and Cassandra/Hazelcast within the same JVM instance while partitioning data across a cluster. Exploration of methods for tuning garbage collection, Titan, and Cassandra for the peer computing use case.

The following components were utilized during the experiment:

Technology            Version
RHEL x64 HVM AMI      6.4
Oracle JDK x64        1.7_45
Apache Cassandra      1.2.9
Hazelcast             3.1.1
Titan                 0.3.2

Each test iteration has 6 read ratio phases starting with 0% reads (100% writes) all the way up to 90% reads and 10% writes.  For all tests, the persistence implementation executes in the same JVM as Titan to avoid unnecessary context switching and serialization overhead.  Tests were conducted using an Amazon placement group to ensure instances resided on the same subnet.  The storage was formatted with 4K blocks and used the noop scheduler to improve latency.

For each phase, new vertices were added with one edge linking back to a previous vertex.  No tests of update or delete were conducted.

Please see the titan-loadtest project above for all Cassandra and Hazelcast settings and configurations used in the test.


Titan 10 node Summary

Please note: the results are listed in rates of thousands of vertices per second and include the creation of an edge as well as a vertex. Also, the Hazelcast SSD x1 results used a custom flash storage module for Hazelcast developed privately so those results are not replicable without that module installed.


Hazelcast performed better than Cassandra for all tests and demonstrated one order of magnitude better performance on reads.  Surprisingly, Hazelcast slightly outperformed Cassandra for writes as well.

unionOf for Cascading.Avro

Previous posts noted how Cascading provides greater flexibility and testability than relational databases for ETL, and also benchmarked Avro versus text formats for large scale information processing. Recently I released a patch to Cascading.Avro which provides even more power and flexibility over traditional RDBMS-based data processing. This new AvroScheme.unionOf utility method allows better support for Avro schema evolution without needing a centralized metadata store and without having to re-format all of your historical data to the new format. Unlike a traditional SQL UNION statement, AvroScheme.unionOf dynamically adds columns and converts data types as necessary.  There is no need to specify field names or data types up front since all Avro files contain self-describing metadata.

How it Works

Say that we have two Avro files: one representing an older format, and another representing the newer data format.

Old format:

ID (int)    Amt (float)
1           22000.22
2           33000.33

New format:

ID (long)   Amt (float)   Currency (String)
3           11000.11      EUR

When we use AvroScheme.unionOf with the directory containing the two Avro files as input, we can create a tap capable of reading all the data in all the folder(s) specified:

  Tap avroSource = new Hfs(AvroScheme.unionOf(input), input);

And when we read the tap, we get the following:

ID (long)   Amt (float)   Currency (String)
1           22000.22      <null>
2           33000.33      <null>
3           11000.11      EUR

As you can see above, the Scheme standardized the data types and added null values for fields not present in the old Avro Schema, allowing Cascading GroupBy or CoGroup to function normally with the input data.
Without utilizing AvroScheme.unionOf we would need to either convert all existing data to the new format, or store the set of names and data types for each Schema and use a custom flow to coerce the types and add default values. Typically, by using AvroScheme.unionOf I have seen ~80% code reduction for reading existing Avro files, in addition to the support for evolving schemata.
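To make the union behaviour above concrete, here is a toy sketch in plain Java. It is not the Cascading.Avro implementation: the class name SchemaUnionSketch, its methods, and the widening rule (int and long merge to long, any other type conflict falls back to String) are assumptions made for illustration, chosen to match the example tables and the toString() limitation noted in this post.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of Cascading.Avro.
final class SchemaUnionSketch {

    /** Merges field-name -> type maps into one schema containing every field. */
    static Map<String, Class<?>> unionOf(List<Map<String, Class<?>>> schemas) {
        Map<String, Class<?>> union = new LinkedHashMap<>();
        for (Map<String, Class<?>> schema : schemas) {
            for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
                // When a field appears in several schemas, reconcile the two types.
                union.merge(field.getKey(), field.getValue(), SchemaUnionSketch::widen);
            }
        }
        return union;
    }

    /** Assumed rule: int/long widen to long, any other mismatch becomes String. */
    static Class<?> widen(Class<?> a, Class<?> b) {
        if (a == b) {
            return a;
        }
        boolean bothIntegral = (a == Integer.class || a == Long.class)
                && (b == Integer.class || b == Long.class);
        return bothIntegral ? Long.class : String.class;
    }

    /** Re-shapes one record to the union schema, adding nulls for absent fields. */
    static Map<String, Object> conform(Map<String, Object> record,
                                       Map<String, Class<?>> union) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Class<?>> field : union.entrySet()) {
            Object value = record.get(field.getKey()); // null when the field is absent
            if (value instanceof Integer && field.getValue() == Long.class) {
                value = ((Integer) value).longValue();
            } else if (value != null && field.getValue() == String.class) {
                value = value.toString();
            }
            out.put(field.getKey(), value);
        }
        return out;
    }
}
```

Conforming an old-format record (ID as int, no Currency) against the union of both schemas yields an ID of type long and a null Currency, which is the shape shown in the combined table.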

Support for MultiSourceTap

If we need to read multiple directories containing Avro files, or multiple Avro files in different directories we need to use the Cascading MultiSourceTap to read them as one data flow. AvroScheme.unionOf supports this by passing an array of directory or file names:

  AvroScheme scheme = AvroScheme.unionOf(inputs);
  Tap[] avroSources = new Tap[inputs.length];
  for(int x = 0; x < inputs.length; x++) {
    avroSources[x] = new Hfs(scheme, inputs[x]);
  }
  MultiSourceTap multiSourceTap = new MultiSourceTap(avroSources);

Support for Avro Projection

Let’s say we only need to see the ID from each record, regardless of the underlying schema data type.  Then we can use the following call to produce the desired output:

  Tap avroSource = new Hfs(
        AvroScheme.unionOf(new Fields("ID"), out), out);

And the output looks like:

ID (long)

Availability and Limitations

I have released a patch to BixoLabs adding support for unionOf as well as simple type conversion support.  Please check the Cascading.Avro site for release details. Currently only default toString() conversions to String are supported and nested Map and List fields are excluded from the union field set.

CouchDB and Hadoop

CouchDB, a top level Apache project, utilizes a RESTful interface to serve up dynamic JSON data on an Internet scale.  CouchDB provides ACID guarantees with no locking using Multi-version Concurrency control, and also scales via Shared-nothing deployment using multi-master replication.  Data access leverages a map/reduce query model expressed in JavaScript, as detailed below.  This page provides a brief overview of CouchDB and potential utilization in a Hadoop Amazon Elastic MapReduce deployment.

CouchDB by Example

The Apache pages provide some useful examples of how to use CouchDB.  However, I found this cool Mu Dynamics simulator which shows how to query CouchDB by example.  I found that the simulator quickly clarifies how the Map/Reduce query processing works with concrete problems.  A much faster way to kick the tires than building and installing from scratch…

Scalability and Performance

The Apache wiki provides some pointers on CouchDB performance.  It seems that striping (RAID 0) provides the best performance improvements in the benchmarks I’ve seen, especially in environments with many users and a lot of random reads (see below).  CouchDB doesn’t support parallel query out of the box, but opening multiple sockets to the same server would allow parallel access to data for a single client.

The map/reduce query capability produces materialized views automatically, with automated incremental refresh.  Therefore, if the data behind a view doesn’t change, queries against the view do not re-query the base data.  By default the first query after a committed change to the base data triggers the incremental refresh of the view (this is configurable).

In terms of scalability, multi-master replication provides multiple data copies across concurrent machines and/or data centers.  Via the stateless RESTful interface load balancing doesn’t get any easier…  Also, sharding is not supported out of the box but solutions like Lounge and Pillow look promising for supporting larger data sets.

Cloud Deployment

Check out this blog post on how to spin up a CouchDB cluster on Amazon EC2 using Puppet. Seems an ideal candidate for Elastic Load Balancing: the ability to automatically spin up instances as demand increases should really make CouchDB live up to its name!

Potential Use Cases With Hadoop

Use of CouchDB to serve key/value reference data in an Inverse REST architecture seems an ideal use case.  Since we could maintain the CouchDB data using a separate GUI, we could correct reference data or metadata dynamically as the job is running.  No need to wait around until a large job has crashed and burned to fix reference data problems.  We would probably also want an LRU cache on the client side (in the Hadoop job) to minimize network hits.
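A client-side LRU cache of the kind mentioned above can be sketched with the JDK alone. This is a minimal sketch, not code from any Hadoop job: the class name LruCache and the capacity parameter are assumptions, and the class is not thread safe, so it would need external synchronization if shared across threads.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: a fixed-capacity cache that evicts the least recently used entry.
final class LruCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;

    private final int capacity;

    LruCache(int capacity) {
        // accessOrder = true keeps entries ordered from least to most recently accessed
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called by LinkedHashMap after each insertion; returning true drops the eldest entry.
        return size() > capacity;
    }
}
```

A task would check the cache first and only issue the HTTP request to CouchDB on a miss, then put the fetched value into the cache. The trade-off is staleness: a cached entry will not reflect a correction made in CouchDB until it is evicted, so the capacity (or an added expiry) controls how quickly fixes to reference data become visible to a running job.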


Although it may seem counter-intuitive, leveraging the multi-layered REST caching and load balancing architecture to move small data close to big data could make a lot of sense, especially in a cloud environment designed to adapt dynamically to large spikes.

Java Embedded DB for IP2Location in Hadoop

Since the IP geographic data operates on ranges of IP addresses, we cannot simply join the log data to the IP data (using RDBMS or MapReduce), and we cannot use a simple key/value cache to look up the IP data.  Currently the entire IP2Location DB-5 data set consumes 1+ GB of memory when loaded into a Java object array.  While we can currently fit this in memory with the current infrastructure, we will have issues if this data grows significantly beyond its current size.  Therefore we have to look at some alternatives for looking up the data without loading it all in memory.  We cannot use a middleware or shared database solution since these will quickly become overwhelmed by requests from our cluster.  Likewise, we cannot afford to take a network hit for every look-up and we currently cannot batch look-up requests to reduce network hits.  We need a shared-nothing architecture, and therefore copying a Java embedded database locally to pull our small data close to our big data seems the best approach.  This page evaluates several Java embedded databases as a fit for the use case.

This test is Hadoop/Cascading specific in that it uses the standard set of jar files included in the class path of Apache Hadoop 0.20.2 and Cascading 1.2.  For example, we used the HSQLDB jar bundled with Hadoop.  Leveraging an embedded DB which is not only thread safe but also concurrent will allow re-use of the in-process memory cache across multiple threads using the Hadoop Task JVM Reuse feature.

Databases Evaluated

The following databases were evaluated:

  1. Baseline load into in-memory array.
  2. Apache Derby
  3. HSQLDB (the version bundled with Hadoop)
  4. H2 1.2.147

Each of the databases was set up, configured, and queried using identical SQL scripts and JDBC code, except where specifically noted in the test results section.  Also, we require the exclusive use of a JDBC interface to the database so we can easily swap DBs based on performance and stability.

Test Results

Test results for the in-memory baseline and each of the embedded DBs follow:

DB                Load Time       Load Rows    Load Rate          IP Lookup Rate
Derby             1 min, 51 sec   3,237,642    29168 / sec        82486 / sec ****
H2                1 min, 10 sec   3,237,642    46252 / sec ***    61527 / sec
in-memory array   32 sec          3,237,642    101176 / sec       133 / sec
HSQLDB            3 min, 28 sec   3,237,642    15565 / sec *      1 / 22 sec **

* = -Xmx3G JVM setting was required to load the data

** = -Xmx1G JVM setting was required to query the data

*** = -Xmx1G JVM setting was required to load the data

**** = Only Derby supported concurrent thread access, and 4 threads were used in the IP Lookup test

Test Cases

Create Table

The table was created with the following columns as:


Load Data

The following load scripts were used to load up to the maximum 3,237,642 rows.  The data set is based on IP2Location DB-5.  Some of the Java databases tested could not adequately handle the data size, so for those databases a smaller number of rows was loaded, as noted in the test results section:


In addition, the following JDBC code to load the data utilizes prepared statements and JDBC batch processing:

Connection conn = DriverManager.getConnection(<db url>);
conn.setAutoCommit(false); // commit explicitly once the load completes

PreparedStatement ps = conn.prepareStatement(insertString);

// read data file and get fields for each line ...

// for each field, bind values to the PreparedStatement
ps.setLong(1, g.getIpFrom());
// ... bind the remaining fields the same way ...

// add batch for every line, and execute batch every 100 lines
ps.addBatch();
if(lineNumber % 100 == 0) {
  ps.executeBatch();
}

// at the end, execute final batch and commit ...
ps.executeBatch();
conn.commit();

Create Index

A b-tree index on IP_TO is required to select the matching IP range:

CREATE INDEX IP_INFO_I01 ON IP_INFO(IP_TO)


Select IP Number in Range

The first row matching the following SELECT query matches the geographic information for an IP address range.  Any additional rows are not read and are ignored:


The following JDBC code utilizes prepared statements and bind variables to select the first matching row:

PreparedStatement ps = conn.prepareStatement(<sql>);

// the following code iterates for each look up test execution
long random = (long)(Math.random() * Integer.MAX_VALUE);
ps.setLong(1, random);
ResultSet rs = ps.executeQuery();
if(rs.next()) {
  // test getting fields, and verify result
}
rs.close();
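
As a rough sketch of what such a first-matching-row lookup can look like in JDBC: the table and column names IP_INFO and IP_TO appear in the error message quoted later in this post, while the query text itself, the COUNTRY_NAME column, and the class and method names are assumptions for illustration rather than the statements used in the test. The sketch relies on the ranges being contiguous, so that the first row with IP_TO >= the target is the range containing it.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical helper; not the code used in the benchmark.
final class IpRangeQuery {
    // IP_INFO and IP_TO come from the post; COUNTRY_NAME is an assumed column name.
    static final String SQL =
        "SELECT IP_TO, COUNTRY_NAME FROM IP_INFO WHERE IP_TO >= ? ORDER BY IP_TO";

    /** Returns the country of the first range with IP_TO >= ipNumber, or null if none. */
    static String findCountry(Connection conn, long ipNumber) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(SQL)) {
            ps.setMaxRows(1);        // only the first matching row is needed
            ps.setLong(1, ipNumber); // bind the IP number being looked up
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getString("COUNTRY_NAME") : null;
            }
        }
    }
}
```

The ORDER BY on the indexed column is what lets a b-tree index satisfy the query by seeking to the first qualifying key, and setMaxRows(1) tells the driver that the remaining rows are not wanted. Whether a given embedded database actually uses the index for this plan has to be verified per database.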

Test System Configuration

Test was run on the following platform configuration:

Hardware Overview:

  Model Name:    MacBook Pro
  Model Identifier:    MacBookPro6,2
  Processor Name:    Intel Core i7
  Processor Speed:    2.66 GHz
  Number Of Processors:    1
  Total Number Of Cores:    2
  L2 Cache (per core):    256 KB
  L3 Cache:    4 MB
  Memory:    4 GB
  Processor Interconnect Speed:    4.8 GT/s
  Boot ROM Version:    MBP61.0057.B0C

    Intel 5 Series Chipset:

      Vendor:    Intel
      Product:    5 Series Chipset
      Link Speed:    3 Gigabit
      Negotiated Link Speed:    1.5 Gigabit
      Description:    AHCI Version 1.30 Supported

    Hitachi HTS545050B9SA02:

      Capacity:    500.11 GB (500,107,862,016 bytes)
      Model:    Hitachi HTS545050B9SA02
      Revision:    PB4AC60W

System Software Overview:

  System Version:    Mac OS X 10.6.5 (10H574)
  Kernel Version:    Darwin 10.5.0

JVM Overview:

  java version "1.6.0_22"
  Java(TM) SE Runtime Environment (build 1.6.0_22-b04-307-10M3261)
  Java HotSpot(TM) 64-Bit Server VM (build 17.1-b03-307, mixed mode)

Modifications to Tests

Some tests had to be modified, either due to time constraints (test could not run in a reasonable time) or because the test crashed or hung.

In Memory Array

We had to run the test with JVM parameter -Xmx2G to ensure all the data could be loaded into memory.  Since we are not using a DB, the query logic is simply to iterate through the array to find the first row with IP_TO >= target IP.

Please note: we cannot use a binary search algorithm since we do not know the specific value we are searching for a priori.  We considered writing a binary search variant to find the first element >= the target IP, however we could not find a library which does this and we don’t have the time to write and test one ourselves.
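
The binary search variant mentioned above, which finds the first element >= the target IP, can be sketched in a few lines. This is a minimal sketch under the assumption that the IP_TO values are held in a long[] sorted in ascending order; the class and method names are hypothetical and this is not the code used for the baseline numbers in this post.

```java
// Hypothetical helper: lower-bound binary search over sorted IP_TO values.
final class IpRangeIndex {
    private final long[] ipTo; // upper bound of each IP range, sorted ascending

    IpRangeIndex(long[] sortedIpTo) {
        this.ipTo = sortedIpTo;
    }

    /** Returns the index of the first range with IP_TO >= ip, or -1 if there is none. */
    int lookup(long ip) {
        int lo = 0;
        int hi = ipTo.length; // the answer, if any, lies in [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1; // unsigned shift avoids int overflow
            if (ipTo[mid] < ip) {
                lo = mid + 1; // mid is too small, so the answer is to the right
            } else {
                hi = mid;     // mid qualifies, but an earlier element might as well
            }
        }
        return lo < ipTo.length ? lo : -1;
    }
}
```

Each lookup takes about log2(n) comparisons, roughly 22 for 3,237,642 rows, instead of a scan that on average touches half the array. The returned index can then be used to read the matching row from the object array.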


Derby

We had to set the block cache size above the default to improve cache hits for multi-threaded access:

System.getProperties().setProperty("derby.storage.pageCacheSize", "524288");

We set the JVM -Xmx1G for the IP Lookups and used the JVM default 128M for the data load.


HSQLDB

We had to make many modifications to the code, the DML, and SQL as well as the JVM memory settings to make the data load work. We could not find a way to force HSQLDB to use the b-tree index so the query performance represents the default plan selected by HSQLDB.

Here is a description of the changes and why they were made:

JVM Memory Settings

We started out with the 128M default JVM heap and received an out of memory error before we completely loaded 200,000 rows.  We had to conclude that HSQLDB only saves to disk after a commit or checkpoint, and keeps the entire set of data buffered in memory.  This was confirmed when we added a CHECKPOINT after each 100,000 rows and still got the same “java.lang.OutOfMemoryError: Java heap space”, but we did see data on disk after each checkpoint.

So we had to set memory to -Xmx3G to load the data set, and we had to take the CHECKPOINT out because it looks like HSQLDB writes all the data to one big file.  So every time you do a CHECKPOINT the overhead of writing the data becomes larger and larger.  It is also possible it’s writing the entire file with every checkpoint as well.  Definitely never use CHECKPOINT or incremental commits when writing data sets of GB+ size, and make sure your available memory is 3 times the size of the data on disk after the load.

When querying the data, again we have the problem of HSQLDB loading the entire data set into memory, i.e. we could not query the data at all with the default 128MB of JVM memory allocated.  Setting -Xmx1G works around this problem.

Code and SQL Changes

We could not create the index due to this error:

java.sql.SQLException: java.io.IOException: S1000 Data file size limit is reached in statement [CREATE INDEX IP_INFO_I01 ON IP_INFO(IP_TO)]

We tried to load only 100,000 rows to see if the index is actually used, but the best query rate we could get was 17 / sec, so even if the index was used the performance definitely would not scale to roughly 32 times that data size.

We had to specify CREATE CACHED TABLE in the table DDL to get the data load to work.


H2

H2 also required changes to the JVM settings to work properly.

JVM Memory Settings

With the default 128M JVM settings we were able to reach just over 400,000 rows loaded before running out of heap space.  Setting the heap to -Xmx1G fixed this issue.  Similarly to HSQLDB it looks like H2 loads the entire data set into memory for the transaction.

We had to specify CREATE CACHED TABLE in the table DDL to get the data load to work.


Derby provides the best fit for this use case due to:

  1. Concurrency: multiple threads can query embedded Derby within the same JVM process.
  2. The performance of Derby in multi-threaded mode outstrips H2’s single threaded performance on a 2 core machine.  We expect this to scale linearly to our target 16 core production deployment.
  3. Sharing process memory across multiple threads allows us to scale look up data size while maximizing shared cache hits.

However, H2 demonstrated superior single-threaded performance and admirable load speeds and used 40% less space on-disk than Derby.

All in all, Derby wins for this use case, and H2 gets an honorable mention.

Inverse REST

The principles of REST allow HTTP to scale to Internet user volumes, and code-on-demand provides one of the building blocks of REST.  Code-on-demand allows rich content on the browser via JavaScript or browser plug-ins, and this technique has matured so much that it requires minimal server interaction to run even the most sophisticated applications.

In short, code-on-demand enables scaling across users by moving the code (logic) to the consumer, instead of requiring the consumer to make a remote request to the code.

It follows logically that any successful approach to scaling across big data requires inverting REST and executing the code as close to the data as possible, rather than trying to move big data to the code.

Likewise, the web architecture scales across users by utilizing caching to provide data locality for shared data.  Scaling across big data also requires data locality for reference data. We must move our small data close to our big data to scale efficiently.

The major flaw designed into many current big data applications involves the failure to utilize the inverse of REST: SOA and integration vendors sell customers on the idea that big data problems can be solved by moving all data through a layer of middle-ware: whether this be a J2EE application server, a service bus, or an integration tool.  I have literally spent years of my career trying to tune and optimize middle-ware solutions for big data.  That’s why I can say definitively that the middle-ware concept does very well at selling a lot of consulting hours, software licenses, and hardware.  What it does not do is scale to big data.

You could code all the big data logic in stored procedures, assuming willingness to embed business logic into a closed system, and assuming that a database will scale to your data volumes.  Database vendors are only beginning to utilize Inverse REST: evaluating filters, transformations, and lookups in the storage pipeline is a new (or non-existent) feature in most DBMSs.  Yet another opportunity for vendor lock-in.

Hadoop MapReduce provides an open-system implementation of Inverse REST.

Regardless of who wins the battle between RDBMS and Map/Reduce, one thing is certain: anyone not leveraging the principles of Inverse REST will be left in the dust.

Google, Yahoo, Facebook, StumbleUpon, and others have already hit the wall, and it’s only a matter of time before we all do.