unionOf for Cascading.Avro

Previous posts noted how Cascading provides greater flexibility and testability than relational databases for ETL, and also benchmarked Avro versus text formats for large-scale information processing. Recently I released a patch to Cascading.Avro which provides even more power and flexibility over traditional RDBMS-based data processing. The new AvroScheme.unionOf utility method allows better support for Avro schema evolution without needing a centralized meta data store and without having to reformat all of your historical data to the new format. Unlike a traditional SQL UNION statement, AvroScheme.unionOf dynamically adds columns and converts data types as necessary.  There is no need to specify field names or data types up front, since all Avro files contain self-describing meta data.

How it Works

Say that we have two Avro files: one representing an older format, and another representing the newer data format.

Old format:

ID (int)   Amt (float)
1          22000.22
2          33000.33

New format:

ID (long)  Amt (float)  Currency (String)
3          11000.11     EUR

When we use AvroScheme.unionOf with the directory containing the two Avro files as input, we can create a tap capable of reading all the data in all the folder(s) specified:

  Tap avroSource = new Hfs(AvroScheme.unionOf(input), input);

And when we read the tap, we get the following:

ID (long)  Amt (float)  Currency (String)
1          22000.22     <null>
2          33000.33     <null>
3          11000.11     EUR

As you can see above, the Scheme standardized the data types and added null values for fields not present in the old Avro Schema, allowing Cascading GroupBy or CoGroup to function normally with the input data.
Without utilizing AvroScheme.unionOf we would need to either convert all existing data to the new format, or store the set of names and data types for each Schema and use a custom flow to coerce the types and add default values. Typically, by using AvroScheme.unionOf I have seen ~80% code reduction for reading existing Avro files, in addition to the support for evolving schemata.
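
To make the merge behaviour concrete, here is a minimal sketch in plain Java of the kind of logic described above: combine the field definitions from several schemas, widen int to long where they differ, and fill missing fields with null. This is not the Cascading.Avro implementation. The class UnionSketch, its methods, and the widening rules are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UnionSketch {

    // Pick one type that can hold both inputs (hypothetical widening rules).
    public static Class<?> widen(Class<?> a, Class<?> b) {
        if (a.equals(b)) {
            return a;
        }
        boolean bothIntegral = (a == Integer.class || a == Long.class)
                && (b == Integer.class || b == Long.class);
        // Anything else falls back to String, mirroring a toString() conversion.
        return bothIntegral ? Long.class : String.class;
    }

    // Merge per-file field definitions into one unioned definition,
    // keeping fields in first-seen order.
    public static Map<String, Class<?>> unionOf(List<Map<String, Class<?>>> schemas) {
        Map<String, Class<?>> union = new LinkedHashMap<>();
        for (Map<String, Class<?>> schema : schemas) {
            for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
                union.merge(field.getKey(), field.getValue(), UnionSketch::widen);
            }
        }
        return union;
    }

    // Coerce one record to the unioned definition; absent fields become null.
    public static List<Object> conform(Map<String, Object> record,
                                       Map<String, Class<?>> union) {
        List<Object> row = new ArrayList<>();
        for (Map.Entry<String, Class<?>> field : union.entrySet()) {
            Object value = record.get(field.getKey());
            if (value == null) {
                row.add(null);
            } else if (field.getValue() == Long.class && value instanceof Number) {
                row.add(((Number) value).longValue());
            } else if (field.getValue() == String.class) {
                row.add(value.toString());
            } else {
                row.add(value);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> oldSchema = new LinkedHashMap<>();
        oldSchema.put("ID", Integer.class);
        oldSchema.put("Amt", Float.class);

        Map<String, Class<?>> newSchema = new LinkedHashMap<>();
        newSchema.put("ID", Long.class);
        newSchema.put("Amt", Float.class);
        newSchema.put("Currency", String.class);

        Map<String, Class<?>> union = unionOf(Arrays.asList(oldSchema, newSchema));

        // A record written in the old format: int ID, no Currency.
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("ID", 1);
        oldRecord.put("Amt", 22000.22f);

        System.out.println(union.keySet() + " " + conform(oldRecord, union));
    }
}
```

Running main prints the unioned field names followed by the old-format record, with its ID widened to a long and a null in the Currency position.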

Support for MultiSourceTap

If we need to read multiple directories containing Avro files, or multiple Avro files in different directories, we need to use the Cascading MultiSourceTap to read them as one data flow. AvroScheme.unionOf supports this by accepting an array of directory or file names:

  AvroScheme scheme = AvroScheme.unionOf(inputs);
  Tap[] avroSources = new Tap[inputs.length];
  for(int x = 0; x < inputs.length; x++) {
    avroSources[x] = new Hfs(scheme, inputs[x]);
  }
  MultiSourceTap multiSourceTap = new MultiSourceTap(avroSources);

Support for Avro Projection

Let’s say we only need to see the ID from each record, regardless of the underlying schema data type.  Then we can use the following call to produce the desired output:

  Tap avroSource = new Hfs(
        AvroScheme.unionOf(new Fields("ID"), out), out);

And the output looks like:

ID (long)

Availability and Limitations

I have released a patch to BixoLabs adding support for unionOf as well as simple type conversion support.  Please check the Cascading.Avro site for release details. Currently, only default toString() conversions to String are supported, and nested Map and List fields are excluded from the union field set.


Cascading.Avro provides a Cascading Scheme implementation for Apache Avro.  This allows Cascading flows to read and write Avro encoded data with ease.  This post covers the basics of how to use Cascading.Avro and some of the gotchas involved in reading, writing, and transforming data using Avro in Cascading.  We will also walk through a benchmark using Avro versus CSV format to give an idea of the performance benefits and cost savings related to storing and processing data in Avro format.

Writing Avro Data With Cascading

First we need to define some Fields and data types to provide some meta data context.  Let’s assume that we are dealing with IP location data:

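Fields fields =
 new Fields(
   "ipFrom", "ipTo", "countryCode", "countryName",
   "region", "city", "latitude", "longitude");
Class<?>[] types =
 new Class<?>[] {
   Long.class, Long.class, String.class, String.class,
   String.class, String.class, Double.class, Double.class};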

Then we can initialize a sink using AvroScheme, allowing us to write Cascading Tuples in Avro format:

Tap sink = new Lfs(new AvroScheme(fields, types), "test-avro");

Behind the scenes the AvroScheme automatically creates an Avro Schema using the defined fields and data types.  This saves time and effort involved with creating the Avro schema file.  Now we can write some Tuples to the Tap:

 TupleEntryCollector out = sink.openForWrite(new JobConf());

 TupleEntry t = new TupleEntry(fields, new Tuple(new Object[types.length]));
 t.set("ipFrom", 329807419L);
 t.set("ipTo", 329807422L);
 t.set("countryCode", "UC");
 t.set("countryName", "Undefined Country");
 t.set("region", "Undefined Region");
 t.set("city", "Undefined City");
 t.set("latitude", -10.88377366D);
 t.set("longitude", 40.38827661D);
 out.add(t);  //write the first record

 t.set("ipFrom", 245235454L);
 t.set("ipTo", 455322234L);
 t.set("countryCode", "UC2");
 t.set("countryName", "Undefined Country2");
 t.set("region", "Undefined Region2");
 t.set("city", "Undefined City2");
 t.set("latitude", 20.7372661D);
 t.set("longitude", -5.8483992D);
 out.add(t);  //re-use the same TupleEntry for the second record

 out.close();  //flush and close the Avro file


A few gotchas here:

  • If we do not pass a Tuple to the constructor for TupleEntry then we get a NullPointerException.
  • If we initialize the Tuple with the default constructor, or initialize with the wrong number of objects in the Object[] then we get the following exception: cascading.tuple.TupleException: failed to set a value, tuple may not be initialized with values, is zero length.
  • Avro is very strict with type conversions, so if we forget the L at the end of the ipFrom value constants then we get the error: org.apache.avro.AvroRuntimeException: Not in union ["null","long"]: 329807419
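
The third gotcha comes down to how Java boxes literals, and it can be reproduced without Avro on the classpath. In the sketch below, fitsNullableLong is a hypothetical stand-in for a strict ["null","long"] check, not Avro's actual validation code. The point is only that an integer literal without the L suffix becomes an Integer once it is passed as an object.

```java
public class LongLiteralGotcha {

    // Stand-in for a strict ["null","long"] union check (an assumption for
    // illustration): only null or java.lang.Long is accepted, with no
    // automatic widening from Integer.
    public static boolean fitsNullableLong(Object value) {
        return value == null || value instanceof Long;
    }

    public static void main(String[] args) {
        Object withoutSuffix = 329807419;   // int literal, boxed to Integer
        Object withSuffix = 329807419L;     // long literal, boxed to Long

        System.out.println(withoutSuffix.getClass().getSimpleName()
                + " accepted: " + fitsNullableLong(withoutSuffix));
        System.out.println(withSuffix.getClass().getSimpleName()
                + " accepted: " + fitsNullableLong(withSuffix));
    }
}
```

The value 329807419 fits comfortably in an int, so the compiler raises no complaint. The mismatch only shows up at run time, when the boxed Integer meets a schema that expects a long.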

I chalk these up to the APIs being fairly new: they work as designed, but “nice to haves” like automatic type conversions aren’t quite there yet.  All the APIs do encourage re-using value objects, which really helps performance in streaming frameworks like Hadoop and Cascading.

After this, Cascading has successfully written an Avro binary file to the test-avro directory with the file name part-00000.avro.  The file is in the standard, platform-independent Avro format, and we can read it using any Avro client (Java, Scala, C, C++, others?).  Note that this is a marked advantage over sequence files, in that we do not have to use the Hadoop file APIs to read it:

 File file = new File("test-avro/part-00000.avro");
 DataFileReader<GenericRecord> in =
   new DataFileReader<GenericRecord>(
     file, new GenericDatumReader<GenericRecord>());
 System.out.println(in.getSchema());  //Print the schema
 GenericRecord rec = new GenericData.Record(in.getSchema());
 while(in.hasNext()) {
   in.next(rec);  //re-use the same record instead of creating each time
   System.out.println(rec);  //Print the record
   System.out.println("Extracted IP From: " + rec.get("ipFrom"));
 }
 in.close();

And here is the program output:

{"ipFrom": 329807419, "ipTo": 329807422, "countryCode": "UC", "countryName": "Undefined Country", "region": "Undefined Region", "city": "Undefined City", "latitude": -10.88377366, "longitude": 40.38827661}
Extracted IP From: 329807419
{"ipFrom": 245235454, "ipTo": 455322234, "countryCode": "UC2", "countryName": "Undefined Country2", "region": "Undefined Region2", "city": "Undefined City2", "latitude": 20.7372661, "longitude": -5.8483992}
Extracted IP From: 245235454

Note that even though the data is in binary format, if you call toString() on a GenericRecord you get a valid JSON string!

Reading Avro Data With Cascading

As you might imagine, reading Avro data in Cascading.Avro is easier than writing it, since the JSON meta data precedes the data in all Avro files.  For this example, I’ll also demonstrate a useful feature of Avro called “projection”: we can parse only a subset of the fields rather than decoding the whole record.  Cascading.Avro supports projection by specifying a subset of the fields when defining the Tap.  Here’s how it works:

 Fields projectedFields =
   new Fields("ipTo");
 Class<?>[] projectedTypes =
   new Class<?>[] {Long.class};
 Tap source = new Lfs(new AvroScheme(projectedFields, projectedTypes), "test-avro");
 TupleEntryIterator tuplesIn = source.openForRead(new JobConf());
 while(tuplesIn.hasNext()) {
   System.out.println(tuplesIn.next());  //Print each projected TupleEntry
 }
 tuplesIn.close();

And here’s the output:

fields: ['ipTo'] tuple: ['329807422']
fields: ['ipTo'] tuple: ['455322234']

There are two useful optimizations built into the Cascading.Avro projection process:

  1. Avro only parses the one field, saving memory and processor time.
  2. The fields are in native Cascading format, ready for use in later Sub-Assemblies or Flows.

Parsing Performance CSV vs. Avro

I also ran a simple test of parsing performance, comparing a CSV file parsed with a BufferedReader and a regex against an Avro file.  I mainly wanted to see how Avro would perform versus CSV using plain old Java.  My data set was the IP2Location DB5 data set, which has ~3 million rows in CSV format.

Parsing Speed

CSV file with buffered reader and regex : 22150 ms
Avro file (no compression): 1644 ms
Avro is 13.5 times faster than CSV (1350%)

Output Size

Data size (CSV)  : 292,767,737 bytes
Data size (Avro) : 187,551,110 bytes
Space reduction : ~36% (no compression)
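
For anyone who wants to check the arithmetic, the two ratios follow directly from the timings and file sizes listed above. The small class below is just a worked calculation. BenchmarkRatios and its method names are made up for this example.

```java
import java.util.Locale;

public class BenchmarkRatios {

    // How many times faster the second timing is than the first.
    public static double speedup(long slowMs, long fastMs) {
        return (double) slowMs / fastMs;
    }

    // Fraction of space saved when going from originalBytes to reducedBytes.
    public static double reduction(long originalBytes, long reducedBytes) {
        return 1.0 - (double) reducedBytes / originalBytes;
    }

    public static void main(String[] args) {
        // Timings and sizes are the figures reported in the benchmark.
        System.out.println(String.format(Locale.ROOT,
                "Speedup: %.1fx", speedup(22150L, 1644L)));
        System.out.println(String.format(Locale.ROOT,
                "Space reduction: %.1f%%",
                100 * reduction(292_767_737L, 187_551_110L)));
    }
}
```

The speedup works out to roughly 13.5x and the space reduction to just under 36%.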

Code Used for Benchmark

Code for CSV parsing:

 //Input setup
 BufferedReader read =
   new BufferedReader
     (new InputStreamReader
     (new FileInputStream("./target/test.csv")));
 long counter = 0L;
 String line = null;
 long start = System.currentTimeMillis();
 while((line = read.readLine()) != null) {
   String[] splits = line.split("^\"|\"\\,\"|\"\\n\"|\"$|\\n");
   if("BALI".equals(splits[5])) {
     counter++;
   }
 }
 read.close();
 System.out.println("Time (ms) : " + (System.currentTimeMillis() - start));
 System.out.println("Count with Region : \"BALI\" = " + counter);
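
One detail of the CSV code worth spelling out is why Region is read from splits[5] when it is the fifth column. Because the pattern also matches the opening quote at the start of the line, String.split returns an empty string at index 0 and every column shifts up by one. The sketch below applies the same pattern to a made-up sample row. The row values and the class name CsvSplitDemo are illustrative only.

```java
public class CsvSplitDemo {

    // The same delimiter pattern used in the CSV benchmark loop.
    public static final String PATTERN = "^\"|\"\\,\"|\"\\n\"|\"$|\\n";

    public static String[] splitLine(String line) {
        return line.split(PATTERN);
    }

    public static void main(String[] args) {
        // Made-up row in the quoted, comma-separated layout:
        // ipFrom, ipTo, countryCode, countryName, region, city, latitude, longitude
        String line = "\"1\",\"2\",\"ID\",\"INDONESIA\",\"BALI\","
                + "\"DENPASAR\",\"-8.65\",\"115.22\"";

        String[] splits = splitLine(line);
        for (int i = 0; i < splits.length; i++) {
            System.out.println(i + " -> '" + splits[i] + "'");
        }
    }
}
```

The trailing quote is also matched as a delimiter, but String.split drops trailing empty strings, so only the leading one affects the indexes.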

Code used for Avro parsing (with projection):

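 //Schema setup (originally written as this schema)
 //File locations are assumed; adjust to wherever the .avsc files live
 Schema schema =
   new Schema.Parser().parse(new File("ipinfo.avsc"));
 //Projection (only decode Region field)
 Schema newSchema =
   new Schema.Parser().parse(new File("ipinfo-region-only.avsc"));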

 //Set up input
 File file = new File("./target/test.avro");
 DatumReader<GenericRecord> decoder =
   new GenericDatumReader<GenericRecord>(schema, newSchema);
 DataFileReader<GenericRecord> reader =
   new DataFileReader<GenericRecord>(file, decoder);

 //Prepare for scan
 GenericRecord rec = new GenericData.Record(newSchema);
 long counter = 0L;
 Utf8 bali = new Utf8("BALI");
 long start = System.currentTimeMillis();
 while(reader.hasNext()) {
   reader.next(rec);  //re-use the same record instance
   if(rec.get("Region").equals(bali)) {
     counter++;
   }
 }
 reader.close();
 System.out.println("Time (ms) : " + (System.currentTimeMillis() - start));
 System.out.println("Projection : " + rec.toString());
 System.out.println("Count with Region : \"BALI\" = " + counter);

Schema for all data: ipinfo.avsc:

 {
 "type"   : "record",
 "name"   : "IpInfo",
 "doc"    : "The geographic information related to an IP address",
 "fields" : [
 { "name" : "IpFrom",        "type" : "long" },
 { "name" : "IpTo",          "type" : "long" },
 { "name" : "CountryCode",   "type" : "string" },
 { "name" : "CountryName",   "type" : "string" },
 { "name" : "Region",        "type" : "string" },
 { "name" : "City",          "type" : "string" },
 { "name" : "Latitude",      "type" : "double" },
 { "name" : "Longitude",     "type" : "double" }
 ]
 }

Schema for Region projection: ipinfo-region-only.avsc:

 {
 "type"   : "record",
 "name"   : "IpInfo",
 "doc"    : "The Region information related to an IP address",
 "fields" : [
 { "name" : "Region",        "type" : "string" }
 ]
 }


So from our examples we can see the following about Cascading.Avro:

  1. Avro provides superior encoding and decoding speed vs. text formats.
  2. Avro files take up less space, will use less memory, and will read and write faster from an IO perspective vs. text formats.
  3. Avro files provide the benefits of Sequence Files, without requiring Hadoop to create and read the files.  This could be critical for data collection since we could produce the Avro files while collecting data, providing an additional speed and scalability boost to data collectors.
  4. Cascading.Avro eliminates the tedious and error-prone process of associating fields and data types with text files during a write.  We are encouraged to access fields by name rather than index, aiding maintainability and extensibility.
  5. Cascading.Avro automatically populates Cascading Fields and data types from the meta data of existing Avro files, so all we do is read the file and the meta data comes with it.
  6. We apply projection simply by specifying a subset of fields to select.  Cascading.Avro takes care of creating a projection schema, which saves time and improves maintainability.
  7. We never even have to deal with Avro schema files since Cascading.Avro automatically generates them from Cascading fields.

So in conclusion, it looks like using Cascading.Avro is better, stronger, and faster than using either text files or sequence files in Cascading, and the built-in schema evolution features of Avro make this a future-proof data storage/processing model for Hadoop.