Cascading.Avro

Cascading.Avro provides a Cascading Scheme implementation for Apache Avro, which allows Cascading flows to read and write Avro-encoded data with ease.  This post covers the basics of using Cascading.Avro and some of the gotchas involved in reading, writing, and transforming Avro data in Cascading.  We will also walk through a benchmark comparing Avro with CSV to give an idea of the performance benefits and cost savings of storing and processing data in Avro format.

Writing Avro Data With Cascading

First we need to define some Fields and data types to provide metadata context.  Let’s assume that we are dealing with IP location data:

Fields fields =
  new Fields(
    "ipFrom",
    "ipTo",
    "countryCode",
    "countryName",
    "region",
    "city",
    "latitude",
    "longitude"
  );
Class<?>[] types =
  new Class<?>[] {
    Long.class,
    Long.class,
    String.class,
    String.class,
    String.class,
    String.class,
    Double.class,
    Double.class
  };

Then we can initialize a sink Tap using AvroScheme, allowing us to write Cascading Tuples in Avro format:

Tap sink = new Lfs(new AvroScheme(fields, types), "test-avro");

Behind the scenes the AvroScheme automatically creates an Avro Schema using the defined fields and data types, which saves the time and effort involved in creating an Avro schema file by hand.  Now we can write some Tuples to the Tap:

 TupleEntryCollector out = sink.openForWrite(new JobConf());

 //Re-use one TupleEntry, backed by a Tuple sized to the number of fields
 TupleEntry t = new TupleEntry(fields, new Tuple(new Object[types.length]));
 t.set("ipFrom", 329807419L);
 t.set("ipTo", 329807422L);
 t.set("countryCode", "UC");
 t.set("countryName", "Undefined Country");
 t.set("region", "Undefined Region");
 t.set("city", "Undefined City");
 t.set("latitude", -10.88377366D);
 t.set("longitude", 40.38827661D);
 out.add(t);

 t.set("ipFrom", 245235454L);
 t.set("ipTo", 455322234L);
 t.set("countryCode", "UC2");
 t.set("countryName", "Undefined Country2");
 t.set("region", "Undefined Region2");
 t.set("city", "Undefined City2");
 t.set("latitude", 20.7372661D);
 t.set("longitude", -5.8483992D);
 out.add(t);

 out.close();

A few gotchas here:

  • If we do not pass a Tuple to the constructor for TupleEntry then we get a NullPointerException.
  • If we initialize the Tuple with the default constructor, or initialize with the wrong number of objects in the Object[] then we get the following exception: cascading.tuple.TupleException: failed to set a value, tuple may not be initialized with values, is zero length.
  • Avro is very strict about type conversions, so if we forget the L suffix on the ipFrom long literals then we get the error: org.apache.avro.AvroRuntimeException: Not in union ["null","long"]: 329807419

I chalk these up to the APIs being fairly new; while they functionally work as designed, the “nice to haves” like automatic type conversions aren’t quite there yet.  All the APIs do encourage re-using value objects, which really helps performance in streaming frameworks like Hadoop and Cascading.
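
One workaround for the strict typing, which comes up again in the comments below, is to coerce values into the declared types before they reach the Avro sink.  Here is a minimal sketch assuming Cascading 1.x, using cascading.operation.Identity and cascading.pipe.Each (the pipe name is illustrative); given a type array, Identity coerces the incoming field values to those types:

 //Coerce the tuple values to the declared types before the Avro sink sees them
 Pipe pipe = new Pipe("ipinfo");
 pipe = new Each(pipe, fields, new Identity(types));
 //...then bind "pipe" to the AvroScheme sink Tap in a Flow as usual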

After this, Cascading has written an Avro binary file named part-00000.avro to the test-avro directory.  The file is in the standard, platform-independent Avro format, and we can read it using any Avro client (Java, Scala, C, C++, others?).  Note that this is a marked advantage over sequence files, in that we do not have to use the Hadoop file APIs to read them:

 File file = new File("test-avro/part-00000.avro");
 DataFileReader<GenericRecord> in =
   new DataFileReader<GenericRecord>(
     file, new GenericDatumReader<GenericRecord>());
 System.out.println(in.getSchema());  //Print the schema
 GenericRecord rec = new GenericData.Record(in.getSchema());
 while(in.hasNext()) {
   in.next(rec);  //re-use the same record instead of creating each time
   System.out.println(rec);  //Print the record
   System.out.println("Extracted IP From: " + rec.get("ipFrom"));
 }
 in.close();

And here is the program output:

{"type":"record","name":"CascadingAvroSchema","namespace":"","fields":[{"name":"ipFrom","type":["null","long"],"doc":""},{"name":"ipTo","type":["null","long"],"doc":""},{"name":"countryCode","type":["null","string"],"doc":""},{"name":"countryName","type":["null","string"],"doc":""},{"name":"region","type":["null","string"],"doc":""},{"name":"city","type":["null","string"],"doc":""},{"name":"latitude","type":["null","double"],"doc":""},{"name":"longitude","type":["null","double"],"doc":""}]}
{"ipFrom": 329807419, "ipTo": 329807422, "countryCode": "UC", "countryName": "Undefined Country", "region": "Undefined Region", "city": "Undefined City", "latitude": -10.88377366, "longitude": 40.38827661}
Extracted IP From: 329807419
{"ipFrom": 245235454, "ipTo": 455322234, "countryCode": "UC2", "countryName": "Undefined Country2", "region": "Undefined Region2", "city": "Undefined City2", "latitude": 20.7372661, "longitude": -5.8483992}
Extracted IP From: 245235454

Note that even though the data is in binary format, if you call toString() on a GenericRecord you get a valid JSON string!

Reading Avro Data With Cascading

As you might imagine, reading Avro data with Cascading.Avro is easier than writing it, since the JSON metadata precedes the data in every Avro file.  For this example I’ll also demonstrate a useful Avro feature called “projection”, which lets us decode only a subset of the fields instead of the whole record.  Cascading.Avro supports projection by specifying a subset of the fields when defining the Tap.  Here’s how it works:

 Fields projectedFields =
   new Fields(
     "ipTo"
   );
 Class<?>[] projectedTypes =
   new Class<?>[] {
     Long.class
   };
 Tap source = new Lfs(new AvroScheme(projectedFields, projectedTypes), "test-avro");
 TupleEntryIterator tuplesIn = source.openForRead(new JobConf());
 while(tuplesIn.hasNext()) {
   System.out.println(tuplesIn.next());
 }
 tuplesIn.close();

And here’s the output:

fields: ['ipTo'] tuple: ['329807422']
fields: ['ipTo'] tuple: ['455322234']

There are two useful optimizations built into the Cascading.Avro projection process:

  1. Avro only decodes the requested field, saving memory and processor time.
  2. The fields arrive in native Cascading format, ready for use in later Sub-Assemblies or Flows (see the sketch below).
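
To show the second point in context, here is a minimal sketch (assuming Cascading 1.x; the pipe name and text sink path are illustrative) that wires the projected Avro source from above straight into a Flow that writes the ipTo values out as text:

 Tap textSink = new Lfs(new TextLine(), "test-ipto-text", SinkMode.REPLACE);
 Pipe pipe = new Pipe("project-ipto");
 //Re-use the projected "source" Tap defined above as the Flow source
 Flow flow = new FlowConnector().connect(source, textSink, pipe);
 flow.complete();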

Parsing Performance CSV vs. Avro

So I also did a simple test of parsing performance: a CSV file parsed with a BufferedReader and a regex versus the same data in Avro.  I mainly wanted to see how Avro would perform against CSV using plain old Java.  My data set was the IP2Location DB5 data set, which has ~3 million rows in CSV format.

Parsing Speed

CSV file with buffered reader and regex : 22150 ms
Avro file (no compression): 1644 ms
Avro is 13.5 times faster than CSV (1350%)

Output Size

Data size (CSV)  : 292,767,737 bytes
Data size (Avro) : 187,551,110 bytes
Space reduction : 35% (no compression)

Code Used for Benchmark

Code for CSV parsing:

 //Input setup
 BufferedReader read =
   new BufferedReader
     (new InputStreamReader
     (new FileInputStream("./target/test.csv")));
 long counter = 0L;
 String line = null;
 long start = System.currentTimeMillis();
 while((line = read.readLine()) != null) {
   String[] splits = line.split("^\"|\"\\,\"|\"\\n\"|\"$|\\n");
   if("BALI".equals(splits[5])) {
     counter++;
   }
 }
 read.close();
 System.out.println("Time (ms) : " + (System.currentTimeMillis() - start));
 System.out.println("Count with Region : \"BALI\" = " + counter);

Code used for Avro parsing (with projection):

 //Schema setup (originally written as this schema)
 Schema schema =
   Schema.parse(AvroLoader.class.
     getResourceAsStream("ipinfo.avsc"));
 //Projection (only decode Region field)
 Schema newSchema =
   Schema.parse(AvroLoader.class.
     getResourceAsStream("ipinfo-region-only.avsc"));

 //Set up input
 File file = new File("./target/test.avro");
 DatumReader<GenericRecord> decoder =
   new GenericDatumReader<GenericRecord>(schema, newSchema);
 DataFileReader<GenericRecord> reader =
   new DataFileReader<GenericRecord>(file, decoder);

 //Prepare for scan
 GenericRecord rec = new GenericData.Record(newSchema);
 long counter = 0L;
 Utf8 bali = new Utf8("BALI");
 long start = System.currentTimeMillis();
 while(reader.hasNext()) {
   reader.next(rec);
   if(rec.get("Region").equals(bali)) {
     counter++;
   }
 }
 reader.close();
 System.out.println("Time (ms) : " + (System.currentTimeMillis() - start));
 System.out.println("Projection : " + rec.toString());
 System.out.println("Count with Region : \"BALI\" = " + counter);

Schema for all data: ipinfo.avsc:

{
  "type"   : "record",
  "name"   : "IpInfo",
  "doc"    : "The geographic information related to an IP address",
  "fields" : [
    { "name" : "IpFrom",      "type" : "long"   },
    { "name" : "IpTo",        "type" : "long"   },
    { "name" : "CountryCode", "type" : "string" },
    { "name" : "CountryName", "type" : "string" },
    { "name" : "Region",      "type" : "string" },
    { "name" : "City",        "type" : "string" },
    { "name" : "Latitude",    "type" : "double" },
    { "name" : "Longitude",   "type" : "double" }
  ]
}

Schema for Region projection:  ipinfo-region-only.avsc:

{
  "type"   : "record",
  "name"   : "IpInfo",
  "doc"    : "The Region information related to an IP address",
  "fields" : [
    { "name" : "Region", "type" : "string" }
  ]
}
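
For reference, the test.avro input can be produced from the CSV data with a plain Avro DataFileWriter.  Here is a minimal sketch; the split regex and column order are assumptions carried over from the CSV benchmark code above, and the commented-out setCodec line shows how deflate compression could optionally be enabled:

 //Write the CSV rows out as Avro records using ipinfo.avsc
 Schema schema =
   Schema.parse(AvroLoader.class.
     getResourceAsStream("ipinfo.avsc"));
 DataFileWriter<GenericRecord> writer =
   new DataFileWriter<GenericRecord>(
     new GenericDatumWriter<GenericRecord>(schema));
 //writer.setCodec(CodecFactory.deflateCodec(6));
 writer.create(schema, new File("./target/test.avro"));

 BufferedReader read =
   new BufferedReader
     (new InputStreamReader
     (new FileInputStream("./target/test.csv")));
 GenericRecord rec = new GenericData.Record(schema);
 String line = null;
 while((line = read.readLine()) != null) {
   String[] s = line.split("^\"|\"\\,\"|\"\\n\"|\"$|\\n");
   rec.put("IpFrom", Long.parseLong(s[1]));
   rec.put("IpTo", Long.parseLong(s[2]));
   rec.put("CountryCode", s[3]);
   rec.put("CountryName", s[4]);
   rec.put("Region", s[5]);
   rec.put("City", s[6]);
   rec.put("Latitude", Double.parseDouble(s[7]));
   rec.put("Longitude", Double.parseDouble(s[8]));
   writer.append(rec);
 }
 read.close();
 writer.close();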

Conclusion

So from our examples we can see the following about Cascading.Avro:

  1. Avro provides superior encoding and decoding speed vs. text formats.
  2. Avro files take up less space, will use less memory, and will read and write faster from an IO perspective vs. text formats.
  3. Avro files provide the benefits of Sequence Files without requiring Hadoop to create and read the files.  This could be critical for data collection, since we could produce the Avro files while collecting data, providing an additional speed and scalability boost to data collectors.
  4. Cascading.Avro eliminates the tedious and error-prone process of associating fields and data types with text files during a write.  We are encouraged to access fields by name rather than index, aiding maintainability and extensibility.
  5. Cascading.Avro automatically populates Cascading Fields and data types from the metadata of existing Avro files, so all we do is read the file and the metadata comes with it.
  6. We apply projection simply by specifying a subset of fields to select.  Cascading.Avro takes care of creating a projection schema, which saves time and improves maintainability.
  7. We never even have to deal with Avro schema files, since Cascading.Avro automatically generates them from Cascading fields.

So in conclusion, it looks like Cascading.Avro is better, stronger, and faster than using either text files or sequence files in Cascading, and the built-in schema evolution features of Avro make this a future-proof data storage and processing model for Hadoop.
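
As a small illustration of that schema evolution point: a new optional field can be added to ipinfo.avsc with a default value (the TimeZone field below is purely hypothetical), and every existing Avro file remains readable, because a reader using the new schema simply fills in the default for old records.  The projection example above already relies on the same resolution mechanism, with the reader schema differing from the writer schema:

 { "name" : "TimeZone", "type" : "string", "default" : "UTC" }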

  1. Hi Matt,

    Thanks for the very detailed write-up of the Cascading.Avro tap we created. I’m happy it seems to work well for you.

    Feel free to add issues to the project – https://github.com/bixolabs/cascading.avro.

    Also note that we just pushed version 1.0 to Conjars, which makes it easy to automatically pull via Maven/Ivy/Maven ant task. See http://conjars.org/com.bixolabs/cascading.avro

    Regards,

    — Ken

    • Thanks Ken,

      We are using Avro extensively with Cascading for processing and storing paid and natural search data. Even more important than the performance are the schema evolution and metadata features of Avro. There is no single point of failure for metadata, and backward compatibility for adding fields adds to stability and maintainability. Essentially, that makes Cascading.Avro the system-of-record quality data format for years to come at iCrossing.

      Cheers,
      Matt

  2. This is great, thanks for the write up!

    Also looks like I have a bug or two to resolve in the next Cascading maint release.

    cheers,
    chris

    • No worries mate! Actually, the fact that Cascading does not accept uninitialized values helps us, because it encourages developers to re-use and carefully initialize all objects.

      We just put an Identity function in front of the tail pipe to coerce the values into the proper types so Avro doesn’t spit up.

      Cheers,
      Matt

      • Well NPE is bad form, 1.2.2 should resolve that (out later today).

        Explicitly doing coercion is the correct approach, though the Avro Scheme should provide it optionally.

        That said, see the Coerce pipe; it may make the code more readable than having Identity spread about (as with Rename and Shape). 2.0 may make those first class and optimize for them.

        ckw

