Monitoring EMR Spend Using the AWS Java SDK

Elastic Map Reduce makes it so easy to spin up a cluster that sometimes it’s also easy to waste money with unused, partially used, or downright unauthorized clusters.  Obviously, as a business, Amazon doesn’t put a whole lot of effort into keeping its customers from spending too much money.  Amazon has an instance count limit for the entire account; however, effectively managing these costs involves getting a lot more granular and providing some more detailed information.

That’s why I created this program, which estimates charges for current and historic EMR clusters.  It first obtains the normalized instance hours for all clusters running under the current credentials, then divides by the Normalized Compute Time provided in the Amazon EMR FAQ.  Then we multiply by the EMR Hourly Rate to get the charge for each current and historic job flow (cluster).  Historic job flows come from the Amazon job flow history, which retains only the past 20 days or so.

The job flow id is the primary key for this data set.  Output is tab-delimited and streamed to stdout.  The last column contains a complete dump of the job flow in JSON format.  Here is some example output:

JOB_FLOW_ID    STATE    STARTED    ENDED    INSTANCE_COUNT    INSTANCE_TYPE    INSTANCE_HOURS    EMR_INSTANCE_RATE    CHARGE    DETAIL_JSON
j-DFASFWGRWRG    RUNNING    2011-09-21 10:52:17    null    12    m1.xlarge    36    0.59    21.24    {your job flow JSON}

So now you can keep track of estimated EMR spend in near real time, set alerts, and estimate monthly charges based on current workloads.  Enjoy!

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;

/**
 * Monitor EMR spending using the AWS SDK for Java.
 *
 * @author mpouttuclarke
 *
 */
public class EMRMonitor
{
    public static class InstanceRate
    {
        private int normalizedHours;
        private double emrRate;

        public InstanceRate(int normalizedHours, double emrRate)
        {
            super();
            this.normalizedHours = normalizedHours;
            this.emrRate = emrRate;
        }

        /**
         * @return the normalizedHours
         */
        public int getNormalizedHours()
        {
            return normalizedHours;
        }

        /**
         * @return the emrRate
         */
        public double getEmrRate()
        {
            return emrRate;
        }
    };

    static final Map<InstanceType, InstanceRate> rateMap =
        new HashMap<InstanceType, EMRMonitor.InstanceRate>();
    static AmazonElasticMapReduce emr;

    static
    {
        rateMap.put(InstanceType.M1Small, new InstanceRate(1, 0.085 + 0.015));
        rateMap.put(InstanceType.C1Medium, new InstanceRate(2, 0.17 + 0.03));
        rateMap.put(InstanceType.M1Large, new InstanceRate(4, 0.34 + 0.06));
        rateMap.put(InstanceType.M1Xlarge, new InstanceRate(8, 0.50 + 0.09));
        rateMap.put(InstanceType.C1Xlarge, new InstanceRate(8, 0.68 + 0.12));
        rateMap.put(InstanceType.M22xlarge, new InstanceRate(14, 1.00 + 0.21));
        rateMap.put(InstanceType.M24xlarge, new InstanceRate(28, 2.00 + 0.42));
        rateMap.put(InstanceType.Cc14xlarge, new InstanceRate(19, 1.60 + 0.33));
        rateMap.put(InstanceType.Cg14xlarge, new InstanceRate(25, 2.10 + 0.42));
    }

    /**
     * The only information needed to create a client are security credentials consisting of the AWS
     * Access Key ID and Secret Access Key. All other configuration, such as the service end points,
     * are performed automatically. Client parameters, such as proxies, can be specified in an
     * optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init()
        throws Exception
    {
        AWSCredentials credentials =
            new PropertiesCredentials(
                                      AwsConsoleApp.class
                                          .getResourceAsStream("AwsCredentials.properties"));
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args)
        throws Exception
    {
        System.out
            .println("JOB_FLOW_ID\tSTATE\tSTARTED\tENDED\tINSTANCE_COUNT\tINSTANCE_TYPE\tINSTANCE_HOURS\tEMR_INSTANCE_RATE\tCHARGE\tDETAIL_JSON");
        Logger.getLogger("com.amazonaws").setLevel(Level.WARNING); // Turn off request status
                                                                   // messages
        init();
        DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest();
        DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
        for (JobFlowDetail detail : descResult.getJobFlows())
        {
            String slaveInstanceType = detail.getInstances().getSlaveInstanceType();
            String masterInstanceType = detail.getInstances().getMasterInstanceType();
            if (slaveInstanceType == null)
            {
                slaveInstanceType = masterInstanceType;
            }
            double instanceHours = getInstanceHours(detail, slaveInstanceType);
            double charge = getInstanceCharge(slaveInstanceType, instanceHours);
            System.out
                .println(String.format("%1$s\t%2$s\t%3$tF %3$tT\t%4$tF %4$tT\t%5$d\t%6$s\t%7$.0f\t%8$.2f\t%9$.2f\t%10$s\t",
                                       detail.getJobFlowId(),
                                       detail.getExecutionStatusDetail().getState(),
                                       detail.getExecutionStatusDetail().getCreationDateTime(),
                                       detail.getExecutionStatusDetail().getEndDateTime(),
                                       detail.getInstances().getInstanceCount(),
                                       slaveInstanceType,
                                       instanceHours,
                                       rateMap.get(InstanceType.fromValue(slaveInstanceType)).getEmrRate(),
                                       charge,
                                       detail.toString().replaceAll("\\s+", " ")));
        }
    }

    /**
     * @param rate
     * @param instanceHours
     * @return
     */
    public static double getInstanceCharge(String instanceType, double instanceHours)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        return instanceHours * rate.getEmrRate();
    }

    /**
     * @param detail
     * @param rate
     * @return
     */
    public static double getInstanceHours(JobFlowDetail detail, String instanceType)
    {
        InstanceRate rate = rateMap.get(InstanceType.fromValue(instanceType));
        double instanceHours =
            detail.getInstances().getNormalizedInstanceHours() / rate.getNormalizedHours();
        return instanceHours;
    }
}

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s