Apache Beam from Google finally provides robust unification of batch and real-time Big Data processing. At Google, this framework replaced MapReduce, FlumeJava, and MillWheel. Major big data vendors had already contributed Apache Beam execution engines for both Flink and Spark before Beam even officially entered incubation. Anyone else seeing the future of Big Data in a new light? I know I am…
Scala works best with the IntelliJ IDEA IDE, which carries licensing costs and is extremely unlikely to replace free Eclipse tooling at any large company
Scala sits among a crowd of strong contenders and faces a moving target: Java gained 5% in market share between 2015 and 2016. To put this in perspective, Scala holds less market share than Lisp
Consistency and Integrity Issues
Getting Spark to meet rigorous standards of data consistency and integrity proves difficult. Apache Spark’s design originates at companies that treat data consistency and data integrity as secondary concerns, while most industries treat them as primary concerns. For example, achieving even at-most-once or at-least-once semantics from Spark requires numerous workarounds and hacks: http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
Dependency Hell with a Vengeance
Apache Spark (and Scala) pull in a huge number of transitive dependencies compared to alternative technologies. Programmers must master all of those dependencies in order to master Spark. No wonder very few true Spark experts exist in the market today.
What’s the Alternative to Spark?
For the real-time in-memory processing use case: data grids, once the purview of blue-chip commercial vendors, now have very strong open source competition. Primary contenders include Apache Ignite and Hazelcast.
For the fast SQL analytics (OLAP) use case: Apache Drill provides performance similar to Spark SQL with a much simpler, more efficient, and more robust footprint. Apache Kylin from eBay looks set to become a major OLAP player very quickly, although I have not used it myself.
For the stream processing use case: Apache Beam from Google looks likely to become the de facto streaming workhorse, unseating both Apache Flink and Spark Streaming. Major big data vendors have already contributed Apache Beam execution engines for both Flink and Spark, before Beam even officially entered incubation.
If you try these alternative technologies and compare them to Spark, I’m sure you’ll agree that Spark isn’t worth the headache.
Supporting unions for fields with multiple types makes feature extraction more robust and automated. For example, “account numbers” may contain business-relevant strings or spaces due to different data stewards or external data providers.
Rather than transforming all numbers to String, statzall takes the opposite approach and packs Strings into doubles using the open source Unibit Encoding. This allows extremely efficient feature extraction of basic data science primitives using existing hardened APIs such as CERN Colt. With single-threaded performance of 8 million values per second, automated feature extraction on *all* measures becomes possible.
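The actual Unibit Encoding isn’t reproduced here, but the core idea of reinterpreting a short string’s bytes as the 64 bits of a double can be sketched in plain Java. This is a simplified illustration under my own assumptions, not the statzall scheme itself (a production encoding must also avoid colliding with legitimate numeric values, e.g. NaN bit patterns):

import java.nio.charset.StandardCharsets;

// Simplified sketch: pack up to 7 ASCII bytes into a double's 64 bits.
public final class StringPacker {

    static double pack(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.US_ASCII);
        if (bytes.length > 7) throw new IllegalArgumentException("too long");
        long bits = bytes.length;                       // low byte stores the length
        for (int i = 0; i < bytes.length; i++) {
            bits |= (bytes[i] & 0xFFL) << (8 * (i + 1)); // bytes occupy the upper 56 bits
        }
        // Reinterpret the raw bits as a double without any numeric conversion
        return Double.longBitsToDouble(bits);
    }

    static String unpack(double d) {
        long bits = Double.doubleToRawLongBits(d);      // raw bits preserve the exact pattern
        int len = (int) (bits & 0xFF);
        byte[] bytes = new byte[len];
        for (int i = 0; i < len; i++) {
            bytes[i] = (byte) ((bits >>> (8 * (i + 1))) & 0xFF);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }
}

// Usage: double packed = StringPacker.pack("ACCT-7");
//        String original = StringPacker.unpack(packed);  // "ACCT-7"

Because each value stays a plain double, the packed strings flow through numeric APIs like CERN Colt without any boxing or schema changes.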
In addition, statzall supports one-click deployment to your Hadoop YARN cluster using CDAP. Or, if you use the cloud, you can literally set up a fully automated Internet-scale feature extraction cluster in 5 minutes using Coopr Cloud.
The YARN Application Master provides all the raw data needed to accurately estimate the resources required for your big data application to meet its SLAs when deployed to production. By identifying crucial counters and deriving resource ratios by task and for the application as a whole, we can even infer run times from a smaller test environment to a larger production footprint.
All YARN frameworks provide similar counters; however, we will use the popular Hadoop MapReduce framework as an example. We can also get the same values displayed on the web interface above directly from the MapReduce API (see the sketch after the counter lists below). The following counters drive the capacity plan:
Map tasks launched (TOTAL_LAUNCHED_MAPS)
Used as a divisor to obtain avg map metrics
Reduce tasks launched (TOTAL_LAUNCHED_REDUCES)
Used as a divisor to obtain avg reduce metrics
Total time spent by all maps in ms (MILLIS_MAPS)
Used as a numerator to obtain avg map task time
Total time spent by all reduces in ms (MILLIS_REDUCES)
Used as a numerator to obtain avg reduce task time
The following counters are collected twice, once for all mappers and once for all reducers; it’s important not to mix ratios across task types.
CPU time used
Used as a numerator to obtain avg task CPU
Physical memory used
Used as a numerator to obtain avg task RAM
Read operations
Used as a numerator to obtain avg task read ops
Write operations
Used as a numerator to obtain avg task write ops
Bytes read
Used as a numerator to obtain avg task read bytes
Bytes written
Used as a numerator to obtain avg task write bytes
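As mentioned above, these values are also available programmatically. A minimal sketch, assuming a completed Hadoop 2.x org.apache.hadoop.mapreduce.Job handle named job:

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;

// Fetch the counters once the job completes
Counters counters = job.getCounters();
long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
long millisMaps   = counters.findCounter(JobCounter.MILLIS_MAPS).getValue();
// Note: TaskCounter values at job scope aggregate maps and reduces together;
// the per-task-type splits shown on the web UI come from the job history.
long cpuMillis    = counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue();

// Average map task time, the numerator/divisor pair described above
double avgMapTimeMs = (double) millisMaps / launchedMaps;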
The primary assumption when inferring between environments is that the data being operated on remains the same. If the input data differs between environments, results may skew, especially for reducers.
Calculating Resource to Task Type Ratios
By calculating ratios, we can then scale the run time and other resources up and down depending on available task slots and quotas in the target environment.
We can now scale parallel task quotas and other resource quotas up and down to calculate their impact on the job in a particular environment. For example, wall clock time for the map phase can vary from all tasks running in parallel (t = MILLIS_MAPS / TOTAL_LAUNCHED_MAPS) all the way down to a single task running at a time (t = MILLIS_MAPS), and similarly for all other variables. For resource constraints, dividing by the most severe restriction governs the cost to total run time. For example, if we enforce a quota restricting CPU time to CPU_MILLISECONDS * 0.5, then MILLIS_MAPS increases to MILLIS_MAPS / 0.5. This would occur if, for example, the maximum mappers per node were increased to twice the number of cores. Resource-to-task-type ratios come in handy for impact assessment and prediction based on any conceivable environmental constraint.
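Putting those formulas into code makes the what-if analysis concrete. A minimal sketch, where the constraint parameters (maxParallelMaps, cpuQuotaFactor) are illustrative environment inputs rather than anything provided by Hadoop:

// Estimate map-phase wall clock time in a target environment from test-run counters.
static double estimateMapPhaseMillis(long millisMaps,         // MILLIS_MAPS from the test run
                                     long launchedMaps,       // TOTAL_LAUNCHED_MAPS from the test run
                                     int maxParallelMaps,     // parallel task quota in the target environment
                                     double cpuQuotaFactor) { // e.g. 0.5 if CPU time is halved
    double avgMapMillis = (double) millisMaps / launchedMaps;
    // Number of "waves" of map tasks given the parallel task quota
    long waves = (launchedMaps + maxParallelMaps - 1) / maxParallelMaps;
    double wallClockMillis = avgMapMillis * waves;  // equals MILLIS_MAPS when maxParallelMaps == 1
    // Divide by the most severe resource restriction: a 0.5 CPU quota doubles the estimate
    return wallClockMillis / cpuQuotaFactor;
}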
Dependency collisions furnish plenty of headaches and late nights while implementing the Lambda Architecture. Java class loading containers tend toward complexity, and many frameworks such as Hadoop, JCascalog, and Storm provide tons of transitive dependencies but no dependency isolation for third-party Lambda code deployed in the same JVM. So currently, to benefit from running in the same JVM, all the transitive dependencies come along for the ride. This impacts:
Portability: Lambda logic shared in the batch layer and the speed layer must cope with at least two different sets of dependencies.
Manageability: framework version upgrades may cause hard failures or other incompatibilities in Lambda logic due to changing dependency versions.
Performance: solving the problem by forcing JVM isolation incurs overhead from IPC or network chatter. Likewise, utilizing scripting or dynamic languages merely hides the problem rather than solving it, while imposing additional performance penalties.
Complexity: developing Lambdas requires knowledge and accommodation of all the dependencies of all frameworks targeted for deployment.
Many developers and architects rightfully shy away from deploying bloated dependency container frameworks such as Spring or OSGi inside a big data framework. Big data and fast data provide enough complexity already. So let’s look at how we can use a simple pattern, along with basic core Java libraries, to avoid “Dependency Hell“.
Signs you may be encountering this problem include these types of exceptions occurring after deployment:
java.lang.NoSuchMethodError: when a method your code depends on no longer exists
java.lang.ClassNotFoundException: when a class your code uses cannot be found in the deployed class path
java.lang.IllegalAccessError: when conflicting versions of an API mark methods or fields private
Please reference my GitHub project for a complete working implementation along with unit tests.
The Java ServiceLoader framework, introduced in JDK 1.6, provides a way to dynamically bind an implementation class to an interface while specifying an alternate class loader. Loading a Lambda implementation with ServiceLoader forces all code called within the context of the Lambda to use the same class loader. This allows creating a service which supports parent-first class loading, child-first class loading, or child-only (hermetic) class loading. In this case, the hermetic loader prevents any possible dependency collisions. To create a hermetic loader, we simply utilize the built-in URLClassLoader in Java. Our implementation jar can reside on the local file system, on a web server, or in HDFS: anywhere a URL can point to. For the parent class loader, we specify the Java bootstrap class loader. So we can implement a hermetic class loading pattern in one line of code:
ServiceLoader<Map> loader = ServiceLoader.load(Map.class,
new URLClassLoader(urls, Map.class.getClassLoader()));
Note that we intentionally avoid calling ClassLoader.getSystemClassLoader() in order to prevent the calling context (such as Hadoop or Hive) from polluting the Lambda class path. Core packages such as java.lang and java.util use the bootstrap class loader, which carries only the core Java dependencies shipped as part of the JDK. The diagram above shows how the LambdaBoot framework fits within a big data framework such as Hadoop.
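Putting the pieces together, a minimal sketch looks like this. The jar path is hypothetical, and the jar is assumed to carry a META-INF/services/java.util.Map provider file naming the implementation class:

import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.ServiceLoader;

public class HermeticLoadExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical location of the Lambda implementation jar
        URL[] urls = { new URL("file:///opt/lambdas/regression-lambda.jar") };
        // Parent is the bootstrap loader (Map.class.getClassLoader() returns null),
        // so no Hadoop/Hive classes can leak into the Lambda's class path
        try (URLClassLoader hermetic = new URLClassLoader(urls, Map.class.getClassLoader())) {
            ServiceLoader<Map> loader = ServiceLoader.load(Map.class, hermetic);
            for (Map lambda : loader) {
                // Interact with the Lambda purely through the Map interface
                System.out.println("Loaded Lambda: " + lambda.getClass().getName());
            }
        }
    }
}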
Mapping the World
In the example above, we use a Map interface to interact with the Lambda. This allows us to avoid having a separate jar containing a Service Provider Interface (SPI). Instead, we can subclass the Map and provide any behavior desired by intercepting get and put calls for specific keys. By optionally returning a Future from a get or put call on the Map, we gain asynchronous interaction as well if desired. In addition, non-sensitive Map keys can provide metadata facilities. In the example of linear regression implemented as a Map, we use a conflicting version of Apache Commons Math not yet supported by Hadoop to calculate the regression.
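A hedged sketch of what such a Lambda might look like. The key names and the {x, y} convention are my own illustration, and commons-math3 stands in for whichever conflicting Commons Math version the Lambda bundles:

import java.util.HashMap;
import org.apache.commons.math3.stat.regression.SimpleRegression;

// Illustrative Lambda: linear regression exposed through the Map interface
public class RegressionLambda extends HashMap<String, Object> {
    private final SimpleRegression regression = new SimpleRegression();

    @Override
    public Object put(String key, Object value) {
        if ("observation".equals(key)) {
            double[] xy = (double[]) value;      // assumed convention: {x, y}
            regression.addData(xy[0], xy[1]);
        }
        return super.put(key, value);
    }

    @Override
    public Object get(Object key) {
        if ("slope".equals(key)) {
            // Computed with the Lambda's own (conflicting) Commons Math version
            return regression.getSlope();
        }
        return super.get(key);
    }
}

The caller never touches the Commons Math classes directly, so the conflicting version stays sealed inside the hermetic class loader.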
Using a very simple pattern, we can deploy hermetic Lambdas to any Java environment without fear of dependency collisions. The ServiceLoader also acts as a service registry, allowing us to browse metadata about available Lambdas, dynamically load new Lambdas, or update existing Lambdas.
10-node load test comparison using Amazon EC2 SSD-based instances. 1 billion vertices and 1 billion edges were processed in each test run. The titan-loadtest project was used to run each test.
The experiment maximizes data locality by co-locating load generation, the Titan graph database, and Cassandra/Hazelcast within the same JVM instance while partitioning data across a cluster. It also explores methods for tuning garbage collection, Titan, and Cassandra for the peer computing use case.
The following components were utilized during the experiment:
Each test iteration has 6 read-ratio phases, starting with 0% reads (100% writes) and going all the way up to 90% reads and 10% writes. For all tests, the persistence implementation executes in the same JVM as Titan to avoid unnecessary context switching and serialization overhead. Tests were conducted using an Amazon placement group to ensure instances resided on the same subnet. Storage was formatted with 4K blocks and used the noop scheduler to improve latency.
For each phase, new vertices were added with one edge linking back to a previous vertex. No tests of update or delete were conducted.
Please see the titan-loadtest project above for all Cassandra and Hazelcast settings and configurations used in the test.
Please note: the results are listed in rates of thousands of vertices per second and include the creation of an edge as well as a vertex. Also, the Hazelcast SSD x1 results used a custom flash storage module for Hazelcast that was developed privately, so those results are not replicable without that module installed.
Hazelcast performed better than Cassandra for all tests and demonstrated one order of magnitude better performance on reads. Surprisingly, Hazelcast slightly outperformed Cassandra for writes as well.
Demonstration of efficient garbage collection on JVM heap sizes in excess of 128 gigabytes. Garbage collection behavior is analyzed via a mini-max optimization strategy: the study measures maximum throughput versus minimum garbage collection pause to find the optimization “sweet spot” across experimental phases. Results are replicable via well-defined open source experimental methods executable on Amazon EC2 hardware.
Elastic MapReduce makes it so easy to spin up a cluster that it’s also easy to waste money on unused, partially used, or downright unauthorized clusters. Obviously, as a business, Amazon doesn’t put a whole lot of effort into keeping its customers from spending too much money. Amazon does impose an instance count limit for the entire account; however, effectively managing these costs means getting a lot more granular and surfacing more detailed information.
That’s why I created this program, which estimates charges for current and historic EMR clusters. It first obtains the normalized instance hours for all clusters running under the current credentials, then divides by the Normalized Compute Time provided in the Amazon EMR FAQ. We then multiply by the EMR Hourly Rate to get the charge for each current and historic job flow (cluster). Historic job flows come from the Amazon job flow history, which covers only the past 20 days or so.
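The per-cluster arithmetic itself is trivial. A sketch, where the normalization factor and hourly rate are placeholders to look up in the EMR FAQ and pricing pages, not current values:

// Estimate the charge for one job flow (cluster)
static double estimateClusterCharge(long normalizedInstanceHours,
                                    double normalizedComputeTime,  // from the Amazon EMR FAQ
                                    double emrHourlyRate) {        // from Amazon EMR pricing
    double instanceHours = normalizedInstanceHours / normalizedComputeTime;
    return instanceHours * emrHourlyRate;
}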
The job flow id is the primary key for this data set. Output is tab-delimited and streamed to stdout. The last column contains a complete dump of the job flow in JSON format. Here is some example output:
Well, now that we have a high-availability, low-latency design for our RESTful services, where can we find the weakest link in maintaining our service level agreements? The answer often lies in the crufty, hidden, and often poorly tested “glue code” lying between (and below) the systems themselves. This code rears its head when a critical notification fails to fire, or a cluster fails to scale automatically as designed. How do we get this code cleaned up, out in the open, and rock solid? The answer, of course, is to apply agile principles to our infrastructure.
The field of system programming desperately needs an update for the Agile age! And why not? Popular cloud management APIs provide complete command and control via Agile-enabled languages such as Java, and many efforts are under way to make cloud APIs standardized and open source. So let’s delve into the details of how we can transform shell script spaghetti into something more workable!
I long to see the day when system programs have automated unit test coverage metrics just like the rest of the civilized world. Let’s test that fail-over logic in a nice safe place where it can’t ruin lives! Think you can’t mock all failure scenarios? Think again. With 100% Java APIs available from many cloud providers (not to mention other test-friendly languages), we can impersonate any type of response from the cloud services, as sketched below. Friends don’t let friends code without testing.
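For example, a fail-over path can be exercised with a hand-rolled mock. CloudClient and the rest of the names here are illustrative, not a real provider SDK:

// Illustrative seam between our system code and the cloud provider
interface CloudClient {
    String launchInstance(String instanceType) throws Exception;
}

// Mock that impersonates a cloud-side failure
class FailingCloudClient implements CloudClient {
    public String launchInstance(String instanceType) throws Exception {
        throw new Exception("simulated capacity outage");
    }
}

// Fail-over logic under test: try the primary region, fall back to the secondary
class FailoverLauncher {
    private final CloudClient primary, secondary;
    FailoverLauncher(CloudClient primary, CloudClient secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }
    String launch(String instanceType) throws Exception {
        try {
            return primary.launchInstance(instanceType);
        } catch (Exception e) {
            return secondary.launchInstance(instanceType);
        }
    }
}

In a unit test, we pair FailingCloudClient with a stub that returns a known instance id and assert that the launch succeeded via the secondary, with no real cloud calls involved.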
Change Management Tied to Code Commits
Let’s all admit it: at one point or another we have all been tempted to cheat and “tweak” things in production. Often it doesn’t seem possible that a particular change could cause a problem. However, in some system programming environments “tweaking” has become a way of life because people assume there is no way to replicate the production environment. In a cloud environment this is simply not the case: all cloud instances follow a standardized set of options and configurations commanded by a testable language. If we create a Java program or Python script to build and maintain the production environment, it becomes trivial to re-create the same environment. In that case, tying every system program code commit directly to a Jira issue becomes possible, and so does complete end-to-end management of environment change. All changes are code changes. Wow, what a concept!
By building testable components we create layers of abstraction which hide and manage complexity. We can now confidently build intelligent instances which dynamically configure themselves, hand in hand with the software which runs on them. Testability makes this not only feasible, but in fact imperative as a competitive advantage.
Organic Cloud Systems
So what happens when we standardize an Agile process for system programs the same way we do for application programs? Repeatable results. We also largely take the burden of busy work off of infrastructure folks so they can focus on engineering the next generation of great systems rather than fighting fires at 3 am. This doesn’t solve all of those problems, but most repeatable IT problems have automatable solutions: most likely, anything we can put in a run sheet can be automated using cloud APIs. We then have organic cloud systems which automatically configure themselves and adapt to environmental changes and failure scenarios without sacrificing reliability.
The current Amazon EMR documentation provides a lot of coverage of how to run a Hadoop custom jar using the AWS management console. This post covers how to do the same thing directly using the EC2 SDK for Java in Eclipse.
To run the sample code, you need to set up and/or have knowledge of the following: