Category Archives: Cascading

Cascading is a Java framework by Chris Wensel which provides workflow automation, modular job construction, and data flow auto-assembly.

Hermetic Lambdas: a Solution for Big Data Dependency Collisions

Dependency collisions furnish plenty of headaches and late nights while implementing the Lambda Architecture.  Java class loading containers tend toward complexity, and many frameworks such as Hadoop, JCascalog, and Storm provide tons of transitive dependencies but no dependency isolation for third-party Lambda code deployed in the same JVM.  So currently, to benefit from running in the same JVM, all the transitive dependencies come along for the ride.  This impacts:

  • Portability: Lambda logic shared in the batch layer and the speed layer must cope with at least two different sets of dependencies.
  • Manageability: framework version upgrades may cause hard failures or other incompatibilities in Lambda logic due to changing dependency versions.
  • Performance: solving the problem by forcing JVM isolation incurs overhead from IPC or network chatter.  Likewise, utilizing scripting or dynamic languages merely hides the problem rather than solving it, while imposing additional performance penalties.
  • Complexity: developing Lambdas requires knowledge and accommodation of all the dependencies of all frameworks targeted for deployment.

Many developers and architects rightfully shy away from deploying bloated dependency container frameworks such as Spring or OSGi inside of a big data framework.  Big data and fast data provide enough complexity already.  So let's look at how we can use a simple pattern, along with basic core Java libraries, to avoid “Dependency Hell”.

Signs you may be encountering this problem include these types of exceptions occurring after deployment:

  • java.lang.NoSuchMethodError: when a method your code depends on no longer exists
  • java.lang.ClassNotFoundException: when a class your code uses cannot be found in the deployed class path
  • java.lang.IllegalAccessException: when conflicting versions of an API mark methods or fields private

Please reference my GitHub project for a complete working implementation along with unit tests.


[Figure: Lambda Execution Model, showing how the Lambda executes within the big data framework in an isolated dependency context]

Hermetic Classloading

The Java Service Loader framework introduced in JDK 1.6 provides a way to dynamically bind an implementation class to an interface while specifying an alternate class loader.  Loading a Lambda implementation with ServiceLoader forces all code called within the context of the Lambda to use the same class loader.  This allows creating a service which supports parent-first class loading, child-first class loading, or child-only (or hermetic) class loading.  In this case, the hermetic loader prevents any possible dependency collisions.  To create a hermetic loader, we simply need to utilize the built-in URLClassLoader in Java.  Our implementation jar can reside on the local file system, on a web server, or in HDFS: anywhere a URL can point to.  For the parent class loader, we specify the Java bootstrap class loader.  So we can implement a hermetic class loading pattern in one line of code:

ServiceLoader<Map> loader = ServiceLoader.load(Map.class, 
    new URLClassLoader(urls, Map.class.getClassLoader()));

Note that we intentionally avoid calling ClassLoader.getSystemClassLoader() in order to prevent the calling context (such as Hadoop or Hive) from polluting the Lambda class path.  Core packages such as java.lang and java.util use the bootstrap class loader, which only carries core Java dependencies shipped as part of the JDK.  The diagram above shows how the LambdaBoot framework fits within a big data framework such as Hadoop.
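
As a slightly more complete illustration, here is a minimal, self-contained sketch of the hermetic loading pattern.  The jar location, the lambda.jar file name, the "description" key, and the use of Map as the service interface are assumptions for illustration only; see the GitHub project for the actual implementation.

import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.ServiceLoader;

public class HermeticLoaderExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical location of the Lambda implementation jar
    // (could be a local file, a web server URL, or an HDFS URL)
    URL[] urls = new URL[] { new URL("file:///opt/lambdas/lambda.jar") };

    // Child-only (hermetic) loader: Map.class.getClassLoader() returns null,
    // which URLClassLoader treats as the bootstrap loader, so nothing leaks
    // in from the calling context such as Hadoop or Hive
    URLClassLoader hermetic = new URLClassLoader(urls, Map.class.getClassLoader());

    // Binds whatever implementation the jar registers under
    // META-INF/services/java.util.Map
    ServiceLoader<Map> loader = ServiceLoader.load(Map.class, hermetic);
    for (Map lambda : loader) {
      // Interact with the Lambda purely through the Map interface
      System.out.println(lambda.getClass().getName() + " -> " + lambda.get("description"));
    }
  }
}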

Mapping the World

In the example above, we use a Map interface to interact with the Lambda.  This allows us to avoid having a separate jar containing a Service Provider Interface (SPI).  Instead, we can implement the Map interface and provide any behavior desired by intercepting get and put calls to specific keys.  By optionally returning a Future from a get or put call on the Map, we get asynchronous interaction as well if desired.  In addition, the non-sensitive Map keys can provide metadata facilities.  In the example of linear regression implemented as a Map, we use a conflicting version of Apache Commons Math not yet supported by Hadoop to calculate the regression.
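
As a rough sketch of what such a Map-based Lambda might look like, the class below feeds data points through put and exposes results and metadata through get.  The class name, the key names, and the use of SimpleRegression from commons-math3 are hypothetical; the working linear regression example lives in the GitHub project.

import java.util.HashMap;

import org.apache.commons.math3.stat.regression.SimpleRegression;

/**
 * Hypothetical Lambda exposed through the Map interface.  The implementation
 * jar would register this class under META-INF/services/java.util.Map.
 */
public class RegressionLambda extends HashMap<String, Object> {
  private final SimpleRegression regression = new SimpleRegression();

  @Override
  public Object put(String key, Object value) {
    if ("dataPoint".equals(key)) {
      double[] xy = (double[]) value;   // intercept writes to feed the model
      regression.addData(xy[0], xy[1]);
      return null;
    }
    return super.put(key, value);
  }

  @Override
  public Object get(Object key) {
    if ("slope".equals(key)) {          // intercept reads to expose results
      return regression.getSlope();
    }
    if ("description".equals(key)) {    // non-sensitive metadata key
      return "Simple linear regression Lambda";
    }
    return super.get(key);
  }
}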

[Figure: Lambda Deployment Model, showing how the LambdaBoot framework can work with deployment tools such as Jenkins, Puppet, and Git]

Implications

Using a very simple pattern, we can deploy Hermetic Lambdas to any Java environment without fear of dependency collisions.  The ServiceLoader also acts as a service registry, allowing us to browse metadata about available Lambdas, dynamically load new Lambdas, or update existing Lambdas.

unionOf for Cascading.Avro

Previous posts noted how Cascading provides greater flexibility and testability than relational databases for ETL, and also benchmarked Avro versus text formats for large scale information processing. Recently I released a patch to Cascading.Avro which provides even more power and flexibility over traditional RDBMS-based data processing. This new AvroScheme.unionOf utility method allows better support for Avro schema evolution without needing a centralized metadata store and without having to re-format all of your historical data to the new format. Unlike a traditional SQL UNION statement, AvroScheme.unionOf dynamically adds columns and converts data types as necessary.  There is no need to specify field names or data types up front, since all Avro files contain self-describing metadata.

How it Works

Say that we have two Avro files: one representing an older format, and another representing the newer data format.

Old format:

ID (int)   Amt (float)
1          22000.22
2          33000.33

New format:

ID (long)   Amt (float)   Currency (String)
3           11000.11      EUR

When we pass the directory containing the two Avro files as input to AvroScheme.unionOf, we can create a tap capable of reading all the data in all the folder(s) specified:

  Tap avroSource = new Hfs(AvroScheme.unionOf(input), input);

And when we read the tap, we get the following:

ID (long)   Amt (float)   Currency (String)
1           22000.22      <null>
2           33000.33      <null>
3           11000.11      EUR

As you can see above, the Scheme standardized the data types and added null values for fields not present in the old Avro Schema, allowing Cascading GroupBy or CoGroup to function normally with the input data.
Without utilizing AvroScheme.unionOf, we would need either to convert all existing data to the new format, or to store the set of names and data types for each schema and use a custom flow to coerce the types and add default values. Typically, by using AvroScheme.unionOf I have seen roughly 80% code reduction for reading existing Avro files, in addition to the support for evolving schemata.
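
To show how the union tap then drops into an ordinary flow, here is a minimal sketch that counts records per currency; the old-format rows simply carry a null Currency.  The input and output path variables, the pipe name, and the Cascading 1.x / patched Cascading.Avro calls shown here are assumptions consistent with the examples above, not code from the release.

  //Schema-unioned tap over both old- and new-format Avro files
  Tap avroSource = new Hfs(AvroScheme.unionOf(input), input);

  //The unioned fields behave like any other Cascading fields from here on
  Pipe pipe = new Pipe("currencyCounts");
  pipe = new GroupBy(pipe, new Fields("Currency"));
  pipe = new Every(pipe, new Count());

  //Write the per-currency counts as tab-delimited text
  Tap sink = new Hfs(new TextDelimited(new Fields("Currency", "count"), "\t"),
                     output,
                     true);

  Flow flow = new FlowConnector().connect(avroSource, sink, pipe);
  flow.complete();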

Support for MultiSourceTap

If we need to read multiple directories containing Avro files, or multiple Avro files in different directories we need to use the Cascading MultiSourceTap to read them as one data flow. AvroScheme.unionOf supports this by passing an array of directory or file names:

  AvroScheme scheme = AvroScheme.unionOf(inputs);
  Tap[] avroSources = new Tap[inputs.length];
  for(int x = 0; x < inputs.length; x++) {
    avroSources[x] = new Hfs(scheme, inputs[x]);
  }
  MultiSourceTap multiSourceTap = new MultiSourceTap(avroSources);

Support for Avro Projection

Let’s say we only need to see the ID from each record, regardless of the underlying schema data type.  Then we can use the following call to produce the desired output:

  Tap avroSource = new Hfs(
        AvroScheme.unionOf(new Fields("ID"), out), out);

And the output looks like:

ID (long)
1
2
3

Availability and Limitations

I have released a patch to BixoLabs adding support for unionOf as well as simple type conversion support.  Please check the Cascading.Avro site for release details. Currently only default toString() conversions to String are supported and nested Map and List fields are excluded from the union field set.

Top 10 Things I Love About Cascading + Hadoop on the Cloud

My recent experience with Map-Reduce on the Cloud really makes me happy I never got away from the tech… here’s why:

  1. No fixed infrastructure investment means that performance optimizations and tuning directly affect the bottom line.
  2. No longer have to estimate hardware for a system that isn’t built yet.
  3. Everything is open source so I don’t have to deal with software vendors.  I can fix things myself if I have to, rely on the community for help, and third party paid support is available if needed.
  4. Leverage the same software stack used by millions of other cloud developers.  No vapor-ware survives here…
  5. Completely open MapReduce implementation within Hadoop allows cool solutions like using an embedded Java database for looking up dimension data.
  6. Everything is testable using automated unit testing in the local JVM, leading to code robustness and quality far exceeding any data processing stack I’ve worked on.  To be able to debug the entire Hadoop technology stack on the local JVM is priceless.
  7. Any Java developer can pick up Cascading in a couple of months and be productive.
  8. Cascading joins offer more flexibility than SQL joins.  For example, Cascading OuterJoin supports empty data on either side of the join condition.
  9. Everything is streaming and INSERT only so data processing best practices are self-evident and mandatory in MapReduce, where the same best practices are optional in SQL-based systems.  For example, a true INSERT only database model is fairly rare in the Oracle world; however, in MapReduce an INSERT only model is the only option available.
  10. I can draw any Cascading data processing design using a simple flow chart, and have direct correlation between the design and the finished product every time.

To top it all off, Cascading is a great path for entry level college grads to get into big data processing.  Admittedly, you need to know a lot to make an entire distributed data processing system work on a day in and day out basis.  But you only need to know the fundamentals of any JVM-based language (Scala, Jython, JRuby, Groovy, Clojure, etc) to learn how to process big data with Cascading.  This is the first time in a very long time that any of my employers has been talking about bringing on interns or new grads.

Cascading TapsMap and SubAssembly: Best Practices

Cascading Pipes provide a lot of power to the experienced user due to all the things they take care of automatically; however, the downside of this is that we can easily do things the hard way without knowing it. This can affect the performance, maintainability, and re-usability of flows, and can trigger the need for significant re-factoring later on. One best practice to look at is how to use the taps map and SubAssembly together to provide separation of concerns between the taps (storing the data) and the pipes (processing the data). Using the taps map and SubAssembly properly produces fully re-usable pipes completely independent of the input and output data formats.

Flow Before Taps Map and SubAssembly

So as we can see from the flow above, we are selecting employees with salary < 1000 and counting them by department. The code might look something like this:

//Define source tap
Tap source =
    new Hfs(new TextDelimited(new Fields("emp_name",
                                         "salary",
                                         "dept_id"),
                              "\t"),
            "emp.tsv"
    );

//Wire the pipe
Pipe pipe = new Each("emp",
                     new Fields("salary"),
                     new ExpressionFilter("salary < 1000",
                                          Double.class));
pipe = new GroupBy(pipe, new Fields("dept_id"));
pipe = new Every(pipe, new Count());

//Define the sink tap
Tap sink =
    new Hfs(new TextDelimited(new Fields("dept_id",
                                         "count"),
                              "\t"),
            "output",
            true
    );

//Create the flow
Flow flow = new FlowConnector().connect(source,
                                        sink,
                                        pipe);

//Run the flow
flow.complete();

While the code is simple enough, it suffers from several issues with performance, re-usability, and maintainability:

  1. Supports only TSV as the input and output format
  2. It assumes the input location is “emp.tsv”
  3. Output directory is assumed to be “output”
  4. Field names are hard-coded
  5. There is no way to wire the pipe into other pipes except by serializing input and output files (via a Cascade)

Passing parameters or configuration to the job could resolve the first four issues; however, the last issue produces large performance penalties and has no easy solution without some major re-factoring.

Suggested Re-factoring

By implementing a SubAssembly and utilizing a Taps Map, we can dramatically improve both the maintainability and performance of the code.

Implementing a SubAssembly

[Figure: Cascading SubAssembly Pipe]

Please note the following when creating the SubAssembly:

  1. Provide a default constructor with default tap names.
  2. Create String constants for all head and tail pipes for use in the TapsMap.
  3. Provide a secondary constructor to allow linking and re-using multiple SubAssemblies.

Example SubAssembly code:

import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * Abstracts Pipe from Taps.
 * @author mpouttuclarke
 *
 */
public class EmpSubAssembly
    extends SubAssembly
{
    public static final String INPUT_PIPE_NAME = "input";
    public static final String OUTPUT_PIPE_NAME = "output";
    private static final long serialVersionUID = 1L;

    /**
     * Default constructor.
     */
    public EmpSubAssembly()
    {
        this(new Pipe(INPUT_PIPE_NAME), OUTPUT_PIPE_NAME);
    }

    /**
     * Provides a way to rename the head and tail pipes.
     * @param input
     * @param tailName
     */
    public EmpSubAssembly(Pipe input, String tailName)
    {
        super();

        // Wire the pipe
        Pipe pipe = new Each(input,
                             new Fields("salary"),
                             new ExpressionFilter("salary < 1000",
                                                  Double.class));
        pipe = new GroupBy(pipe, new Fields("dept_id"));
        pipe = new Every(pipe, new Count());
        pipe = new Pipe(tailName, pipe); //Name each tail

        setTails(pipe);
    }
}

Binding Taps with a TapsMap

Now we can bind the taps with a TapsMap, allowing separation of concerns between the data locations and formats and the pipe. This means we can re-wire the pipe SubAssembly with any other pipes or SubAssemblies we wish, without affecting the performance of the application as a whole.

Example of a TapsMap:

 //Define source tap
 Tap source =
     new Hfs(new TextDelimited(new Fields("emp_name",
                                          "salary",
                                          "dept_id"),
                               "\t"),
             "emp.tsv"
     );

 //Define the sink
 Tap sink =
     new Hfs(new TextDelimited(new Fields("dept_id",
                                          "count"),
                               "\t"),
             "output",
             true
     );

 //Create input taps map (same for output if multiple tail pipes)
 Map tapsMap = new HashMap();
 tapsMap.put(EmpSubAssembly.INPUT_PIPE_NAME, source);

 //SubAssembly
 Pipe pipe = new EmpSubAssembly();

 //Create the flow
 Flow flow =
   new FlowConnector().connect(tapsMap,
                               sink,
                               pipe);

 //Run the flow
 flow.complete();

Results of the Re-factoring

After the changes, we can take full advantage of the Flow auto-wiring capabilities of Cascading:

  1. We can re-wire and re-use any number of pipes and SubAssemblies in a single Flow (see the sketch after this list).
  2. We can change the data locations and formats at will without affecting the functionality of the pipes.
  3. Cascading takes care of mapping the taps, no matter how deeply nested within multiple SubAssemblies the heads and tails are.
  4. We can re-name the pipes at will to prevent naming conflicts across a given Flow.
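
For example, because the secondary constructor accepts any upstream Pipe, we can chain assemblies together.  The sketch below is illustrative only: SalaryFilterAssembly is a hypothetical second SubAssembly, and source and sink are the taps from the example above.

 //Head pipe, named so the taps map can bind the source to it
 Pipe head = new Pipe(EmpSubAssembly.INPUT_PIPE_NAME);

 //Hypothetical upstream SubAssembly whose tail feeds EmpSubAssembly
 Pipe cleansed = new SalaryFilterAssembly(head, "cleansed");

 //Re-use EmpSubAssembly on the output of the first assembly, renaming its tail
 Pipe counts = new EmpSubAssembly(cleansed, "deptCounts");

 //One taps map binds the single head; the sink binds to the tail
 Map tapsMap = new HashMap();
 tapsMap.put(EmpSubAssembly.INPUT_PIPE_NAME, source);

 Flow flow = new FlowConnector().connect(tapsMap, sink, counts);
 flow.complete();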

Cascading.Avro

Cascading.Avro provides a Cascading Scheme implementation for Apache Avro.  This allows Cascading flows to read and write Avro encoded data with ease.  This post covers the basics of how to use Cascading.Avro and some of the gotchas involved in reading, writing, and transforming data using Avro in Cascading.  We will also walk through a benchmark using Avro versus CSV format to give an idea of the performance benefits and cost savings related to storing and processing data in Avro format.

Writing Avro Data With Cascading

First we need to define some Fields and data types to provide some metadata context.  Let’s assume that we are dealing with IP location data:

Fields fields =
    new Fields(
        "ipFrom",
        "ipTo",
        "countryCode",
        "countryName",
        "region",
        "city",
        "latitude",
        "longitude"
    );
Class<?>[] types =
    new Class<?>[] {
        Long.class,
        Long.class,
        String.class,
        String.class,
        String.class,
        String.class,
        Double.class,
        Double.class
    };

Then we can initialize a sink Tap using AvroScheme, allowing us to write Cascading Tuples into an Avro format:

Tap sink = new Lfs(new AvroScheme(fields, types), "test-avro");

Behind the scenes the AvroScheme automatically creates an Avro Schema using the defined fields and data types.  This saves time and effort involved with creating the Avro schema file.  Now we can write some Tuples to the Tap:

 TupleEntryCollector out = sink.openForWrite(new JobConf());

 TupleEntry t = new TupleEntry(fields, new Tuple(new Object[types.length]));
 t.set("ipFrom", 329807419L);
 t.set("ipTo", 329807422L);
 t.set("countryCode", "UC");
 t.set("countryName", "Undefined Country");
 t.set("region", "Undefined Region");
 t.set("city", "Undefined City");
 t.set("latitude", -10.88377366D);
 t.set("longitude", 40.38827661D);
 out.add(t);

 t.set("ipFrom", 245235454L);
 t.set("ipTo", 455322234L);
 t.set("countryCode", "UC2");
 t.set("countryName", "Undefined Country2");
 t.set("region", "Undefined Region2");
 t.set("city", "Undefined City2");
 t.set("latitude", 20.7372661D);
 t.set("longitude", -5.8483992D);
 out.add(t);

 out.close();

A few gotchas here:

  • If we do not pass a Tuple to the constructor for TupleEntry then we get a NullPointerException.
  • If we initialize the Tuple with the default constructor, or initialize with the wrong number of objects in the Object[] then we get the following exception: cascading.tuple.TupleException: failed to set a value, tuple may not be initialized with values, is zero length.
  • Avro is very strict with type conversions, so if we forget the L at the end of the ipFrom value constants then we get the error: org.apache.avro.AvroRuntimeException: Not in union ["null","long"]: 329807419

I chalk these up to the APIs being fairly new: while they functionally work as designed, the “nice to haves” like automatic type conversions aren’t quite there yet.  All the APIs do encourage re-using value objects, which really helps performance in streaming frameworks like Hadoop and Cascading.

After this, Cascading has successfully written an Avro binary file to the test-avro directory with the file name part-00000.avro.  The file is in the standard, platform-independent Avro format, and we can read it using any Avro client (Java, Scala, C, C++, and others).  Note that this is a marked advantage over sequence files, in that we do not have to use the Hadoop file APIs to read them:

 File file = new File("test-avro/part-00000.avro");
 DataFileReader<GenericRecord> in =
   new DataFileReader<GenericRecord>(
     file, new GenericDatumReader<GenericRecord>());
 System.out.println(in.getSchema());  //Print the schema
 GenericRecord rec = new GenericData.Record(in.getSchema());
 while(in.hasNext()) {
   in.next(rec);  //re-use the same record instead of creating each time
   System.out.println(rec);  //Print the record
   System.out.println("Extracted IP From: " + rec.get("ipFrom"));
 }
 in.close();

And here is the program output:

{"type":"record","name":"CascadingAvroSchema","namespace":"","fields":[{"name":"ipFrom","type":["null","long"],"doc":""},{"name":"ipTo","type":["null","long"],"doc":""},{"name":"countryCode","type":["null","string"],"doc":""},{"name":"countryName","type":["null","string"],"doc":""},{"name":"region","type":["null","string"],"doc":""},{"name":"city","type":["null","string"],"doc":""},{"name":"latitude","type":["null","double"],"doc":""},{"name":"longitude","type":["null","double"],"doc":""}]}
{"ipFrom": 329807419, "ipTo": 329807422, "countryCode": "UC", "countryName": "Undefined Country", "region": "Undefined Region", "city": "Undefined City", "latitude": -10.88377366, "longitude": 40.38827661}
Extracted IP From: 329807419
{"ipFrom": 245235454, "ipTo": 455322234, "countryCode": "UC2", "countryName": "Undefined Country2", "region": "Undefined Region2", "city": "Undefined City2", "latitude": 20.7372661, "longitude": -5.8483992}
Extracted IP From: 245235454

Note that even though the data is in binary format, if you call toString() on a GenericRecord you get a valid JSON string!

Reading Avro Data With Cascading

As you could imagine, reading Avro data in Cascading.Avro is easier than writing it, since the JSON metadata precedes the data in all Avro files.   For this example, I’ll also demonstrate a useful feature of Avro called “projection”, which lets us parse only a sub-set of the fields rather than decoding every field.  Cascading.Avro supports projection by specifying a sub-set of the fields when defining the Tap.  Here’s how it works:

 Fields projectedFields =
   new Fields(
     "ipTo"
   );
 Class<?>[] projectedTypes =
   new Class<?>[] {
     Long.class
   };
 Tap source = new Lfs(new AvroScheme(projectedFields, projectedTypes), "test-avro");
 TupleEntryIterator tuplesIn = source.openForRead(new JobConf());
 while(tuplesIn.hasNext()) {
   System.out.println(tuplesIn.next());
 }
 tuplesIn.close();

And here’s the output:

fields: ['ipTo'] tuple: ['329807422']
fields: ['ipTo'] tuple: ['455322234']

There are two useful optimizations built into the Cascading.Avro projection process:

  1. Avro only parses the one field, saving memory and processor time.
  2. The fields are in native Cascading format, ready for use in later Sub-Assemblies or Flows.

Parsing Performance CSV vs. Avro

So I also did a simple test of parsing performance, comparing a CSV file parsed with a BufferedReader and a regex against the equivalent Avro file.  I mainly wanted to see how Avro would perform versus CSV using plain old Java.  My data set was the IP2Location DB5 data set, which has ~3 million rows in CSV format.

Parsing Speed

CSV file with buffered reader and regex : 22150 ms
Avro file (no compression): 1644 ms
Avro is 13.5 times faster than CSV (1350%)

Output Size

Data size (CSV)  : 292,767,737 bytes
Data size (Avro) : 187,551,110 bytes
Space reduction : 35% (no compression)

Code Used for Benchmark

Code for CSV parsing:

 //Input setup
 BufferedReader read =
   new BufferedReader
     (new InputStreamReader
     (new FileInputStream("./target/test.csv")));
 long counter = 0L;
 String line = null;
 long start = System.currentTimeMillis();
 while((line = read.readLine()) != null) {
   String[] splits = line.split("^\"|\"\\,\"|\"\\n\"|\"$|\\n");
   if("BALI".equals(splits[5])) {
     counter++;
   }
 }
 read.close();
 System.out.println("Time (ms) : " + (System.currentTimeMillis() - start));
 System.out.println("Count with Region : \"BALI\" = " + counter);

Code used for Avro parsing (with projection):

 //Schema setup (originally written as this schema)
 Schema schema =
   Schema.parse(AvroLoader.class.
     getResourceAsStream("ipinfo.avsc"));
 //Projection (only decode Region field)
 Schema newSchema =
   Schema.parse(AvroLoader.class.
     getResourceAsStream("ipinfo-region-only.avsc"));

 //Set up input
 File file = new File("./target/test.avro");
 DatumReader<GenericRecord> decoder =
   new GenericDatumReader<GenericRecord>(schema, newSchema);
 DataFileReader<GenericRecord> reader =
   new DataFileReader<GenericRecord>(file, decoder);

 //Prepare for scan
 GenericRecord rec = new GenericData.Record(newSchema);
 long counter = 0L;
 Utf8 bali = new Utf8("BALI");
 long start = System.currentTimeMillis();
 while(reader.hasNext()) {
   reader.next(rec);
   if(rec.get("Region").equals(bali)) {
     counter++;
   }
 }
 reader.close();
 System.out.println("Time (ms) : " + (System.currentTimeMillis() - start));
 System.out.println("Projection : " + rec.toString());
 System.out.println("Count with Region : \"BALI\" = " + counter);

Schema for all data: ipinfo.avsc:

{
 "type"   : "record",
 "name"   : "IpInfo",
 "doc"    : "The geographic information related to an IP address",
 "fields" : [
 { "name" : "IpFrom",        "type" : "long" },
 { "name" : "IpTo",            "type" : "long" },
 { "name" : "CountryCode",    "type" : "string" },
 { "name" : "CountryName",    "type" : "string" },
 { "name" : "Region",        "type" : "string" },
 { "name" : "City",            "type" : "string" },
 { "name" : "Latitude",        "type" : "double" },
 { "name" : "Longitude",        "type" : "double" }
 ]
}

Schema for Region projection:  ipinfo-region-only.avsc:

{
 "type"   : "record",
 "name"   : "IpInfo",
 "doc"    : "The Region information related to an IP address",
 "fields" : [
 { "name" : "Region",        "type" : "string" }
 ]
}

Conclusion

So from our examples we can see the following about Cascading.Avro:

  1. The Avro encoding provides superior encoding and decoding speed vs. text formats.
  2. Avro files take up less space, will use less memory, and will read and write faster from an IO perspective vs. text formats.
  3. Avro files provide the benefits of Sequence Files, without requiring Hadoop to create and read the files.  This could be critical for data collection since we could produce the Avro files while collecting data, providing an additional speed and scalability boost to data collectors.
  4. Cascading.Avro eliminates the tedious and error prone process of associating fields and data types with text files during a write.  We are encouraged to access fields by name rather than index, aiding maintainability and extensibility.
  5. Cascading.Avro automatically populates Cascading Fields and data types from the meta data of existing Avro files, so all we do is read the file and the meta data comes with it.
  6. We apply projection simply by specifying a sub-set of fields to select.  Cascading.Avro takes care of creating a projection schema which saves time and improves maintainability.
  7. We never even have to deal with Avro schema files since Cascading.Avro automatically generates them from Cascading fields.

So in conclusion, it looks like using Cascading.Avro is better, stronger, and faster than using either text files or sequence files in Cascading, and the built-in schema evolution features of Avro make this a future-proof data storage and processing model for Hadoop.

Java Embedded DB for IP2Location in Hadoop

Since the IP geographic data operates on ranges of IP addresses, we cannot simply join the log data to the IP data (using an RDBMS or MapReduce), and we cannot use a simple key/value cache to look up the IP data.  Currently the entire IP2Location DB-5 data set consumes 1+ GB of memory when loaded into a Java object array.  While we can currently fit this in memory with the current infrastructure, we will have issues if this data grows significantly beyond its current size.  Therefore we have to look at some alternatives for looking up the data without loading it all in memory.  We cannot use a middleware or shared database solution since these will quickly become overwhelmed by requests from our cluster.  Likewise, we cannot afford to take a network hit for every look-up, and we currently cannot batch look-up requests to reduce network hits.   We need a shared-nothing architecture, and therefore copying a Java embedded database locally to pull our small data close to our big data seems the best approach.  This page evaluates several Java embedded databases as a fit for the use case.

This test is Hadoop/Cascading specific in that it uses the standard set of jar files included in the class path of Apache Hadoop 0.20.2 and Cascading 1.2.  For example, we used the hsqldb-1.8.0.10.jar bundled with Hadoop.  Leveraging an embedded DB which is not only thread safe but also concurrent will allow re-use of the in-process memory cache across multiple threads using the Hadoop task JVM reuse feature.
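
As a rough sketch of how task JVM reuse and an embedded DB fit together, the holder below boots the embedded Derby engine once per task JVM so its block cache stays warm across successive tasks.  The mapred.job.reuse.jvm.num.tasks setting (JobConf.setNumTasksToExecutePerJvm(-1)) is the Hadoop 0.20 mechanism for JVM reuse; the class name and database path are illustrative assumptions, not code from the benchmark.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Illustrative per-JVM holder: with task JVM reuse enabled, the embedded
 * Derby engine and its block cache survive across tasks in the same JVM.
 */
public final class IpLookupDb {
  private static boolean booted = false;

  private IpLookupDb() { }

  public static synchronized Connection open() throws SQLException {
    if (!booted) {
      try {
        //Boot the embedded engine once per task JVM
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
      } catch (ClassNotFoundException e) {
        throw new SQLException(e.toString());
      }
      booted = true;
    }
    //Connections are cheap once the engine is booted; the page cache is shared
    return DriverManager.getConnection("jdbc:derby:/tmp/ip2location");
  }
}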

Databases Evaluated

The following databases were evaluated:

  1. Baseline load into in-memory array.
  2. Apache Derby 10.7.1.1
  3. HSQLDB 1.8.0.10
  4. H2 1.2.147

Each of the databases was set up, configured, and queried using identical SQL scripts and JDBC code, except where specifically noted in the test results section.  Also, we require the exclusive use of a JDBC interface to the database so we can easily swap DBs based on performance and stability.

Test Results

Test results for the in-memory baseline and each of the embedded DBs follows:

DB                Load Time        Load Rows    Load Rate          IP Lookup Rate
Derby             1 min, 51 sec    3,237,642    29168 / sec        82486 / sec ****
H2                1 min, 10 sec    3,237,642    46252 / sec ***    61527 / sec
in-memory array   32 sec           3,237,642    101176 / sec       133 / sec
HSQLDB            3 min, 28 sec    3,237,642    15565 / sec *      1 / 22 sec **

* = -Xmx3G JVM setting was required to load the data

** = -Xmx1G JVM setting was required to query the data

*** = -Xmx1G JVM setting was required to load the data

**** = Only Derby supported concurrent thread access, and 4 threads were used in the IP Lookup test

Test Cases

Create Table

The table was created with the following columns:

CREATE TABLE IP_INFO (
  IP_FROM BIGINT,
  IP_TO BIGINT,
  COUNTRY_CODE VARCHAR(2),
  COUNTRY_NAME VARCHAR(64),
  REGION VARCHAR(64),
  CITY VARCHAR(64),
  LATITUDE DOUBLE,
  LONGITUDE DOUBLE
)

Load Data

The following load script was used to load up to the maximum 3,237,642 rows.  The data set is based on IP2Location DB-5.  Some of the Java databases tested could not adequately handle the data size, so for those databases a smaller number of rows were loaded, as noted in the test results section:

INSERT INTO IP_INFO (
  IP_FROM,
  IP_TO,
  COUNTRY_CODE,
  COUNTRY_NAME,
  REGION,
  CITY,
  LATITUDE,
  LONGITUDE
) VALUES (
  ?,
  ?,
  ?,
  ?,
  ?,
  ?,
  ?,
  ?
)

In addition, the following JDBC code to load the data utilizes prepared statements and JDBC batch processing:

Connection conn = DriverManager.getConnection(<db url>);

PreparedStatement ps = conn.prepareStatement(insertString);

// read data file and get fields for each line ...

// for each field, bind values to the PreparedStatement
ps.setLong(1, g.getIpFrom());

ps.addBatch();

// add batch for every line, and execute batch every 100 lines
if(lineNumber % 100 == 0) {
  ps.executeBatch();
}
// at the end, execute final batch and commit ...
ps.executeBatch();
conn.commit();

Create Index

A b-tree index on IP_TO is required to select the matching IP range:

CREATE INDEX IP_INFO_I01 ON IP_INFO ( IP_TO )

Select IP Number in Range

The first row returned by the following SELECT query contains the geographic information for the matching IP address range.  Any additional rows are not read and are ignored:

SELECT *
FROM IP_INFO
WHERE IP_TO >= ?
ORDER BY IP_TO

The following JDBC code utilizes prepared statements and bind variables to select the first matching row:

PreparedStatement ps = conn.prepareStatement(<sql>);
ps.setFetchSize(1);
ps.setMaxRows(1); 

// the following code iterates for each look up test execution
long random = (long)(Math.random() * Integer.MAX_VALUE);
ps.setLong(1, random);
ResultSet rs = ps.executeQuery();
if(rs.next()) {
  // test getting fields, and verify result
}

Test System Configuration

Test was run on the following platform configuration:

Hardware Overview:

  Model Name:    MacBook Pro
  Model Identifier:    MacBookPro6,2
  Processor Name:    Intel Core i7
  Processor Speed:    2.66 GHz
  Number Of Processors:    1
  Total Number Of Cores:    2
  L2 Cache (per core):    256 KB
  L3 Cache:    4 MB
  Memory:    4 GB
  Processor Interconnect Speed:    4.8 GT/s
  Boot ROM Version:    MBP61.0057.B0C

  Serial-ATA:
    Intel 5 Series Chipset:

      Vendor:    Intel
      Product:    5 Series Chipset
      Link Speed:    3 Gigabit
      Negotiated Link Speed:    1.5 Gigabit
      Description:    AHCI Version 1.30 Supported

  Disk:
    Hitachi HTS545050B9SA02:

      Capacity:    500.11 GB (500,107,862,016 bytes)
      Model:    Hitachi HTS545050B9SA02
      Revision:    PB4AC60W

System Software Overview:

  System Version:    Mac OS X 10.6.5 (10H574)
  Kernel Version:    Darwin 10.5.0

JVM Overview:

  java version "1.6.0_22"
  Java(TM) SE Runtime Environment (build 1.6.0_22-b04-307-10M3261)
  Java HotSpot(TM) 64-Bit Server VM (build 17.1-b03-307, mixed mode)

Modifications to Tests

Some tests had to be modified, either due to time constraints (test could not run in a reasonable time) or because the test crashed or hung.

In Memory Array

We had to run the test with JVM parameter -Xmx2G to ensure all the data could be loaded into memory.  Since we are not using a DB, the query logic is simply to iterate through the array to find the first row with IP_TO >= target IP.

Please note: we cannot use a standard binary search algorithm since we do not know the specific value we are searching for a priori.  We considered writing a binary search variant to find the first element >= the target IP; however, we could not find a library which does this, and we did not have the time to write and test one ourselves.
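
For reference, a lower-bound variant of binary search (find the first index whose IP_TO is greater than or equal to the target) only takes a few lines; the following is an illustrative, untested sketch rather than something used in the benchmark.

 /**
  * Returns the index of the first element of sortedIpTo that is >= target,
  * or sortedIpTo.length if no such element exists.
  */
 static int lowerBound(long[] sortedIpTo, long target) {
   int lo = 0;
   int hi = sortedIpTo.length;      //exclusive upper bound
   while (lo < hi) {
     int mid = (lo + hi) >>> 1;     //unsigned shift avoids overflow
     if (sortedIpTo[mid] < target) {
       lo = mid + 1;                //answer must be to the right of mid
     } else {
       hi = mid;                    //mid may be the answer; keep it in range
     }
   }
   return lo;
 }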

Derby

We had to set the block cache size above the default to improve cache hits for multi-threaded access:

System.getProperties().setProperty("derby.storage.pageCacheSize", "524288");

We set the JVM to -Xmx1G for the IP lookups and used the JVM default 128M for the data load.

HSQLDB

We had to make many modifications to the code, the DML, and the SQL, as well as the JVM memory settings, to make the data load work. We could not find a way to force HSQLDB to use the b-tree index, so the query performance represents the default plan selected by HSQLDB.

Here is a description of the changes and why they were made:

JVM Memory Settings

We started out with the 128M default JVM heap and received an out of memory error before we completely loaded 200,000 rows.  We had to conclude that HSql only saves to disk after a commit or checkpoint, and keeps the entire set of data buffered in memory.  This was confirmed when we added a CHECKPOINT after each 100,000 rows and still got the same “java.lang.OutOfMemoryError: Java heap space”, but we did see data on disk after each checkpoint.

So we had to set memory to -Xmx3G to load the data set, and we had to take the CHECKPOINT out because it looks like HSql writes all the data to one big file.  So every time you do a CHECKPOINT, the overhead of writing the data becomes larger and larger.  It is also possible that it re-writes the entire file with every checkpoint.  Definitely never use CHECKPOINT or incremental commits when writing data sets of GB+ sizes, and make sure your available memory is 3 times the size of the data on disk after the load.

When querying the data, again we have the problem of HSql loading the entire data set into memory: we could not query the data at all with the default 128MB of JVM memory allocated.  Setting -Xmx1G works around this problem.

Code and SQL Changes

We could not create the index due to this error:

java.sql.SQLException: java.io.IOException: S1000 Data file size limit is reached in statement [CREATE INDEX IP_INFO_I01 ON IP_INFO(IP_TO)]

We tried loading only 100,000 rows to see if the index was actually used, but the best query rate we could get was 17 / sec, so even if the index was used the performance definitely would not scale to the full data set (roughly 32 times that size).

We had to specify CREATE CACHED TABLE in the table DDL to get the data load to work.

H2

H2 also required changes to the JVM settings to work properly.

JVM Memory Settings

With the default 128M JVM settings we were able to reach just over 400,000 rows loaded before running out of heap space.  Setting the heap to -Xmx1G fixed this issue.  Similarly to HSQLDB it looks like H2 loads the entire data set into memory for the transaction.

We had to specify CREATE CACHED TABLE in the table DDL to get the data load to work.

Conclusion

Derby provides the best fit for this use case due to:

  1. Concurrency: multiple threads can query embedded Derby within the same JVM process.
  2. The performance of Derby in multi-threaded mode outstrips H2’s single threaded performance on a 2 core machine.  We expect this to scale linearly to our target 16 core production deployment.
  3. Sharing process memory across multiple threads allows us to scale look up data size while maximizing shared cache hits.

However, H2 demonstrated superior single-threaded performance and admirable load speeds, and used about 40% less space on disk than Derby.

All in all, Derby wins for this use case, and H2 gets an honorable mention.

Refactoring a Hadoop Job with Cascading

This post covers an approach to quickly and efficiently convert an existing Hadoop MapReduce job to a Cascading flow, with a focus on leveraging existing test cases and preserving backward compatibility wherever possible.

Areas of Change

With the assumption that the MapReduce job follows the example of existing documentation and books on Hadoop, we need to re-factor the job driver code in specific ways to convert to a Cascading flow.  We must re-factor the following areas out of the job driver and into a separate class:

  1. The job driver typically handles preconditions and validation of external dependencies (such as DistributedCache objects, etc.)
  2. The creation of the JobConf typically occurs in the job driver class, which conflicts with Cascading’s separation of the job configuration logic from the job execution logic
  3. The job driver typically handles post-conditions; however, we need to use Cascading’s FlowListener interface to handle these
  4. The job driver may handle deleting the job output directory, while Cascading handles this automatically

Approach

The overall approach below preserves as much backward compatibility as possible while leveraging existing unit tests.

  1. Create a JobHelper class to provide the separation of concerns required by Cascading.  The JobHelper plays the same role as the View Helper in an MVC application, as it provides clean separation between the view (job execution context) and the model (job business logic).
  2. Move all preconditions, post-conditions, and job configuration to the JobHelper.
  3. Re-test the existing job driver with the re-factored code and existing unit test cases.  When you are sure that the existing job driver works with the JobHelper, then you are ready to move to Cascading.
  4. Implement cascading.flow.FlowListener and call the appropriate JobHelper pre and post-condition methods in the various event handlers provided (I named the methods the same so this should be obvious!).
  5. Create a new unit test extending CascadingTestCase.
    1. Create a new Cascading MapReduceFlow, passing the JobConf from the JobHelper.  Use the deleteSinkOnInit constructor argument to indicate whether you want to delete the output directory automatically.
    2. Add your FlowListener implementation to the MapReduceFlow using the addListener method.
    3. Call Flow.complete() to execute the flow.
  6. Optionally, re-factor the existing job driver to execute the Cascading flow.

You should now have two different versions of the same job, one using a job driver and one using Cascading.  Also, you can still call the existing job driver code from the command line as before, as well as having the flexibility to add the job to a Cascading flow!

Code Example

Here’s an example interface for a JobHelper:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

/**
 * This interface provides a bridge between a Hadoop job driver and Cascading.
 * Serves a role very similar to ViewHelper in MVC.
 * @author mpouttuclarke
 *
 */
@SuppressWarnings("deprecation")
public interface JobHelper {
  /**
   * Logic to execute to initialize the job flow.  Should create the JobConf
   * here for later use, but may also create DistributedCache, etc.
   * @param config
   * @param parms
   * @throws IOException
   */
  public void initialize(Configuration config, Object... parms) throws IOException;

  /**
   * Logic to execute after job initialization but before job execution.
   * @throws IOException
   */
  public void onStarting() throws IOException;

  /**
   * Gets the JobConf created earlier, used to run the job in
   * either raw Hadoop or Cascading.
   * @return the job configuration
   */
  public JobConf getJobConf();

  /**
   * Logic to execute if the job is stopped.
   * @throws IOException
   */
  public void onStopping() throws IOException;

  /**
   * Logic to execute if the job completes.
   * @throws IOException
   */
  public void onCompleted() throws IOException;

  /**
   * What to do if a fatal error occurs in the job.
   * @param throwable
   * @return true if the error was handled
   */
  public boolean onThrowable(Throwable throwable);
}
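
Here is a rough sketch of how steps 4 and 5 might be wired together, with a FlowListener delegating to the JobHelper and a MapReduceFlow running the existing job.  The class name and the RuntimeException wrapping are illustrative assumptions; adapt them to your own error handling.

import java.io.IOException;

import cascading.flow.Flow;
import cascading.flow.FlowListener;

/**
 * Delegates Cascading flow events to the JobHelper pre- and post-condition methods.
 */
public class JobHelperFlowListener implements FlowListener {
  private final JobHelper helper;

  public JobHelperFlowListener(JobHelper helper) {
    this.helper = helper;
  }

  public void onStarting(Flow flow) {
    try {
      helper.onStarting();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public void onStopping(Flow flow) {
    try {
      helper.onStopping();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public void onCompleted(Flow flow) {
    try {
      helper.onCompleted();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public boolean onThrowable(Flow flow, Throwable throwable) {
    return helper.onThrowable(throwable);
  }
}

The test wiring from step 5 might then look something like this (again, the names are illustrative):

 JobHelper helper = new MyJobHelper();
 helper.initialize(new Configuration());
 Flow flow = new MapReduceFlow("existingJob", helper.getJobConf(), true); //deleteSinkOnInit
 flow.addListener(new JobHelperFlowListener(helper));
 flow.complete();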

References

Here’s an example of how to run a raw MapReduce job in a Cascade, for background. If we added a JobHelper and FlowListener to that example, we could move all the JobConf code out of the job driver and make any MapReduce job a first-class participant in a Cascade.