Friday, July 8, 2011

Analyzing the National Solar Radiation Database (NSRDB) with Hadoop


In this posting, I provide a basic framework and some ideas for analyzing the National Solar Radiation Database (NSRDB) provided by OpenEI using MapReduce, Hadoop, and the Amazon cloud. First, a little introduction to OpenEI is in order; from the Web site:

Open Energy Information (OpenEI) is a knowledge-sharing online community dedicated to connecting people with the latest information and data on energy resources from around the world. Created in partnership with the United States Department of Energy and federal laboratories across the nation, OpenEI offers access to real-time data and unique visualizations that will help you find the answers you need to make better, more informed decisions. And because the site is designed on a linked, open-data platform, your contributions are both welcomed and encouraged.
Sounds pretty cool ... let's roll up our sleeves and see what we can do with this data using Hadoop. If you need a refresher on Hadoop, have a look at Yahoo's Hadoop Tutorial.

The current NSRDB data set contains >191M observations collected from Solar stations around the US from 1991-2005. Of course this data set is tiny by Hadoop standards, but the current data set is only a sub-set of available solar data. Also, there are other larger solar-related data sets that can use the same approach described in this posting to distribute expensive computations across large numbers of commodity servers using Hadoop. Moreover, this data can be joined with other data sets and analyzed using various Hadoop sub-projects, such as Hive or Pig. So this posting is mostly about how to get OpenEI data into a format that can be analyzed using MapReduce jobs (and other tools, e.g. Hive or Pig) in a Hadoop compute cluster. Specifically, I provide examples with source code for the following topics:

  • File size considerations for storing data in HDFS and S3
  • Loading static metadata into Hadoop's DistributedCache and special handling needed to make this work with S3 and Elastic MapReduce.
  • Running a MapReduce job locally in pseudo-distributed mode and on Amazon's Elastic MapReduce
  • Using MRUnit to unit test the MapReduce jobs before deploying to Hadoop
  • Using Hadoop Counters to help you diagnose unexpected job results and make writing unit tests easier

Building the Application

First, you need to download the source code from github/thelabdude/nsrdb (note: I don't reproduce all the source code in this blog post, so you'll have to download it to get the full details). Next, build the project with Maven 2.2.x using:

mvn clean package
I use the Maven assembly plug-in to create an executable JAR with all dependencies (nsrdb-0.0.2-jar-with-dependencies.jar) and the Hadoop Job JAR with dependencies (nsrdb-0.0.2-hadoop-jobs.jar).

Download the Data

Next, we need to download the data from the site using some good ol' screen scraping. The data is contained in 1,454 GZipped TAR files (tar.gz) on the OpenEI site. The process I used is as follows:

  1. Parse the contents of the data listing HTML page to extract links matching a specified regular expression, in this case ^\d.*\.tar\.gz$.
  2. Download each data file and save locally so that I don't have to return to the site; build in a 2 sec delay between requests so that OpenEI doesn't get overloaded by the app.
  3. Uncompress each GZipped TAR file; each TAR contains 15 CSV files, one for each year of data for a single station.
  4. Append the contents of each entry in the TAR file into a single Hadoop SequenceFile that is stored in HDFS.

Motivation for Using a Single Hadoop SequenceFile
My current goal is to store the data in a format that makes writing MapReduce jobs easy. Hadoop works better with a small number of large files that can be split and distributed by HDFS vs. a large number of small files. By small, I mean smaller than the HDFS block size (64MB is the default). Since the NSRDB is not actually that big, I chose to merge all data into a single, compressed SequenceFile. Internally, Hadoop will split the SequenceFile into replicated blocks that are distributed around the HDFS cluster. This means that each Map task will work on a sub-set of data that is most likely stored on the same server that the task is running on (i.e. data locality). For example, on Elastic MapReduce, the default block size is 128MB, so Hadoop will run 18 map tasks for our 2.1GB of input data. I should mention that if you store this SequenceFile in S3, then you do not benefit from data locality automatically, so I'll return to this discussion below.
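To make the map task arithmetic concrete, here's a small sketch that estimates the number of map tasks as one per started block. The SplitEstimate class is hypothetical, and the calculation is a simplification: it ignores the small slack Hadoop allows on the last split. The file size is the byte count that hadoop fs -lsr reports for nsrdb.seq.

```java
class SplitEstimate {
    // Ceiling division: every started block gets its own map task.
    static long estimateMapTasks(long fileBytes, long blockBytes) {
        return (fileBytes + blockBytes - 1) / blockBytes;
    }

    public static void main(String[] args) {
        long fileBytes = 2346388001L;            // size of nsrdb.seq in bytes
        long emrBlock = 128L * 1024 * 1024;      // EMR default block size
        long defaultBlock = 64L * 1024 * 1024;   // stock HDFS default block size
        System.out.println(estimateMapTasks(fileBytes, emrBlock));     // prints 18
        System.out.println(estimateMapTasks(fileBytes, defaultBlock)); // prints 35
    }
}
```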

I use a custom class for the key in my SequenceFile: thelabdude.nsrdb.StationYearWritable. This class holds a long station ID and an int year and implements Hadoop's Writable interface for fast Serialization. The value for each key in the file is simply the entire contents of each TAR entry stored in a BytesWritable object. In other words, I'm storing the CSV data in a large BLOB for each year and station (1,454 stations * 15 years = 21,810 records). We'll save the line-by-line parsing of each CSV file for the MapReduce jobs so we can distribute the processing across a cluster.
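For readers who haven't written a Writable before, here's a sketch of the serialization pattern. It is not the actual StationYearWritable source: the StationYearKey class is a stand-in I made up so the example runs without Hadoop on the classpath. Its write and readFields methods have the same shape as the two methods in Hadoop's Writable interface, so the real class would look similar but would declare "implements Writable".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class StationYearKey {
    long stationId;
    int year;

    StationYearKey() { }

    StationYearKey(long stationId, int year) {
        this.stationId = stationId;
        this.year = year;
    }

    // Same shape as Writable.write(DataOutput): fixed-width binary fields, no field names.
    void write(DataOutput out) throws IOException {
        out.writeLong(stationId);
        out.writeInt(year);
    }

    // Same shape as Writable.readFields(DataInput): read the fields back in the same order.
    void readFields(DataInput in) throws IOException {
        stationId = in.readLong();
        year = in.readInt();
    }

    static byte[] toBytes(StationYearKey key) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            key.write(new DataOutputStream(buffer));
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static StationYearKey fromBytes(byte[] bytes) {
        try {
            StationYearKey key = new StationYearKey();
            key.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return key;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new StationYearKey(690190L, 2005));
        StationYearKey copy = fromBytes(bytes);
        System.out.println(bytes.length + " bytes -> " + copy.stationId + " / " + copy.year);
    }
}
```

The key serializes to 12 bytes (8 for the long, 4 for the int), which is part of why this style of serialization is cheap compared to standard Java serialization.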

GetData in Action
So now that you understand the basic process I'm using to get the data into Hadoop, let's see how to run it. First, make sure Hadoop is set up in your environment. For now, I recommend using the pseudo-distributed configuration (see Hadoop 0.20.2 QuickStart). Once Hadoop is running, run the GetData application; its output is saved to the HDFS you configured in conf/core-site.xml:
(java -jar nsrdb-0.0.2-jar-with-dependencies.jar \
  -nsrdbIndexUrl \
  -downloadDir /mnt/openei/nsrdb/input \
  -hadoopDataPath hdfs://localhost:9000/openei/nsrdb/nsrdb.seq &>getdata.log &)
I'll refer you to the source code for thelabdude.nsrdb.GetData for all the details. This application took about 2 hours to download the 1,454 tar.gz files from OpenEI and merge them into a single SequenceFile stored in HDFS (tail the getdata.log file to see the application's progress). While this is slow, you should only have to do this once. When it finishes, you can make sure it is available in HDFS using:
$ bin/hadoop fs -lsr /openei/nsrdb
You should see the following output:
/openei/nsrdb/nsrdb.seq (2346388001 bytes)
Note: Don't delete the raw data files from the download directory just yet as you may want to re-run the GetData application (with -skipDownloadStep) to create more than one SequenceFile if you plan to use Amazon S3 and Elastic MapReduce (see below).

Analyzing NSRDB using MapReduce

Now that the data is in HDFS, it can be used as input to MapReduce jobs. Let's take a look at a few simple jobs that illustrate how to process the data.

Maximum Radiation Measured by Year
Which station is the sunniest in the winter months of each year? While this probably doesn't mean much to energy researchers, and I'm using the term "sunniest" very loosely, it does give us a simple example to exercise the NSRDB data using MapReduce. Here's how to run the job in your local Hadoop:
bin/hadoop jar nsrdb-0.0.2-hadoop-jobs.jar thelabdude.nsrdb.MaxValueByYearJob \
  /openei/nsrdb/nsrdb.seq /openei/nsrdb/maxValueByYear \
  /home/ubuntu/NSRDB_StationsMeta.csv 1
Notice that you need to pass the location of the NSRDB_StationsMeta.csv file to the job. Download the file from OpenEI and save it to the local filesystem where you run the job. The job uses Hadoop's DistributedCache to replicate the static station metadata CSV to all nodes in the cluster. Using Hadoop 0.20.2 in pseudo-distributed mode on a small Amazon EC2 instance (32-bit Ubuntu), the job finished in about 50 minutes (of course, we'll improve on this time in a real cluster). Once it completes, look at the results using:
hadoop fs -text /openei/nsrdb/maxValueByYear/part-r-00000
Let's look at the Map and Reduce steps in detail; the MapReduce job flow looks like this:
Map(k1,v1) → list(k2,v2) where k1 is Station ID + Year, v1 is a CSV file stored as an array of bytes, k2 is a year, such as 2001, and v2 is the station ID and sum of a solar radiation field. Using the new Hadoop API, the mapper class is defined as:
static class MaxValueByYearMapper
 extends Mapper<StationYearWritable, BytesWritable, IntWritable, StationDataWritable> {
Reduce(k2, list (v2)) → list(v3) where k2 and v2 are the same as what was output from the mapper and v3 is metadata for the station that measured the most radiation per year. Hadoop sorts the map output by key during the shuffle, so all v2 values for the same key k2 are grouped together and sent to the same reducer. The reducer class is defined as:
static class MaxValueByYearReducer
  extends Reducer<IntWritable, StationDataWritable, IntWritable, Text> {

Map Step
We need to sum up all the radiation measured by each station during the winter each year. In the end, we will have 15 stations identified in our reduce output (one for each year in our dataset). Recall that keys in our SequenceFile are StationYearWritable objects and values are BytesWritable objects. Thus, the map function signature is:
public void map(StationYearWritable key, BytesWritable value, Context context)
        throws IOException, InterruptedException
Now is the time to process the CSV data line-by-line, since presumably our Map task is distributed in a Hadoop cluster. Before processing the CSV data for a station, the Map step consults the station metadata stored in Hadoop's DistributedCache to see if the station is of interest to this job. For this job, I'm only interested in class 1 or 2 stations in the lower 48 US states.
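Here's a hedged sketch of what such a station filter might look like. It is not the code from MaxValueByYearJob. I'm assuming the metadata line uses the same column order as the job's reducer output (station ID first, station class second, state abbreviation fifth), and the StationFilter class and its exclusion list of non-contiguous states and territories are my own guesses.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class StationFilter {
    // Assumption: abbreviations for states and territories outside the lower 48.
    private static final Set<String> EXCLUDED =
        new HashSet<String>(Arrays.asList("AK", "HI", "PR", "GU", "VI"));

    // Assumed column layout: id,class,flag,name,state,...
    static boolean isOfInterest(String metadataLine) {
        String[] cols = metadataLine.split(",");
        int stationClass = Integer.parseInt(cols[1].trim());
        String state = cols[4].trim();
        return (stationClass == 1 || stationClass == 2) && !EXCLUDED.contains(state);
    }

    public static void main(String[] args) {
        System.out.println(isOfInterest("722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5")); // true
        System.out.println(isOfInterest("999999,3,false,EXAMPLE FIELD,CO,39.0,-105.0,1600,-7"));   // false
    }
}
```

A station name containing a comma would break this simple split, so a real job would want a proper CSV parser. Each rejected station is also a natural place to increment a counter such as STATION_IGNORED.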

Next, the Map step parses each line from the stored CSV file.
IMPORTANT: Be careful when reading from BytesWritable objects because the byte[] returned from getBytes() may have more bytes than expected. You need to read only up to index getLength()-1:

ByteArrayInputStream bytesIn = new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
reader = new BufferedReader(new InputStreamReader(bytesIn, UTF8));
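To see why the length matters, here's a small stand-alone sketch that simulates an over-sized backing array with plain java.io classes. The PaddedBuffer class is hypothetical and the zero padding stands in for whatever unused capacity a BytesWritable happens to hold.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PaddedBuffer {
    // Counts the lines found in the first 'length' bytes of the buffer.
    static int countLines(byte[] buffer, int length) {
        ByteArrayInputStream bytesIn = new ByteArrayInputStream(buffer, 0, length);
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(bytesIn, StandardCharsets.UTF_8))) {
            int lines = 0;
            while (reader.readLine() != null) {
                lines++;
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] valid = "a,1\nb,2\n".getBytes(StandardCharsets.UTF_8);
        // Simulate a backing array with spare capacity (8 trailing zero bytes).
        byte[] backing = Arrays.copyOf(valid, valid.length + 8);
        System.out.println(countLines(backing, valid.length));   // prints 2
        System.out.println(countLines(backing, backing.length)); // prints 3 (a bogus line of padding)
    }
}
```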
For this job, I'll sum up the values in the Modeled Global Horizontal field to find my maximum. I'm also using a common MapReduce technique called a Combiner where I'm doing some of the summarization work in the Map step since we know that each value coming into the map function contains all the data we need for a given station per year. Combiners help reduce the amount of disk I/O needed between the Map and Reduce steps by reducing the number of key/value pairs output by your mapper. Hadoop allows you to configure a separate Combiner class for your job, but since the logic for this job is so easy, I can just do the summarization in the mapper.
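The in-mapper summarization can be sketched roughly as follows. Everything about the input layout here is an assumption for illustration: I'm pretending each line starts with a yyyy-MM-dd date and that the value of interest sits in a known column, and I'm treating December through February as "winter". The real NSRDB CSV layout and the job's definition of winter are in the source code, not in this sketch.

```java
class WinterSum {
    // Sums one numeric column over all lines whose date falls in Dec, Jan, or Feb.
    static double sumWinter(String csv, int valueColumn) {
        double sum = 0.0;
        for (String line : csv.split("\n")) {
            String[] cols = line.split(",");
            int month = Integer.parseInt(cols[0].substring(5, 7)); // "yyyy-MM-dd" assumed
            boolean winter = month == 12 || month == 1 || month == 2;
            if (winter) {
                sum += Double.parseDouble(cols[valueColumn]);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        String csv = "2005-01-15,12:00,400\n2005-07-15,12:00,900\n2005-12-15,12:00,350";
        System.out.println(sumWinter(csv, 2)); // prints 750.0
    }
}
```

The point of summing inside the mapper is that a whole station-year of CSV lines collapses into a single output pair, which is what keeps the shuffle small.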

A few words about Hadoop's DistributedCache
Hadoop's DistributedCache replicates static data needed by your job at runtime to all DataNodes in the cluster, which addresses my job's need to get access to station metadata nicely. However, using the DC can be tricky as the current documentation is not clear on how to set it up correctly, especially when working with S3 and Elastic MapReduce. In the job's run method, I have the following code to add the station metadata CSV file to Hadoop's DistributedCache:
URI localUri = stationMetadataLocal.toUri();
if ("s3".equals(localUri.getScheme()) || "s3n".equals(localUri.getScheme())) {
  // static data lives in S3: copy it into HDFS first
  copyFromS3File(conf, stationMetadataLocal, stationMetadataHdfs);
} else {
  // static data is on the local file system of the machine launching the job
  FileSystem fs = FileSystem.get(conf);
  fs.copyFromLocalFile(false, true, stationMetadataLocal, stationMetadataHdfs);
}
DistributedCache.addCacheFile(stationMetadataHdfs.toUri(), conf);
You'll notice that I have to handle files in S3 differently than files on the local file system where the job is run. Essentially, if your static data is in S3, you need to copy it to HDFS using two separate file systems: S3N and HDFS (see my copyFromS3File method).
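The scheme test itself can be tried in isolation with java.net.URI. This is only a sketch of the branching decision, with a hypothetical SchemeCheck class; it does not copy any files. Note that a plain local path has no scheme at all, which is why the string constants go on the left side of equals.

```java
import java.net.URI;

class SchemeCheck {
    // True when the location uses one of the S3 schemes handled specially by the job.
    static boolean isS3(String location) {
        String scheme = URI.create(location).getScheme(); // null for plain local paths
        return "s3".equals(scheme) || "s3n".equals(scheme);
    }

    public static void main(String[] args) {
        System.out.println(isS3("s3n://BUCKET/openei/NSRDB_StationsMeta.csv")); // prints true
        System.out.println(isS3("/home/ubuntu/NSRDB_StationsMeta.csv"));        // prints false
    }
}
```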

Reduce Step
The reduce step is simple: it receives the list of values emitted for each year and sums them for each station. The framework collects and sorts the map output by key during an internal phase called the "shuffle", so all values for a year arrive in a single call to the reducer. Once all station data has been summed up, the station with the maximum sum is written to the output. As with the Map step, my Reducer consults the DistributedCache to get station metadata for its output, which is easier to interpret than just a station ID.
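Stripped of the Hadoop plumbing, the reduce logic for a single year might look like the sketch below. The MaxStation class, the parallel arrays standing in for the list of values, and the sample numbers are all made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class MaxStation {
    // Sums the values per station, then returns the station ID with the largest total.
    static long stationWithMaxTotal(long[] stationIds, double[] sums) {
        Map<Long, Double> totals = new HashMap<Long, Double>();
        for (int i = 0; i < stationIds.length; i++) {
            totals.merge(stationIds[i], sums[i], Double::sum);
        }
        long best = -1L;
        double bestTotal = Double.NEGATIVE_INFINITY;
        for (Map.Entry<Long, Double> entry : totals.entrySet()) {
            if (entry.getValue() > bestTotal) {
                best = entry.getKey();
                bestTotal = entry.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long[] ids = {722010L, 722016L, 722010L};
        double[] sums = {100.0, 150.0, 80.0};
        System.out.println(stationWithMaxTotal(ids, sums)); // prints 722010 (total 180.0)
    }
}
```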

Group by Year
I've also included a simple job to transform the data to group values by year (see thelabdude.nsrdb.GroupByYearJob). This example shows how you do not need a Reducer for your job if you just want to transform the input data. The output of this job is useful for using Hadoop's streaming interface.

Running in the Cloud

Now, let's run this same job on an Amazon Elastic MapReduce cluster. You'll need a few command-line tools to do the work in this section.

On Debian-based systems, you can do:
sudo apt-get install s3cmd
sudo apt-get install openssl
sudo apt-get install ruby1.8
sudo apt-get install libopenssl-ruby1.8
sudo apt-get install libruby1.8-extras
Once installed, you'll need your Amazon Access Key and Secret Access Key values to configure s3cmd using: s3cmd --configure. Follow the instructions for elastic-mapreduce to get that set up as well. If you need a refresher on Amazon access credentials, please read: Understanding Access Credentials for AWS/EC2.
Be aware of the implications of the region setting in the elastic-mapreduce credentials.json file as it determines which region your Elastic MapReduce instances launch in. If you store your data in a US Standard S3 bucket, then I recommend launching your EMR instances in the "us-east-1" region as there are no data transfer fees between your EMR instances and S3. On the other hand, if you launch your cluster in the "us-west-1" region, you will incur data transfer fees if your data is in a US Standard S3 bucket, so only use "us-west-1" if your S3 bucket is in Northern California.

File Size Revisited for Amazon S3
Previously, I made the statement that Hadoop works better with a small number of large files vs. a large number of small files. While this is still true with EMR, we need to bend this rule slightly when working with files stored in S3, as using only one large file will lead to a major degradation in performance. Your job may actually run slower than it does in pseudo-distributed mode on a single machine since you'll also have Hadoop job management and S3 latency involved. Intuitively, this makes sense because Hadoop sets the number of map tasks based on the number of splits. So no matter how big your cluster is, only one node can process the input from one file on S3. The solution is to split the data into smaller chunks, for example twice as many chunks as there are nodes in your cluster. The GetData tool I developed lets you control the number of SequenceFiles it creates using the chunkSize parameter. For a cluster with 4 nodes, I used chunkSize=256 which resulted in 9 SequenceFiles instead of 1. The key take-away here is that now I can have 9 map tasks running in parallel instead of only one.
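If you want to pick a chunk size for a different cluster, the arithmetic can be sketched as below. I'm assuming chunkSize is expressed in megabytes (which is consistent with 256 producing 9 files from 2,346,388,001 bytes); the ChunkPlan class is hypothetical, and GetData may produce a slightly different count since it has to break on record boundaries.

```java
class ChunkPlan {
    private static final long MB = 1024L * 1024L;

    // Number of chunks produced for a given chunk size (ceiling division).
    static long chunkCount(long totalBytes, long chunkSizeMb) {
        long chunkBytes = chunkSizeMb * MB;
        return (totalBytes + chunkBytes - 1) / chunkBytes;
    }

    // Smallest whole-MB chunk size that yields at most targetChunks chunks.
    static long chunkSizeMbFor(long totalBytes, long targetChunks) {
        long bytesPerChunk = (totalBytes + targetChunks - 1) / targetChunks;
        return (bytesPerChunk + MB - 1) / MB;
    }

    public static void main(String[] args) {
        long totalBytes = 2346388001L; // size of the single SequenceFile
        System.out.println(chunkCount(totalBytes, 256));   // prints 9
        System.out.println(chunkSizeMbFor(totalBytes, 8)); // prints 280 (2 chunks per node, 4 nodes)
    }
}
```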

Copy the SequenceFile(s) created by GetData from your local HDFS to an S3 bucket you have access to using Hadoop's distcp:

hadoop distcp /openei/nsrdb s3n://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]
Notice that I pass my Amazon ACCESS_KEY and SECRET_KEY in the S3 URI and that I'm using s3n as the protocol (native S3), see: Hadoop and Amazon S3. The distcp command uses MapReduce to distribute a large copy around your Hadoop cluster. The files copied from HDFS will be the input to your MapReduce jobs on EMR.

Next, you need to put the Jobs JAR file in S3; for this I like to use s3cmd:

s3cmd put FILE s3://BUCKET[/PREFIX]
For example:
s3cmd put nsrdb-0.0.2-hadoop-jobs.jar s3://thelabdude/openei/
Now, you're all set for running the job in EMR. The approach I like to take is to start a job flow without any jobs configured (actually, this is pretty much the standard approach when working with EMR). Once the cluster is running, I add job flow steps to it, which is more efficient because if the job fails due to an invalid or missing parameter, I can correct and re-start the step without waiting for the cluster to restart. To start a job flow, do the following:
elastic-mapreduce --create --alive \
  --log-uri s3n://BUCKET/openei/logs/ \
  --key-pair YOUR_KEY_PAIR_NAME \
  --slave-instance-type m1.small \
  --master-instance-type m1.small \
  --num-instances 9 \
  --name openei.nsrdb
This will create a new Job Flow named "openei.nsrdb" with 1 master node and 8 data nodes (m1.small instances); it can take a few minutes before the job flow is started. Take note of the Job Flow ID returned as you will need this to access logs and add job steps. You can add a new "step" (i.e. Hadoop job) once you have the Job Flow ID:
elastic-mapreduce --jar s3n://BUCKET/openei/nsrdb-0.0.2-hadoop-jobs.jar \
  --main-class thelabdude.nsrdb.MaxValueByYearJob \
  --arg s3n://BUCKET/openei/nsrdb-chunks \
  --arg s3n://BUCKET/openei/maxValueByYear/ \
  --arg s3n://BUCKET/openei/NSRDB_StationsMeta.csv \
  --arg 8 \
  -j JOB_ID
Notice that I'm now requesting 8 reducers; the Hadoop documentation suggests 0.95 or 1.75 multiplied by the number of reduce slots in the cluster (nodes * mapred.tasktracker.reduce.tasks.maximum). Also, I'm reading the input data directly from s3n://BUCKET/openei/nsrdb-chunks. If you're going to run multiple jobs on the data in the same EMR JobFlow, then I recommend using distcp to copy the data from S3 to the local HDFS for your Elastic MapReduce cluster using:
elastic-mapreduce --jar s3n://elasticmapreduce/samples/distcp/distcp.jar \
  --arg s3n://BUCKET/openei/nsrdb-chunks \
  --arg /openei/ \
  -j JOB_ID
This will help your jobs run much faster because the input data is already loaded into the local HDFS of your EMR cluster (data locality). To view the logs for your Job Flow, simply do:
elastic-mapreduce --logs -j JOB_ID
With an 8-node cluster of EC2 small instances, the job completed in roughly 8 minutes (after using distcp to copy the data from S3). The output from the job is included here:
2011-07-08 23:15:45,796 INFO org.apache.hadoop.mapred.JobClient (main): Job complete: job_201107082258_0002
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): Counters: 19
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):   Job Counters 
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     Launched reduce tasks=8
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     Rack-local map tasks=5
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     Launched map tasks=19
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     Data-local map tasks=14
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):   thelabdude.nsrdb.MaxValueByYearJob$StationDataCounters
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     STATION_IGNORED=10095
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):   FileSystemCounters
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     FILE_BYTES_READ=131196
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     HDFS_BYTES_READ=2347266412
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     FILE_BYTES_WRITTEN=276357
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     HDFS_BYTES_WRITTEN=1307
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):   Map-Reduce Framework
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce input groups=15
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Combine output records=0
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Map input records=21810
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce shuffle bytes=141561
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce output records=15
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Spilled Records=23430
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Map output bytes=281160
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Combine input records=0
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Map output records=11715
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce input records=11715
Notice that the job ignored 673 stations (thelabdude.nsrdb.MaxValueByYearJob$StationDataCounters.STATION_IGNORED: 10095 / 15 years). The reducer output looks like:
1991    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,350400.0
1992    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,332856.0
1993    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,356962.0
1994    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,343829.0
1995    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,354387.0
1996    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,377977.0
1997    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,354811.0
1998    722695,2,false,LAS CRUCES INTL,NM,32.283,-106.917,1393,-7,32.283,-106.917,1393,354944.0
1999    722735,2,false,DOUGLAS BISBEE-DOUGLAS INTL A,AZ,31.467,-109.6,1249,-7,31.467,-109.6,1249,370871.0
2000    722735,2,false,DOUGLAS BISBEE-DOUGLAS INTL A,AZ,31.467,-109.6,1249,-7,31.467,-109.6,1249,371146.0
2001    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,374057.0
2002    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,354146.0
2003    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,363305.0
2004    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,362566.0
2005    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,373423.0
Not surprisingly, all the stations are in Florida, Arizona, or New Mexico!
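Going back to the counters in the job log, they can be cross-checked with a little arithmetic, which is a quick sanity test worth doing after any run. The sketch below uses the values from the log above; the CounterCheck class is hypothetical, and it assumes the mapper emits exactly one record for every station-year it doesn't ignore.

```java
class CounterCheck {
    // STATION_IGNORED counts station-years, so divide by the number of years.
    static long ignoredStations(long ignoredStationYears, int years) {
        return ignoredStationYears / years;
    }

    // One map output record is expected per input record that was not ignored.
    static long expectedMapOutput(long mapInputRecords, long ignoredStationYears) {
        return mapInputRecords - ignoredStationYears;
    }

    public static void main(String[] args) {
        long mapInputRecords = 21810L; // "Map input records" from the log
        long stationIgnored = 10095L;  // STATION_IGNORED from the log
        System.out.println(ignoredStations(stationIgnored, 15));                // prints 673
        System.out.println(expectedMapOutput(mapInputRecords, stationIgnored)); // prints 11715
    }
}
```

The second number matches the "Map output records" counter in the log, so the job accounted for every input record.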

IMPORTANT: Be sure to terminate your job flow when you're finished working with it so that you don't incur unnecessary expenses.
elastic-mapreduce --terminate -j JOB_ID
For more tips about running jobs in Elastic MapReduce, check out my contributions to the Mahout wiki based on some benchmarking work I did for Taming Text. See Building Vectors for Large Document Sets on Mahout on Elastic MapReduce. Alternatively, you can also spin up your own Hadoop cluster instead of EMR using the notes I compiled for Mahout on Amazon EC2 > Use an Existing Hadoop AMI.

Testing Hadoop Jobs with MRUnit

Lastly, I've included a test case (thelabdude.nsrdb.MaxValueByYearMRUnitTest) to test my MR job using MRUnit. The MRUnit library provides the mock objects and job drivers that simulate the Hadoop runtime environment for testing your job outside of Hadoop. MRUnit drivers allow you to specify expected output from Mapper and Reducer classes given specific inputs. Notice that my job uses Hadoop Counters to keep track of decisions it makes, such as skipping a station or when it encounters an error when parsing the data. Counters can help you diagnose unexpected job results and make writing unit tests easier since you can assert expected counts based on your test input data. For example, the following unit test checks to see if the job is handling invalid input records correctly:

    public void testMapperIgnoreParseError() throws Exception {
      // data that produces parse errors
      String csv = ...

      BytesWritable value = new BytesWritable(csv.getBytes(MaxValueByYearJob.UTF8));
      // run the mapper against the invalid input
      mapDriver.withInput(new StationYearWritable(690190,2005), value).run();

      // each kind of parse error should have been counted exactly once
      Counters counters = mapDriver.getCounters();
      Counter counter = counters.findCounter(MaxValueByYearJob.StationDataCounters.DATE_PARSE_ERROR);
      assertTrue("Expected 1 date parse error", counter.getValue() == 1);
      counter = counters.findCounter(MaxValueByYearJob.StationDataCounters.DATA_PARSE_ERROR);
      assertTrue("Expected 1 data parse error", counter.getValue() == 1);
    }
Both CSV formatted input records contain data formatting errors. Thus, I expect the MaxValueByYearJob.StationDataCounters.DATE_PARSE_ERROR and DATA_PARSE_ERROR Counters to report 1 parse error each. Try to avoid overloading the meaning of each counter and have a specific counter for each possible error / event in your job as it will help you troubleshoot data problems when running in a large distributed cluster. For information on Hadoop Counters, see Cloud9: Working with counters.
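To illustrate the "one counter per event" idea outside of Hadoop, here's a sketch that uses an enum with an EnumMap as a stand-in for Hadoop's Counters. The enum constant names match the ones my job uses, but the ParseCounters class, the two-column "date,value" input layout, and the parsing rules are assumptions for the example only.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.EnumMap;
import java.util.Map;

class ParseCounters {
    enum StationDataCounters { STATION_IGNORED, DATE_PARSE_ERROR, DATA_PARSE_ERROR }

    // Parses "date,value" lines and counts each kind of failure separately.
    static Map<StationDataCounters, Long> countErrors(String csv) {
        Map<StationDataCounters, Long> counters =
            new EnumMap<StationDataCounters, Long>(StationDataCounters.class);
        for (StationDataCounters c : StationDataCounters.values()) {
            counters.put(c, 0L);
        }
        for (String line : csv.split("\n")) {
            String[] cols = line.split(",");
            try {
                LocalDate.parse(cols[0]); // expects yyyy-MM-dd
            } catch (DateTimeParseException e) {
                increment(counters, StationDataCounters.DATE_PARSE_ERROR);
                continue;
            }
            try {
                Double.parseDouble(cols[1]);
            } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
                increment(counters, StationDataCounters.DATA_PARSE_ERROR);
            }
        }
        return counters;
    }

    private static void increment(Map<StationDataCounters, Long> counters, StationDataCounters c) {
        counters.put(c, counters.get(c) + 1);
    }

    public static void main(String[] args) {
        String csv = "2005-13-45,10\n2005-01-01,abc\n2005-01-02,7.5";
        System.out.println(countErrors(csv));
    }
}
```

Because each failure mode has its own counter, a test (or a glance at the job log) tells you not just that records were dropped but why.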


So now we have a basic framework for loading OpenEI data into a Hadoop cluster and have seen how to run MapReduce jobs against it. As you can tell, there are many moving parts and some non-trivial Java coding work to do some simple analysis on the data with Hadoop. While energy researchers may benefit from being able to analyze huge solar data sets using MapReduce and Hadoop, they will need a higher level of abstraction than what was presented here. Moreover, 8 minutes really isn't very fast for having run such a simple job on 8 machines with such a small data set, so I'll also need to address performance issues. Consequently, I plan to follow up this posting with an example of working with this data using Apache Hive and Pig to see if these data warehouse frameworks make it easier to analyze the NSRDB. I also plan to compare the results with Hive / Pig to using a MySQL database for analyzing this data.


  1. Very helpful blog post. Thanks for the details.

  2. Nice article and very helpful. I am waiting eagerly to know what happened after like example of working with this data using Apache Hive and Pig and comparison of the results with Hive / Pig to using a MySQL database for analyzing this data.