In this posting, I provide a basic framework and some ideas for analyzing the National Solar Radiation Data Base (NSRDB) provided by OpenEI using MapReduce, Hadoop, and the Amazon cloud. First, a little introduction of OpenEI is in order; from the openei.org Web site,
The current NSRDB data set contains more than 191 million observations collected from solar stations around the US from 1991 to 2005. Of course, this data set is tiny by Hadoop standards, but it is only a subset of the available solar data. Also, there are other, larger solar-related data sets that can use the same approach described in this posting to distribute expensive computations across large numbers of commodity servers using Hadoop. Moreover, this data can be joined with other data sets and analyzed using various Hadoop sub-projects, such as Hive or Pig. So this posting is mostly about how to get OpenEI data into a format that can be analyzed using MapReduce jobs (and other tools, e.g. Hive or Pig) in a Hadoop compute cluster. Specifically, I provide examples with source code for the following topics:
- File size considerations for storing data in HDFS and S3
- Loading static metadata into Hadoop's DistributedCache and special handling needed to make this work with S3 and Elastic MapReduce.
- Running a MapReduce job locally in pseudo-distributed mode and on Amazon's Elastic MapReduce
- Using MRUnit to unit test the MapReduce jobs before deploying to Hadoop
- Using Hadoop Counters to help you diagnose unexpected job results and make writing unit tests easier
First, you need to download the source code from github/thelabdude/nsrdb (note: I don't reproduce all the source code in this blog post, so you'll have to download it to get the full details). Next, build the project with Maven 2.2.x using:
mvn clean package
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.2</version>
      <executions>
        <execution>
          <id>make-exe-jar</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>thelabdude.nsrdb.GetData</mainClass>
              </manifest>
            </archive>
          </configuration>
        </execution>
        <execution>
          <id>make-jobs-jar</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
          <configuration>
            <descriptors>
              <descriptor>src/main/assembly/hadoop-jobs-jar.xml</descriptor>
            </descriptors>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
Next, we need to download the data from the OpenEI.org site using some good ol' screen scraping. The data is contained in 1,454 GZipped TAR files (tar.gz) on http://en.openei.org/datasets/files/39/pub/. The process I used is as follows:
- Parse the contents of the data listing HTML page to extract links matching a specified regular expression, in this case ^\d.*\.tar\.gz$.
- Download each data file and save it locally so that I don't have to return to the site; build in a 2-second delay between requests so that the app doesn't overload OpenEI.
- Uncompress each GZipped TAR file; each TAR contains 15 CSV files, one per year of data for a single station.
- Append the contents of each entry in the TAR file into a single Hadoop SequenceFile that is stored in HDFS.
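The link-filtering step in the list above can be sketched in a few lines of plain Java. This is only an illustration, not the code from the GetData class: the class name `ListingLinkExtractor`, the crude `href` pattern, and the sample HTML are all assumptions of mine. Only the `^\d.*\.tar\.gz$` filter comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls data-file links out of an HTML directory listing. */
final class ListingLinkExtractor {
    // Crude href matcher (an assumption); good enough for a simple generated index page.
    private static final Pattern HREF =
        Pattern.compile("href=\"([^\"]+)\"", Pattern.CASE_INSENSITIVE);

    // The filter quoted in the post: names that start with a digit and end in .tar.gz.
    private static final Pattern DATA_FILE = Pattern.compile("^\\d.*\\.tar\\.gz$");

    static List<String> extractDataLinks(String html) {
        List<String> links = new ArrayList<>();
        Matcher m = HREF.matcher(html);
        while (m.find()) {
            String link = m.group(1);
            if (DATA_FILE.matcher(link).matches()) {
                links.add(link);
            }
        }
        return links;
    }

    public static void main(String[] args) {
        // Made-up listing fragment; the real page layout may differ.
        String html = "<a href=\"../\">Parent</a> "
            + "<a href=\"690190.tar.gz\">690190.tar.gz</a> "
            + "<a href=\"README.txt\">README</a>";
        System.out.println(extractDataLinks(html)); // prints [690190.tar.gz]
    }
}
```

A real downloader would then fetch each link in turn, sleeping between requests (for example with `Thread.sleep(2000)`) to honor the 2-second delay.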
I use a custom class for the key in my SequenceFile: thelabdude.nsrdb.StationYearWritable. This class holds a long station ID and an int year and implements Hadoop's Writable interface for fast serialization. The value for each key in the file is simply the entire contents of one TAR entry stored in a BytesWritable object. In other words, I'm storing the CSV data as one large BLOB per station and year (1,454 stations * 15 years = 21,810 records). We'll save the line-by-line parsing of each CSV file for the MapReduce jobs so we can distribute the processing across a cluster.
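To give a feel for what a Writable key involves, here is a minimal sketch of a station/year key. It is not the project's StationYearWritable: the class name `StationYearKeySketch` and its helper methods are my own, and to keep the example runnable without Hadoop on the classpath it does not declare `implements Writable`. The `write` and `readFields` methods do use the same signatures as that interface, so adding the declaration should be all that is needed in a real project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Illustrative stand-in for a station/year key; not the project's actual class. */
final class StationYearKeySketch {
    private long stationId;
    private int year;

    // Hadoop creates Writable instances reflectively, so a no-arg constructor is required.
    StationYearKeySketch() { }

    StationYearKeySketch(long stationId, int year) {
        this.stationId = stationId;
        this.year = year;
    }

    long getStationId() { return stationId; }
    int getYear() { return year; }

    // Same signature as Writable.write: 8 bytes for the ID, 4 for the year.
    public void write(DataOutput out) throws IOException {
        out.writeLong(stationId);
        out.writeInt(year);
    }

    // Same signature as Writable.readFields: read fields in the order they were written.
    public void readFields(DataInput in) throws IOException {
        stationId = in.readLong();
        year = in.readInt();
    }

    // Test helpers (hypothetical): round-trip the key through a byte array.
    static byte[] toBytes(StationYearKeySketch key) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            key.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static StationYearKeySketch fromBytes(byte[] bytes) {
        StationYearKeySketch key = new StationYearKeySketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            key.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return key;
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new StationYearKeySketch(690190L, 2005));
        StationYearKeySketch copy = fromBytes(bytes);
        System.out.println(bytes.length + " bytes: " + copy.getStationId() + "/" + copy.getYear());
    }
}
```

The compact 12-byte encoding is the point: there is no per-record class name or field metadata, which is why Writable is cheaper than default Java serialization.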
(java -jar nsrdb-0.0.2-jar-with-dependencies.jar \
  -nsrdbIndexUrl http://en.openei.org/datasets/files/39/pub/ \
  -downloadDir /mnt/openei/nsrdb/input \
  -hadoopDataPath hdfs://localhost:9000/openei/nsrdb/nsrdb.seq &>getdata.log &)
$ bin/hadoop fs -lsr /openei/nsrdb
/openei/nsrdb/nsrdb.seq (2346388001 bytes)
Now that the data is in HDFS, it can be used as input to MapReduce jobs. Let's take a look at a few simple jobs that illustrate how to process the data.
bin/hadoop jar nsrdb-0.0.2-hadoop-jobs.jar thelabdude.nsrdb.MaxValueByYearJob \
  /openei/nsrdb/nsrdb.seq /openei/nsrdb/maxValueByYear \
  /home/ubuntu/NSRDB_StationsMeta.csv 1
hadoop fs -text /openei/nsrdb/maxValueByYear/part-r-00000
Map(k1,v1) → list(k2,v2) where k1 is Station ID + Year, v1 is a CSV file stored as an array of bytes, k2 is a year, such as 2001, and v2 is the station ID and sum of a solar radiation field. Using the new Hadoop API, the mapper class is defined as:
static class MaxValueByYearMapper extends Mapper<StationYearWritable, BytesWritable, IntWritable, StationDataWritable> { ... }
static class MaxValueByYearReducer extends Reducer<IntWritable, StationDataWritable, IntWritable, Text> { ... }
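The reducer body is elided above. Judging only by the job name and its output, the reduce step presumably keeps, for each year, the station with the largest sum. The following plain-Java sketch shows that selection outside of Hadoop; the class `MaxByYearSketch`, its `StationSum` type, and the sample sums are hypothetical and are not taken from the project source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of a "max value by year" reduction; not the project's reducer. */
final class MaxByYearSketch {

    /** One map output record: the year is the key, station ID and sum are the value. */
    static final class StationSum {
        final int year;
        final long stationId;
        final double sum;

        StationSum(int year, long stationId, double sum) {
            this.year = year;
            this.stationId = stationId;
            this.sum = sum;
        }
    }

    /** Groups records by year and keeps the record with the largest sum in each group. */
    static Map<Integer, StationSum> maxByYear(List<StationSum> mapOutput) {
        Map<Integer, StationSum> best = new TreeMap<>(); // sorted by year, like reducer output
        for (StationSum candidate : mapOutput) {
            StationSum current = best.get(candidate.year);
            if (current == null || candidate.sum > current.sum) {
                best.put(candidate.year, candidate);
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Made-up sample sums for two stations over two years.
        List<StationSum> mapOutput = Arrays.asList(
            new StationSum(1991, 722016L, 350400.0),
            new StationSum(1991, 722010L, 349000.0),
            new StationSum(1992, 722016L, 330000.0),
            new StationSum(1992, 722010L, 332856.0));
        for (StationSum s : maxByYear(mapOutput).values()) {
            System.out.println(s.year + " " + s.stationId + " " + s.sum);
        }
    }
}
```

In the real job, Hadoop does the grouping: the framework hands each reduce call one year together with all of the values emitted for it, so the reducer only needs the comparison inside the loop.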
public void map(StationYearWritable key, BytesWritable value, Context context) throws IOException, InterruptedException
Next, the Map step parses each line from the stored CSV file.
IMPORTANT: Be careful when reading from BytesWritable objects because the byte[] returned from getBytes() may have more bytes than expected. You need to read only up to index getLength()-1:
ByteArrayInputStream bytesIn =
    new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
reader = new BufferedReader(new InputStreamReader(bytesIn, UTF8));
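To see why the length bound matters, here is a small demonstration that needs no Hadoop classes. It simulates the situation with a plain byte array whose capacity exceeds its valid length, which is my stand-in for what getBytes() can return. The class name `PaddedBufferDemo` and the sample data are made up.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Simulates a backing array that is larger than the valid data it holds. */
final class PaddedBufferDemo {

    /** Counts the lines found in the first {@code length} bytes of {@code buffer}. */
    static int countLines(byte[] buffer, int length) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(buffer, 0, length), StandardCharsets.UTF_8))) {
            int lines = 0;
            while (reader.readLine() != null) {
                lines++;
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "a,1\nb,2\n".getBytes(StandardCharsets.UTF_8);
        // Grow the array by 8 zero bytes, as a buffer that over-allocates might.
        byte[] padded = Arrays.copyOf(data, data.length + 8);

        System.out.println(countLines(padded, data.length));   // 2: only the real lines
        System.out.println(countLines(padded, padded.length)); // 3: padding shows up as a bogus line
    }
}
```

In a mapper, that bogus trailing line of NUL characters would most likely surface as a parse error on the last record of every file, which is an easy bug to misdiagnose.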
URI localUri = stationMetadataLocal.toUri();
if ("s3".equals(localUri.getScheme()) || "s3n".equals(localUri.getScheme())) {
  copyFromS3File(conf, stationMetadataLocal, stationMetadataHdfs);
} else {
  FileSystem fs = FileSystem.get(conf);
  fs.copyFromLocalFile(false, true, stationMetadataLocal, stationMetadataHdfs);
}
DistributedCache.addCacheFile(stationMetadataHdfs.toUri(), conf);
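Once the metadata file is in the cache, each task still has to load it into memory before it can look up a station. The sketch below shows one way that lookup table could be built. It assumes a layout in which the first column of each row is the numeric station ID, which I have not verified against NSRDB_StationsMeta.csv. The class `StationMetadataSketch`, the `load` method, and the two-column sample are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Illustrative loader for a station metadata CSV; the column layout is an assumption. */
final class StationMetadataSketch {

    /** Returns each raw CSV row keyed by the station ID found in its first column. */
    static Map<Long, String> load(Reader source) {
        Map<Long, String> byStation = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                int comma = line.indexOf(',');
                if (comma <= 0) {
                    continue; // blank line or no ID column
                }
                try {
                    long stationId = Long.parseLong(line.substring(0, comma).trim());
                    byStation.put(stationId, line);
                } catch (NumberFormatException e) {
                    // Header or malformed row: skip it rather than fail the task.
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return byStation;
    }

    public static void main(String[] args) {
        // Hypothetical two-column sample; the real file has more columns.
        String csv = "ID,NAME\n722010,KEY WEST INTL ARPT\n722016,MARATHON AIRPORT\n";
        Map<Long, String> stations = load(new StringReader(csv));
        System.out.println(stations.get(722016L));
    }
}
```

In a mapper, this kind of loading would normally happen once in setup() rather than on every map() call, with the Reader opened on the locally cached copy of the file.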
Now, let's run this same job on an Amazon Elastic MapReduce cluster. You'll need a few command-line tools to do the work in this section.
- s3cmd: http://s3tools.org/s3cmd
- elastic-mapreduce-ruby: http://aws.amazon.com/developertools/2264?_encoding=UTF8&jiveRedirect=1
sudo apt-get install s3cmd
sudo apt-get install openssl
sudo apt-get install ruby1.8
sudo apt-get install libopenssl-ruby1.8
sudo apt-get install libruby1.8-extras
wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
Copy the SequenceFile(s) created by GetData from your local HDFS to an S3 bucket you have access to using Hadoop's distcp:
hadoop distcp /openei/nsrdb s3n://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]
Next, you need to put the jobs JAR file in S3; for this I like to use s3cmd:
s3cmd put FILE s3://BUCKET[/PREFIX]
s3cmd put nsrdb-0.0.2-hadoop-jobs.jar s3://thelabdude/openei/
elastic-mapreduce --create --alive \
  --log-uri s3n://BUCKET/openei/logs/ \
  --key-pair YOUR_KEY_PAIR_NAME \
  --slave-instance-type m1.small \
  --master-instance-type m1.small \
  --num-instances 9 \
  --name openei.nsrdb
elastic-mapreduce --jar s3n://BUCKET/openei/nsrdb-0.0.2-hadoop-jobs.jar \
  --main-class thelabdude.nsrdb.MaxValueByYearJob \
  --arg s3n://BUCKET/openei/nsrdb-chunks \
  --arg s3n://BUCKET/openei/maxValueByYear/ \
  --arg s3n://BUCKET/openei/NSRDB_StationsMeta.csv \
  --arg 8 \
  -j JOB_ID
elastic-mapreduce --jar s3n://elasticmapreduce/samples/distcp/distcp.jar \
  --arg s3n://BUCKET/openei/nsrdb-chunks \
  --arg /openei/ -j JOB_ID
elastic-mapreduce --logs -j JOB_ID
2011-07-08 23:15:45,796 INFO org.apache.hadoop.mapred.JobClient (main): Job complete: job_201107082258_0002
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): Counters: 19
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): Job Counters
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): Launched reduce tasks=8
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): Rack-local map tasks=5
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): Launched map tasks=19
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): Data-local map tasks=14
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): thelabdude.nsrdb.MaxValueByYearJob$StationDataCounters
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): STATION_IGNORED=10095
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): FileSystemCounters
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): FILE_BYTES_READ=131196
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): HDFS_BYTES_READ=2347266412
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): FILE_BYTES_WRITTEN=276357
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): HDFS_BYTES_WRITTEN=1307
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Map-Reduce Framework
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Reduce input groups=15
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Combine output records=0
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Map input records=21810
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Reduce shuffle bytes=141561
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Reduce output records=15
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Spilled Records=23430
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Map output bytes=281160
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Combine input records=0
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Map output records=11715
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main): Reduce input records=11715
1991 722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,350400.0
1992 722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,332856.0
1993 722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,356962.0
1994 722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,343829.0
1995 722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,354387.0
1996 722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,377977.0
1997 722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,354811.0
1998 722695,2,false,LAS CRUCES INTL,NM,32.283,-106.917,1393,-7,32.283,-106.917,1393,354944.0
1999 722735,2,false,DOUGLAS BISBEE-DOUGLAS INTL A,AZ,31.467,-109.6,1249,-7,31.467,-109.6,1249,370871.0
2000 722735,2,false,DOUGLAS BISBEE-DOUGLAS INTL A,AZ,31.467,-109.6,1249,-7,31.467,-109.6,1249,371146.0
2001 722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,374057.0
2002 722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,354146.0
2003 722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,363305.0
2004 722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,362566.0
2005 722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,373423.0
IMPORTANT: Be sure to terminate your job flow when you're finished working with it so that you don't incur unnecessary expenses.
elastic-mapreduce --terminate -j JOB_ID
Lastly, I've included a test case (thelabdude.nsrdb.MaxValueByYearMRUnitTest) to test my MR job using MRUnit. The MRUnit library provides the mock objects and job drivers that simulate the Hadoop runtime environment for testing your job outside of Hadoop. MRUnit drivers allow you to specify expected output from Mapper and Reducer classes given specific inputs. Notice that my job uses Hadoop Counters to keep track of decisions it makes, such as skipping a station or when it encounters an error when parsing the data. Counters can help you diagnose unexpected job results and make writing unit tests easier since you can assert expected counts based on your test input data. For example, the following unit test checks to see if the job is handling invalid input records correctly:
@Test
public void testMapperIgnoreParseError() throws Exception {
  // data that produces parse errors
  String csv = ...
  BytesWritable value = new BytesWritable(csv.getBytes(MaxValueByYearJob.UTF8));
  mapDriver.withInput(new StationYearWritable(690190,2005), value)
           .runTest();

  Counters counters = mapDriver.getCounters();
  Counter counter = counters.findCounter(MaxValueByYearJob.StationDataCounters.DATE_PARSE_ERROR);
  assertTrue("Expected 1 date parse error", counter.getValue() == 1);

  counter = counters.findCounter(MaxValueByYearJob.StationDataCounters.DATA_PARSE_ERROR);
  assertTrue("Expected 1 data parse error", counter.getValue() == 1);
}
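The counting pattern itself is easy to show without Hadoop or MRUnit. The sketch below uses an enum plus an EnumMap in place of Hadoop's Counters. The two counter names match the ones used in the test above, but the `CounterSketch` class and the two-column `date,value` line format are assumptions made purely for illustration.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.EnumMap;
import java.util.Map;

/** Plain-Java imitation of counting parse decisions; not the project's mapper. */
final class CounterSketch {
    enum StationDataCounters { DATE_PARSE_ERROR, DATA_PARSE_ERROR }

    private final Map<StationDataCounters, Long> counters =
        new EnumMap<>(StationDataCounters.class);

    private void increment(StationDataCounters counter) {
        counters.merge(counter, 1L, Long::sum);
    }

    long get(StationDataCounters counter) {
        return counters.getOrDefault(counter, 0L);
    }

    /** Sums the value column, skipping (and counting) lines that fail to parse. */
    double sum(String csv) {
        double total = 0;
        for (String line : csv.split("\n")) {
            String[] cols = line.split(",");
            if (cols.length != 2) {
                increment(StationDataCounters.DATA_PARSE_ERROR);
                continue;
            }
            try {
                LocalDate.parse(cols[0]); // expects ISO dates such as 2005-01-01
            } catch (DateTimeParseException e) {
                increment(StationDataCounters.DATE_PARSE_ERROR);
                continue;
            }
            try {
                total += Double.parseDouble(cols[1]);
            } catch (NumberFormatException e) {
                increment(StationDataCounters.DATA_PARSE_ERROR);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        CounterSketch job = new CounterSketch();
        double total = job.sum("2005-01-01,10.5\nnot-a-date,3\n2005-01-02,abc\n2005-01-03,2.5");
        System.out.println(total
            + " date errors=" + job.get(StationDataCounters.DATE_PARSE_ERROR)
            + " data errors=" + job.get(StationDataCounters.DATA_PARSE_ERROR));
    }
}
```

Because every skipped line bumps a named counter, a test can assert on exactly which decisions the code made instead of inferring them from the final sum.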
So now we have a basic framework for loading OpenEI data into a Hadoop cluster and have seen how to run MapReduce jobs against it. As you can tell, there are many moving parts, and it takes some non-trivial Java coding to do even simple analysis on the data with Hadoop. While energy researchers may benefit from being able to analyze huge solar data sets using MapReduce and Hadoop, they will need a higher level of abstraction than what was presented here. Moreover, 8 minutes really isn't very fast for running such a simple job on 8 machines with such a small data set, so I'll also need to address performance issues. Consequently, I plan to follow up this posting with an example of working with this data using Apache Hive and Pig to see if these data warehouse frameworks make it easier to analyze the NSRDB. I also plan to compare the Hive / Pig results with using a MySQL database to analyze this data.