Friday, July 8, 2011

Analyzing the National Solar Radiation Database (NSRDB) with Hadoop

Introduction

In this posting, I provide a basic framework and some ideas for analyzing the National Solar Radiation Database (NSRDB) provided by OpenEI using MapReduce, Hadoop, and the Amazon cloud. First, a little introduction to OpenEI is in order; from the openei.org Web site:

Open Energy Information (OpenEI) is a knowledge-sharing online community dedicated to connecting people with the latest information and data on energy resources from around the world. Created in partnership with the United States Department of Energy and federal laboratories across the nation, OpenEI offers access to real-time data and unique visualizations that will help you find the answers you need to make better, more informed decisions. And because the site is designed on a linked, open-data platform, your contributions are both welcomed and encouraged.
Sounds pretty cool ... let's roll up our sleeves and see what we can do with this data using Hadoop. If you need a refresher on Hadoop, have a look at Yahoo's Hadoop Tutorial.

The current NSRDB data set contains more than 191 million observations collected from solar stations around the US from 1991 to 2005. Of course this data set is tiny by Hadoop standards, but it is only a subset of the available solar data. There are also other, larger solar-related data sets that can use the same approach described in this posting to distribute expensive computations across large numbers of commodity servers using Hadoop. Moreover, this data can be joined with other data sets and analyzed using various Hadoop sub-projects, such as Hive or Pig. So this posting is mostly about how to get OpenEI data into a format that can be analyzed using MapReduce jobs (and other tools, e.g. Hive or Pig) in a Hadoop compute cluster. Specifically, I provide examples with source code for the following topics:

  • File size considerations for storing data in HDFS and S3
  • Loading static metadata into Hadoop's DistributedCache and special handling needed to make this work with S3 and Elastic MapReduce.
  • Running a MapReduce job locally in pseudo-distributed mode and on Amazon's Elastic MapReduce
  • Using MRUnit to unit test the MapReduce jobs before deploying to Hadoop
  • Using Hadoop Counters to help you diagnose unexpected job results and make writing unit tests easier

Building the Application

First, you need to download the source code from github/thelabdude/nsrdb (note: I don't reproduce all the source code in this blog post, so you'll have to download it to get the full details). Next, build the project with Maven 2.2.x using:

mvn clean package
I use the Maven assembly plug-in to create an executable JAR with all dependencies (nsrdb-0.0.2-jar-with-dependencies.jar) and the Hadoop Job JAR with dependencies (nsrdb-0.0.2-hadoop-jobs.jar).
<build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2</version>
        <executions>
          <execution>
            <id>make-exe-jar</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass>thelabdude.nsrdb.GetData</mainClass>
                </manifest>
              </archive>
            </configuration>
          </execution>
          <execution>
            <id>make-jobs-jar</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
            <configuration>
              <descriptors>
                <descriptor>src/main/assembly/hadoop-jobs-jar.xml</descriptor>
              </descriptors>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

Download the Data

Next, we need to download the data from the OpenEI.org site using some good ol' screen scraping. The data is contained in 1,454 GZipped TAR files (tar.gz) on http://en.openei.org/datasets/files/39/pub/. The process I used is as follows:

  1. Parse the contents of the data listing HTML page to extract links matching a specified regular expression, in this case ^\d.*\.tar\.gz$.
  2. Download each data file and save locally so that I don't have to return to the site; build in a 2 sec delay between requests so that OpenEI doesn't get overloaded by the app.
  3. Uncompress each GZipped TAR file; each TAR holds 15 CSV files for a single station, one per year in the data set.
  4. Append the contents of each entry in the TAR file into a single Hadoop SequenceFile that is stored in HDFS.

Motivation for Using a Single Hadoop SequenceFile
My current goal is to store the data in a format that makes writing MapReduce jobs easy. Hadoop works better with a small number of large files that can be split and distributed by HDFS vs. a large number of small files. By small, I mean smaller than the HDFS block size (64MB is the default). Since the NSRDB is not actually that big, I chose to merge all data into a single, compressed SequenceFile. Internally, Hadoop will split the SequenceFile into replicated blocks that are distributed around the HDFS cluster. This means that each Map task will work on a sub-set of data that is most likely stored on the same server that the task is running on (i.e. data locality). For example, on Elastic MapReduce, the default block size is 128MB, so Hadoop will run 18 map tasks for our 2.1GB of input data. I should mention that if you store this SequenceFile in S3, then you do not benefit from data locality automatically, so I'll return to this discussion below.

I use a custom class for the key in my SequenceFile: thelabdude.nsrdb.StationYearWritable. This class holds a long station ID and an int year and implements Hadoop's Writable interface for fast serialization. The value for each key in the file is simply the entire contents of each TAR entry stored in a BytesWritable object. In other words, I'm storing the CSV data in a large BLOB for each year and station (1,454 stations * 15 years = 21,810 records). We'll save the line-by-line parsing of each CSV file for the MapReduce jobs so we can distribute the processing across a cluster.
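To make the key structure concrete, here is a minimal sketch of what a composite Writable key like this looks like. The class below is illustrative only (the class, field, and accessor names are my assumptions); see thelabdude.nsrdb.StationYearWritable in the project for the real implementation.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class StationYearKey implements Writable {
  private long stationId;
  private int year;

  public StationYearKey() {} // no-arg constructor required by Hadoop

  public StationYearKey(long stationId, int year) {
    this.stationId = stationId;
    this.year = year;
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(stationId); // serialize fields in a fixed order
    out.writeInt(year);
  }

  public void readFields(DataInput in) throws IOException {
    stationId = in.readLong(); // deserialize in the same order
    year = in.readInt();
  }

  public long getStationId() { return stationId; }
  public int getYear() { return year; }
}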

GetData in Action
So now that you understand the basic process I'm using to get the data into Hadoop, let's see how to run it. First, make sure Hadoop is set up in your environment. For now, I recommend using the pseudo-distributed configuration (see Hadoop 0.20.2 QuickStart). Once Hadoop is running, run the GetData application; output is saved to the HDFS location you configured in conf/core-site.xml:
(java -jar nsrdb-0.0.2-jar-with-dependencies.jar \
  -nsrdbIndexUrl http://en.openei.org/datasets/files/39/pub/ \
  -downloadDir /mnt/openei/nsrdb/input \
  -hadoopDataPath hdfs://localhost:9000/openei/nsrdb/nsrdb.seq &>getdata.log &)
I'll refer you to the source code for thelabdude.nsrdb.GetData for all the details. This application took about 2 hours to download the 1454 tar.gz files from OpenEI.org and merge them into a single SequenceFile stored in HDFS (tail the getdata.log file to see the application's progress). While this is slow, you should only have to do this once. When it finishes, you can make sure it is available in HDFS using:
$ bin/hadoop fs -lsr /openei/nsrdb
You should see the following output:
/openei/nsrdb/nsrdb.seq (2346388001 bytes)
Note: Don't delete the raw data files from the download directory just yet as you may want to re-run the GetData application (with -skipDownloadStep) to create more than one SequenceFile if you plan to use Amazon S3 and Elastic MapReduce (see below).

Analyzing NSRDB using MapReduce

Now that the data is in HDFS, it can be used as input to MapReduce jobs. Let's take a look at a few simple jobs that illustrate how to process the data.

Maximum Radiation Measured by Year
Which station is the sunniest in the winter months of each year? While this probably doesn't mean much to energy researchers, and I'm using the term "sunniest" very loosely, it does give us a simple example to exercise the NSRDB data using MapReduce. Here's how to run the job in your local Hadoop:
bin/hadoop jar nsrdb-0.0.2-hadoop-jobs.jar thelabdude.nsrdb.MaxValueByYearJob \
  /openei/nsrdb/nsrdb.seq /openei/nsrdb/maxValueByYear \
  /home/ubuntu/NSRDB_StationsMeta.csv 1
Notice that you need to pass the location of the NSRDB_StationsMeta.csv file to the job. Download the file from OpenEI and save it to the local filesystem where you run the job. The job uses Hadoop's DistributedCache to replicate the static station metadata CSV to all nodes in the cluster. Using Hadoop 0.20.2 in pseudo-distributed mode on a small Amazon EC2 instance (32-bit Ubuntu), the job finished in about 50 minutes (of course, we'll improve on this time in a real cluster). Once it completes, look at the results using:
hadoop fs -text /openei/nsrdb/maxValueByYear/part-r-00000
Let's look at the Map and Reduce steps in detail; the MapReduce job flow looks like this:
Map(k1,v1) → list(k2,v2) where k1 is Station ID + Year, v1 is a CSV file stored as an array of bytes, k2 is a year, such as 2001, and v2 is the station ID and sum of a solar radiation field. Using the new Hadoop API, the mapper class is defined as:
static class MaxValueByYearMapper
 extends Mapper<StationYearWritable, BytesWritable, IntWritable, StationDataWritable> {
  ...
}
Reduce(k2, list(v2)) → list(v3) where k2 and v2 are the same as what was output from the mapper and v3 is metadata for the station that measured the most radiation for that year. All v2 values for the same key k2 are sorted by Hadoop during the shuffle and sent to the same reducer. The reducer class is defined as:
static class MaxValueByYearReducer
  extends Reducer<IntWritable, StationDataWritable, IntWritable, Text> {
  ...
}

Map Step
We need to sum up all the radiation measured by each station during the winter each year. In the end, we will have 15 stations identified in our reduce output (one for each year in our dataset). Recall that keys in our SequenceFile are StationYearWritable objects and values are BytesWritable objects. Thus, the map function signature is:
public void map(StationYearWritable key, BytesWritable value, Context context)
        throws IOException, InterruptedException
Now is the time to process the CSV data line-by-line, since presumably our Map task is distributed in a Hadoop cluster. Before processing the CSV data for a station, the Map step consults the station metadata stored in Hadoop's DistributedCache to see if the station is of interest to this job. For this job, I'm only interested in class 1 or 2 stations in the lower 48 US states.

Next, the Map step parses each line from the stored CSV file.
IMPORTANT: Be careful when reading from BytesWritable objects because the byte[] returned from getBytes() may have more bytes than expected. You need to read only up to index getLength()-1:

ByteArrayInputStream bytesIn = new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
    reader = new BufferedReader(new InputStreamReader(bytesIn, UTF8));
For this job, I sum up the values in the Modeled Global Horizontal field to find my maximum. I'm also borrowing from a common MapReduce technique, the Combiner, by doing some of the summarization work directly in the Map step, since each value coming into the map function already contains all the data we need for a given station and year. Combiners reduce the amount of disk I/O needed between the Map and Reduce steps by cutting down the number of key/value pairs output by your mapper. Hadoop allows you to configure a separate Combiner class for your job, but since the logic here is so simple, I just do the summarization in the mapper (see the sketch below).
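Putting the pieces of the Map step together, here is a condensed sketch of the mapper logic described above. It is not the exact code from MaxValueByYearJob: the accessor names on StationYearWritable, the StationDataWritable constructor, and the isStationOfInterest/parseWinterValue helpers are assumptions for illustration.

public void map(StationYearWritable key, BytesWritable value, Context context)
    throws IOException, InterruptedException {
  // consult the station metadata loaded from the DistributedCache
  if (!isStationOfInterest(key.getStationId())) {
    context.getCounter(StationDataCounters.STATION_IGNORED).increment(1);
    return;
  }

  // read only up to getLength(); the backing byte[] may be padded
  ByteArrayInputStream bytesIn =
      new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
  BufferedReader reader = new BufferedReader(new InputStreamReader(bytesIn, UTF8));

  double sum = 0d;
  String line = null;
  while ((line = reader.readLine()) != null) {
    String[] cols = line.split(",");
    sum += parseWinterValue(cols); // hypothetical helper: winter months, Modeled Global Horizontal only
  }

  // one output record per (station, year): the summarization happens in the mapper
  context.write(new IntWritable(key.getYear()),
      new StationDataWritable(key.getStationId(), sum));
}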

A few words about Hadoop's DistributedCache
Hadoop's DistributedCache replicates static data needed by your job at runtime to all DataNodes in the cluster, which addresses my job's need to get access to station metadata nicely. However, using the DC can be tricky as the current documentation is not clear on how to set it up correctly, especially when working with S3 and Elastic MapReduce. In the job's run method, I have the following code to add the station metadata CSV file to Hadoop's DistributedCache:
URI localUri = stationMetadataLocal.toUri();
    if ("s3".equals(localUri.getScheme()) || "s3n".equals(localUri.getScheme())) {
      copyFromS3File(conf, stationMetadataLocal, stationMetadataHdfs);
    } else {
      FileSystem fs = FileSystem.get(conf);
      fs.copyFromLocalFile(false, true, stationMetadataLocal, stationMetadataHdfs);
    }
    DistributedCache.addCacheFile(stationMetadataHdfs.toUri(), conf);
You'll notice that I have to handle files in S3 differently than files on the local file system where the job is run. Essentially, if your static data is in S3, you need to copy it to HDFS using two separate file systems: S3N and HDFS, see my copyFromS3File method.
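For reference, a helper along those lines only needs to open the source path with the S3N FileSystem and the destination with the default (HDFS) FileSystem, then copy between them. This is a hedged reconstruction of the idea, not the exact method from the project; it relies on org.apache.hadoop.fs.FileSystem, Path, and FileUtil.

private void copyFromS3File(Configuration conf, Path s3Path, Path hdfsPath) throws IOException {
  FileSystem s3fs = FileSystem.get(s3Path.toUri(), conf); // resolves the s3n:// file system
  FileSystem hdfs = FileSystem.get(conf);                 // default file system (HDFS)
  // false = keep the source, true = overwrite the destination if it exists
  FileUtil.copy(s3fs, s3Path, hdfs, hdfsPath, false, true, conf);
}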

Reduce Step
The reduce step is simple: it receives a list of values per year and sums them for each station. The framework collects and sorts the values for each year before invoking the reducer during an internal phase called the "shuffle". Once all station data has been summed up, the station with the maximum sum is written to the output. As with the Map step, my Reducer consults the DistributedCache to get station metadata for its output, which is easier to interpret than just a station ID.
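A minimal sketch of that reduce logic is shown below; the accessor names on StationDataWritable and the lookupStationMetadata helper are assumptions, and the real reducer formats the full station metadata row into its Text output.

protected void reduce(IntWritable year, Iterable<StationDataWritable> values, Context context)
    throws IOException, InterruptedException {
  // sum the values for each station in this year
  Map<Long, Double> sumByStation = new HashMap<Long, Double>();
  for (StationDataWritable data : values) {
    Double sum = sumByStation.get(data.getStationId());
    sumByStation.put(data.getStationId(), (sum != null ? sum : 0d) + data.getValue());
  }

  // keep only the station with the maximum sum for this year
  long maxStation = -1L;
  double max = Double.NEGATIVE_INFINITY;
  for (Map.Entry<Long, Double> e : sumByStation.entrySet()) {
    if (e.getValue() > max) {
      max = e.getValue();
      maxStation = e.getKey();
    }
  }

  context.write(year, new Text(lookupStationMetadata(maxStation) + "," + max));
}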

Group by Year
I've also included a simple job to transform the data to group values by year (see thelabdude.nsrdb.GroupByYearJob). This example shows that you do not need a Reducer for your job if you just want to transform the input data (see the configuration sketch below). The output of this job is useful when working with Hadoop's streaming interface.
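The key configuration detail for a map-only job is setting the number of reduce tasks to zero so mapper output is written directly to the output path. Here is a hedged sketch using the new API; the mapper class name and the conf/inputPath/outputPath variables are placeholders rather than the actual GroupByYearJob code.

Job job = new Job(conf, "group-by-year");
job.setJarByClass(GroupByYearJob.class);
job.setMapperClass(GroupByYearMapper.class);  // hypothetical mapper class name
job.setNumReduceTasks(0);                     // no reduce phase: map output is the job output
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);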

Running in the Cloud

Now, let's run this same job on an Amazon Elastic MapReduce cluster. You'll need a few command-line tools to do the work in this section.

On Debian-based systems, you can do:
sudo apt-get install s3cmd
sudo apt-get install openssl
sudo apt-get install ruby1.8
sudo apt-get install libopenssl-ruby1.8
sudo apt-get install libruby1.8-extras
wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
Once installed, you'll need your Amazon Access Key and Secret Access Key values to configure s3cmd using: s3cmd --configure. Follow the instructions for elastic-mapreduce to get that setup as well. If you need a refresher on Amazon access credentials, please read: Understanding Access Credentials for AWS/EC2.
Be aware of the implications of the region setting in the elastic-mapreduce credentials.json file as it determines which region your Elastic MapReduce instances launch in. If you store your data in a US Standard S3 bucket, then I recommend launching your EMR instances in the "us-east-1" region as there are no data transfer fees between your EMR instances and S3. On the other hand, if you launch your cluster in the "us-west-1" region, you will incur data transfer fees if your data is in a US Standard S3 bucket, so only use "us-west-1" if your S3 bucket is in Northern California.

File Size Revisited for Amazon S3
Previously, I made the statement that Hadoop works better with a small number of large files vs. a large number of small files. While this is still true with EMR, we need to bend this rule slightly when working with files stored in S3, as using only one large file will lead to a major degradation in performance. Your job may actually run slower than it does in pseudo-distributed mode on a single machine since you'll also have Hadoop job management and S3 latency involved. Intuitively, this makes sense because Hadoop sets the number of map tasks based on the number of splits. So no matter how big your cluster is, only one node can process the input from one file on S3. The solution is to split the data into smaller chunks, such as double the number of nodes in your cluster. The GetData tool I developed allows you to specify the number of SequenceFiles to create using the chunkSize parameter. For a cluster with 4 nodes, I used chunkSize=256, which resulted in 9 SequenceFiles instead of 1. The key take-away here is that now I can have 9 map tasks running in parallel instead of only one.

Copy the SequenceFile(s) created by GetData from your local HDFS to an S3 bucket you have access to using Hadoop's distcp:

hadoop distcp /openei/nsrdb s3n://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]
Notice that I pass my Amazon ACCESS_KEY and SECRET_KEY in the S3 URI and that I'm using s3n as the protocol (native S3), see: Hadoop and Amazon S3. The distcp command uses MapReduce to distribute a large copy around your Hadoop cluster. The files copied from HDFS will be the input to your MapReduce jobs on EMR.

Next, you need to put the Jobs JAR file in S3, for this I like to use s3cmd:

s3cmd put FILE s3://BUCKET[/PREFIX]
For example:
s3cmd put nsrdb-0.0.2-hadoop-jobs.jar s3://thelabdude/openei/
Now, you're all set for running the job in EMR. The approach I like to take is to start a job flow without any jobs configured (actually, this is pretty much the standard approach when working with EMR). Once the cluster is running, I add job flow steps to it, which is more efficient because if the job fails due to an invalid or missing parameter, I can correct and re-start the step without waiting for the cluster to restart. To start a job flow, do the following:
elastic-mapreduce --create --alive \
  --log-uri s3n://BUCKET/openei/logs/ \
  --key-pair YOUR_KEY_PAIR_NAME \
  --slave-instance-type m1.small \
  --master-instance-type m1.small \
  --num-instances 9 \
  --name openei.nsrdb
This will create a new Job Flow named "openei.nsrdb" with 1 master node and 8 data nodes (m1.small instances); it can take a few minutes before the job flow is started. Take note of the Job Flow ID returned as you will need this to access logs and add job steps. You can add a new "step" (i.e. a Hadoop job) once you have the Job Flow ID:
elastic-mapreduce --jar s3n://BUCKET/openei/nsrdb-0.0.2-hadoop-jobs.jar \
  --main-class thelabdude.nsrdb.MaxValueByYearJob \
  --arg s3n://BUCKET/openei/nsrdb-chunks \
  --arg s3n://BUCKET/openei/maxValueByYear/ \
  --arg s3n://BUCKET/openei/NSRDB_StationsMeta.csv \
  --arg 8 \
  -j JOB_ID
Notice that I'm now requesting 8 reducers; Hadoop recommends a range of ((0.95 to 1.75) * cluster size). Also, I'm reading the input data directly from s3n://BUCKET/openei/nsrdb-chunks. If you're going to run multiple jobs on the data in the same EMR JobFlow, then I recommend using distcp to copy the data from S3 to the local HDFS for your Elastic MapReduce cluster using:
elastic-mapreduce --jar s3n://elasticmapreduce/samples/distcp/distcp.jar \
  --arg s3n://BUCKET/openei/nsrdb-chunks \
  --arg /openei/ \
  -j JOB_ID
This will help your jobs run much faster because the input data is already loaded into the local HDFS of your EMR cluster (data locality). To view the logs for your Job Flow, simply do:
elastic-mapreduce --logs -j JOB_ID
With an 8-node cluster of EC2 small instances, the job completed in roughly 8 minutes (after using distcp to copy the data from S3). The output from the job is included here:
2011-07-08 23:15:45,796 INFO org.apache.hadoop.mapred.JobClient (main): Job complete: job_201107082258_0002
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main): Counters: 19
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):   Job Counters 
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     Launched reduce tasks=8
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     Rack-local map tasks=5
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     Launched map tasks=19
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     Data-local map tasks=14
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):   thelabdude.nsrdb.MaxValueByYearJob$StationDataCounters
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):     STATION_IGNORED=10095
2011-07-08 23:15:45,804 INFO org.apache.hadoop.mapred.JobClient (main):   FileSystemCounters
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     FILE_BYTES_READ=131196
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     HDFS_BYTES_READ=2347266412
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     FILE_BYTES_WRITTEN=276357
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     HDFS_BYTES_WRITTEN=1307
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):   Map-Reduce Framework
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce input groups=15
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Combine output records=0
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Map input records=21810
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce shuffle bytes=141561
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce output records=15
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Spilled Records=23430
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Map output bytes=281160
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Combine input records=0
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Map output records=11715
2011-07-08 23:15:45,805 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce input records=11715
Notice that the job ignored 673 stations (thelabdude.nsrdb.MaxValueByYearJob$StationDataCounters.STATION_IGNORED: 10095 / 15 years). The reducer output looks like:
1991    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,350400.0
1992    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,332856.0
1993    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,356962.0
1994    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,343829.0
1995    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,354387.0
1996    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,377977.0
1997    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,354811.0
1998    722695,2,false,LAS CRUCES INTL,NM,32.283,-106.917,1393,-7,32.283,-106.917,1393,354944.0
1999    722735,2,false,DOUGLAS BISBEE-DOUGLAS INTL A,AZ,31.467,-109.6,1249,-7,31.467,-109.6,1249,370871.0
2000    722735,2,false,DOUGLAS BISBEE-DOUGLAS INTL A,AZ,31.467,-109.6,1249,-7,31.467,-109.6,1249,371146.0
2001    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,374057.0
2002    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,354146.0
2003    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,363305.0
2004    722010,1,false,KEY WEST INTL ARPT,FL,24.55,-81.75,1,-5,24.55,-81.75,1,362566.0
2005    722016,2,false,MARATHON AIRPORT,FL,24.733,-81.05,2,-5,24.733,-81.05,2,373423.0
Not surprisingly, the winning stations are all in Florida, Arizona, or New Mexico!

IMPORTANT: Be sure to terminate your job flow when you're finished working with it so that you don't incur unnecessary expenses.
elastic-mapreduce --terminate -j JOB_ID
For more tips about running jobs in Elastic MapReduce, check out my contributions to the Mahout wiki based on some benchmarking work I did for Taming Text. See Building Vectors for Large Document Sets on Mahout on Elastic MapReduce. Alternatively, you can spin up your own Hadoop cluster instead of using EMR by following the notes I compiled for Mahout on Amazon EC2 > Use an Existing Hadoop AMI.

Testing Hadoop Jobs with MRUnit

Lastly, I've included a test case (thelabdude.nsrdb.MaxValueByYearMRUnitTest) to test my MR job using MRUnit. The MRUnit library provides mock objects and job drivers that simulate the Hadoop runtime environment for testing your job outside of Hadoop. MRUnit drivers allow you to specify the expected output from Mapper and Reducer classes given specific inputs. Notice that my job uses Hadoop Counters to keep track of decisions it makes, such as skipping a station or encountering an error while parsing the data. Counters can help you diagnose unexpected job results and make writing unit tests easier since you can assert expected counts based on your test input data. For example, the following unit test checks to see if the job is handling invalid input records correctly:

@Test
public void testMapperIgnoreParseError() throws Exception {
  // data that produces parse errors
  String csv = ...

  BytesWritable value = new BytesWritable(csv.getBytes(MaxValueByYearJob.UTF8));
  mapDriver.withInput(new StationYearWritable(690190, 2005), value)
           .runTest();

  Counters counters = mapDriver.getCounters();
  Counter counter = counters.findCounter(MaxValueByYearJob.StationDataCounters.DATE_PARSE_ERROR);
  assertTrue("Expected 1 date parse error", counter.getValue() == 1);

  counter = counters.findCounter(MaxValueByYearJob.StationDataCounters.DATA_PARSE_ERROR);
  assertTrue("Expected 1 data parse error", counter.getValue() == 1);
}
Both CSV-formatted input records contain formatting errors. Thus, I expect the MaxValueByYearJob.StationDataCounters.DATE_PARSE_ERROR and DATA_PARSE_ERROR counters to report 1 parse error each. Try to avoid overloading the meaning of each counter and have a specific counter for each possible error / event in your job, as it will help you troubleshoot data problems when running in a large distributed cluster. For more information on Hadoop Counters, see Cloud9: Working with counters.
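In the job code, a counter is nothing more than an enum plus an increment call on the Context. A minimal sketch of the pattern follows; parseDate and cols are hypothetical stand-ins for the real parsing code.

// enum values mirror the counters referenced above
enum StationDataCounters { STATION_IGNORED, DATE_PARSE_ERROR, DATA_PARSE_ERROR }

// ... inside map(), using the new-API Context:
try {
  date = parseDate(cols[0]);
} catch (ParseException pe) {
  // count the bad record and skip it without failing the whole job
  context.getCounter(StationDataCounters.DATE_PARSE_ERROR).increment(1);
  return;
}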

Conclusion

So now we have a basic framework for loading OpenEI data into a Hadoop cluster and have seen how to run MapReduce jobs against it. As you can tell, there are many moving parts and some non-trivial Java coding work to do even simple analysis on the data with Hadoop. While energy researchers may benefit from being able to analyze huge solar data sets using MapReduce and Hadoop, they will need a higher level of abstraction than what was presented here. Moreover, 8 minutes really isn't very fast for such a simple job on 8 machines with such a small data set, so I'll also need to address performance issues. Consequently, I plan to follow up this posting with an example of working with this data using Apache Hive and Pig to see if these data warehouse frameworks make it easier to analyze the NSRDB. I also plan to compare the Hive / Pig results with using a MySQL database to analyze this data.

Thursday, September 23, 2010

VinWiki Part 4: Making Recommendations with Mahout

Introduction

This is the final post in a four part series about a wine rating and recommendation Web application built using open source Java technology. The purpose of this series is to document key design and implementation decisions that can be applied to other Web applications. Please read the first, second, and third posts to get up-to-speed. You can download the project (with source) from here.

In this posting, I lay the foundation for making recommendations using Apache Mahout v. 0.3. For a thorough introduction to Mahout, I recommend Mahout in Action.

Collaborative Filtering in Mahout
Mahout's main goal is to provide scalable machine-learning libraries for classification, clustering, frequent itemset mining, and recommendations. Classification assigns a category (or class) from a fixed set of known categories to an un-categorized document. For example, some feed readers assign articles to broad categories like Sports or Politics using classification techniques. Clustering assigns documents to groups of similar documents using some notion of similarity between the documents in the group. For example, Google News uses clustering to group articles from different publishers that cover the same basic story. Frequent itemset mining determines which items, such as products in a shopping cart, typically occur together.

In this posting, I leverage the collaborative filtering features of Mahout to make wine recommendations based on ratings by VinWiki users. Collaborative filtering produces recommendations based on user preferences for items and does not require knowledge of the specific properties of the items. In contrast, content-based recommendation produces recommendations based on detailed knowledge of the properties of items. This implies, of course, that content-based recommendation engines are domain-specific, whereas Mahout's collaborative filtering approach can work in any domain provided it has sufficient user-item preference data to work with.

For VinWiki, I experimented with three basic types of Mahout Recommenders:

  1. User Similarity
  2. Item Similarity
  3. SlopeOne
Check out the Mahout Web site for information about other more experimental recommenders, such as one based on Singular value decomposition (SVD).


To decide which one of these recommenders is best for your application, you need to consider four key questions:
  1. How to represent a user's preference for an item?
  2. What is the ratio of items to users?
  3. How do you determine the similarity between users or between items?
  4. If using UserSimilarity, what is the size of a user neighborhood?
As I'll demonstrate below, Mahout provides a framework to allow you to answer these questions by analyzing your data.

User Preferences
What constitutes a user preference for an item in your application? Is it a boolean "like" or "dislike" or does the preference have a strength, such as "I like Chardonnay but like Sauvignon Blanc better"? The structure of user-item preference data used in Mahout is surprisingly simple: userID, itemID, score, where score represents the strength of the user's preference for the item, see org.apache.mahout.cf.taste.model.Preference. There are two concrete implementations of the Preference interface in Mahout: BooleanPreference and GenericPreference. For VinWiki, I use GenericPreference because I chose to allow users to give a score for a wine.
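A quick sketch of what this looks like in practice; the file path and IDs are illustrative, and the comma-separated format shown is what Mahout's FileDataModel (used below) expects.

// ratings.txt: one preference per line as userID,itemID,score
//   42,1887,91.0   <- user 42 rated wine 1887 at 91 points
//   42,1350,87.0
DataModel model = new FileDataModel(new File("/path/to/recommender/ratings.txt"));

// programmatically, a preference is just (userID, itemID, value)
Preference pref = new GenericPreference(42L, 1887L, 91.0f);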

Basic Structure of a UserSimilarity Recommender
Let's take a look at the basic approach Mahout takes to make a UserSimilarity based recommendation using VinWiki nomenclature:
1: For all wines W that user A has NOT expressed a preference for
2:   For every other user B (in A's neighborhood) that has expressed a preference for W
3:     Compute the similarity S between user A and B
4:       Add the User B's preference X for W weighted by S to a running average preference
5: Sort Wines by weighted average preference
6: return top R wines from sorted collection as recommendations

Intuitively, this approach makes sense. From the pseudo-code above, it should be clear that we need a way to calculate the similarity S between two users A and B, which is represented in Mahout as an org.apache.mahout.cf.taste.similarity.UserSimilarity. Also, notice that the algorithm weights recommendations by user similarity, which means that the more similar a user is to you, the more heavily their preferences count in making recommendations. Consequently, the selection of the similarity calculation is very important to making good recommendations. Mahout provides a number of concrete implementations of the UserSimilarity interface; see the org.apache.mahout.cf.taste.impl package.

In practice, most systems that need to produce recommendations have many users and calculating a similarity between all users is too computationally expensive. Thus, Mahout uses the concept of a user neighborhood to limit the number of similarity calculations to a smaller subset of similar users. This introduces another question that needs to be answered when building your recommender: What is the optimal size of the user-neighborhood for my data?

Mahout also allows you to make recommendations based on similarity between Items. Don't confuse Mahout's Item-based recommender with content-based recommenders since it is still based on user-item interactions and not the content of items.
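For completeness, here is a hedged sketch of wiring up an item-based recommender with Mahout 0.3; the user ID and similarity choice are illustrative, and dataModelProvider.getDataModel() is an assumed accessor.

DataModel model = dataModelProvider.getDataModel();   // assumed accessor on the provider
ItemSimilarity itemSimilarity = new LogLikelihoodSimilarity(model);
Recommender recommender = new GenericItemBasedRecommender(model, itemSimilarity);

// recommend 5 wines for user 42 as (itemID, estimated preference) pairs
List<RecommendedItem> items = recommender.recommend(42L, 5);

Note that no user neighborhood is needed for the item-based flavor, which is one reason it often scales better when there are many more users than items.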

Using Mahout in VinWiki
The main service for creating recommendations at runtime is the MahoutWineRecommender, which is an application-scoped Seam component. The MahoutWineRecommender has two dependencies injected during initialization:
  • DataModelProvider
  • RecommenderConfig

DataModel
The org.vinwiki.recommender.DataModelProvider Seam component (configured in components.xml) provides a Mahout DataModel to the recommender. For now, I'm using Mahout's FileDataModel, which as you might have guessed, reads preference data from a flat file. During startup, if this file doesn't exist, then the DataModelProvider reads wine ratings from the database and writes them to a new file.

Sample Wine Ratings Data
As this is just an example Web application, I don't have real wine ratings data. Consequently, I generated some fake data that recommends wines to sample users based on the first letter of the user name. For example, the data will cause Mahout to recommend wines that start with the letter "A" to the "A_test0" user. Here is some log output to demonstrate how the sample ratings data works:
NearestNUserNeighborhood[3,0.6,0.8,EuclideanDistanceSimilarity] 
  recommended [1887, 286, 1120, 1350, 520, 1905] wines to A_test0
     Neighbor(43) A_test30
         rated Wine 1120 91.0 pts
         rated Wine 1350 87.0 pts
     Neighbor(33) A_test20
         rated Wine 1887 88.0 pts
         rated Wine 1350 88.0 pts
     Neighbor(63) A_test50
         rated Wine 1350 90.0 pts
Notice that A_test0's neighbor's user names also start with "A_". When I created the sample ratings data, I had users rate wines that begin with the same letter a little higher than they rated other wines. You can try this yourself after deploying the application to JBoss 4.2.3 by logging in with username "A_test0" and password "P@$$w0rD" (without the quotes of course).

Refreshing the Recommender
When I first started working with Mahout, it wasn't clear how to handle data model changes at runtime because most of the built-in Mahout examples work with static, pre-existing datasets. In VinWiki, rating wines is a primary activity, so preference data will be changing frequently. Moreover, if a user provides several new ratings in a session, then they'll expect to have some recommendations based on those new ratings or they will think the site is broken and probably not return. Consequently, it's very important for this application to incorporate recent user activity into recommendations in near real-time.

Whenever a user rates a wine, the ratingHome component will raise the App.WINE_RATED_BY_USER event. The MahoutWineRecommender component observes this event and passes it to the DataModelProvider.

@Observer(App.WINE_RATED_BY_USER)
@Asynchronous
public void onWineRatedByUser(Rating r) {
    // Let the model provider know that data has changed ...
    if (dataModelProvider.updateDataModel(r.getUser().getId(), r.getWine().getId(), r.getScore())) {
        // provider indicates that we should refresh the recommender
        recommender.refresh(null);
    }
}
In response to this event, the DataModelProvider component can choose to update its internal state to reflect the change. In my current implementation, the DataModelProvider uses a nice feature provided by Mahout's FileDataModel by writing updates to a smaller "delta" file. The FileDataModel will load these additional "delta" files when it is refreshed. So that covers updating the DataModel, but what about the Recommender and its other dependencies, such as UserSimilarity and UserNeighborhood? In my implementation, the DataModelProvider makes the decision of whether the Recommender should be refreshed. This allows a more sophisticated DataModelProvider implementation to batch up changes so that the recommender is not refreshed too often as refreshing a recommender and its dependencies can be an expensive operation for large data sets.

Accounting for User Preferences
Users can use the Preferences dialog to de-select wines they are not interested in. Changes to a user's preferences should be reflected in recommendations. For example, if a user indicates that they are not interested in white wine, then we should not recommend any white wines to them. Mahout allows you to inject this type of filtering on recommendations using an org.apache.mahout.cf.taste.recommender.IDRescorer.

In VinWiki, filtering recommendations by preferences is provided by the org.vinwiki.recommender.PreferencesIDRescorer class. If you revisit the pseudo-code above, then it should be obvious that the IDRescorer may need to evaluate the filter on a large number of wines. Thus, the IDRescorer should be implemented in an efficient manner; I used the Lucene native API to iterate over all wines to build and cache a Mahout FastIDSet of wine Ids that can be recommended to the current user.

// Using Lucene to initialize a Mahout FastIDSet for rescoring
int maxDoc = reader.maxDoc();
for (int docId = 0; docId < maxDoc; docId++) {
    if (reader.isDeleted(docId))
        continue;

    try {
        doc = reader.document(docId, getFieldSelector());
    } catch (Exception zzz) {
        ...
    }
    if (doc == null)
        continue;

    Long wineId = new Long(doc.get(ID));
    String type = doc.get(TYPE);
    String style = doc.get(STYLE);
    Long regionId = new Long(doc.get(REGION));

    // ask the User's Preferences object if this wine is enabled
    if (prefs.checkWineFilter(wineId, type, style, regionId)) {
        idSet.add(wineId);
    }
}
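With the FastIDSet built above, the IDRescorer itself is small. The following is a simplified sketch of the two interface methods; the real PreferencesIDRescorer also rebuilds its cached set when the user's preferences change.

public class PreferencesRescorerSketch implements IDRescorer {
  private final FastIDSet idSet;

  public PreferencesRescorerSketch(FastIDSet idSet) {
    this.idSet = idSet;
  }

  public double rescore(long wineId, double originalScore) {
    return originalScore;             // no score adjustment, filtering only
  }

  public boolean isFiltered(long wineId) {
    return !idSet.contains(wineId);   // filter wines the user excluded in their Preferences
  }
}

The rescorer is then passed along when asking for recommendations, e.g. recommender.recommend(userId, howMany, rescorer).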

There is one subtle aspect to the current implementation in that it does not refresh during the user's session as new wines are added to the search index. In other words, you are not going to see any code that tries to update the rescorer after new wines are added to the system. Remember that our recommendations are based on user-item interactions and new wine objects are not going to have enough (if any) ratings to impact the current user's session. However, the rescorer is refreshed if the user changes their preferences.

Tip: using an IDRescorer is a simple form of content-based recommendations in that we're using specific properties of the wine objects to influence our recommendations.

RecommenderConfig
org.vinwiki.recommender.RecommenderConfig is an application-scoped Seam component (configured in components.xml) that supports common options for configuring the behavior of a Recommender.

At startup, the MahoutWineRecommender uses the DataModel and RecommenderConfig to initialize a Recommender. The Recommender is held in application-scope because it is expensive to build and should be re-used for all recommendation requests from FetchRecommended objects (see Server-side Pagination from the first posting in this series). The following code snippet gives you an idea of how to construct a User-based recommender with Mahout:

// see RecommenderConfig.java
    UserSimilarity userSimilarity = createUserSimilarity(dataModel);
    UserNeighborhood neighborhood = createUserNeighborhood(userSimilarity, dataModel);
    return new GenericUserBasedRecommender(dataModel, neighborhood, userSimilarity);

Here is an example configuration from components.xml. NOTE: You must set the fileDataModelFileName to a valid path on your server before running the sample!

<component name="dataModelProvider" auto-create="true" scope="application" 
        class="org.vinwiki.recommender.DataModelProvider">
    <property name="fileDataModelFileName">/home/thelabdude/thelabdude-blog-dev/jboss-4.2.3/bin/recommender/ratings.txt</property>
    <property name="updateFileSizeThresholdKb">10</property>
  </component>

  <component name="recommenderConfig" auto-create="true" scope="application" 
        class="org.vinwiki.recommender.RecommenderConfig">
    <property name="recommenderType">USER_SIMILARITY</property>
    <property name="similarityClassName">org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity</property>
    <property name="neighborhoodSize">2</property>
    <property name="minSimilarity">0.7</property>
    <property name="samplingRate">0.2</property>    
  </component>
Looks easy enough, but what about all those parameters to the recommenderConfig? Thankfully, Mahout provides a powerful tool to help you determine the correct values to use for each of these settings for your data - RecommenderEvaluator.

Evaluating a Mahout Recommender
It should be clear that finding the optimal recommender for your data requires experimenting with how you represent preferences, how you calculate user or item similarity, and the size of the user neighborhood. Mahout provides an easy way to compare the results for different configuration options using a RecommenderEvaluator. Currently, there are two concrete RecommenderEvaluator implementations:
  • AverageAbsoluteDifferenceRecommenderEvaluator - computes the average absolute difference between predicted and actual ratings for users.
  • RMSRecommenderEvaluator - computes the "root mean squared" difference between predicted and actual ratings for users.
I chose to use the RMSRecommenderEvaluator because it penalizes bad recommendations more heavily than the AverageAbsoluteDifference evaluator. When doing evaluations, the lowest score is best. Notice in the code snippet below how the RecommenderConfig (part of VinWiki) helps you run evaluations:

RecommenderConfig config = new RecommenderConfig();
config.setRecommenderType(RecommenderType.USER_SIMILARITY);
config.setSimilarityClassName(simClass.getName());
config.setNeighborhoodSize(c);
config.setMinSimilarity(minSimilarity);
config.setSamplingRate(samplingRate);

RecommenderBuilder builder = config.getBuilder();
double score = evaluator.evaluate(builder, 
                 null, // no DataModelBuilder
                 recommenderDataModel, 
                 0.8, // training data pct
                 1); // use all users

For VinWiki, I developed a Seam ComponentTest to run evaluations. At this point, the output is not as important as the process, since the results are based on simulated ratings data (VinWiki is not yet a live application with real users). This is a problem facing any new application that uses machine-learning algorithms that require real user input. One idea to get real user input is to use Amazon's Mechanical Turk service to hire users to create real user-item interactions for your application. Regardless of how you seed your application with real user data, the approach in src/test/org/vinwiki/RecommenderTest.java should still be useful to you.

Conclusion
So this concludes the four-part series on VinWiki. As you can see, integrating Mahout is easy, but it does require experimentation and tuning. You also have to be cognizant of scalability issues, such as how often to refresh your recommender. The framework I added to VinWiki should be useful for your application too. Please leave comments on my blog if you have questions or would like to suggest improvements to any of the features I discussed in any of the four posts.

Thursday, July 8, 2010

VinWiki Part 3: Authentication with Facebook Connect and Sharing Content with Friends

Introduction

This is the third post in a four part series about a wine rating and recommendation Web application built using open source Java technology. The purpose of this series is to document key design and implementation decisions that can be applied to other Web applications. Please read the first and second posts to get up-to-speed. You can download the project (with source) from here.

In this posting, I implement several common features that should help users find and contribute to your application. Specifically, I integrate with Facebook Connect (based on OAuth 2) to allow Facebook users to instantly register and authenticate using their Facebook profile. From there, I integrate the Facebook Like social plug-in which allows users to share content in your application with their friends on Facebook.

Authentication using Facebook Connect
Login with Facebook Connect
The Facebook team has made integrating Facebook Connect into your application very easy. There are open source Java libraries available for integrating with Facebook; however, I found it easier to just use the JavaScript SDK. Let's go through the process in five simple steps:

I. Register Your Application with Facebook
First, you need to register an application in Facebook to get a unique Application ID (please use something other than "VinWiki" for your application name since I'll be using that one in the near future). Update resources/WEB-INF/components.xml to set the facebookAppId property on the app component:
<component name="app" auto-create="true" scope="application" class="org.vinwiki.App">
    ...
    <property name="facebookAppId">ENTER_YOUR_FB_APPLICATION_ID_HERE</property>
  </component>

II. Initialize the Facebook JavaScript Library
Second, you need to initialize the Facebook JavaScript library when your page loads. For this, I created a new Facelets include file view/WEB-INF/facelets/facebook.xhtml and loaded it into the footer in my layout template view/layout/template.xhtml:
<h:panelGroup rendered="#{app.isFacebookEnabled()}">
    <ui:include src="/WEB-INF/facelets/facebook.xhtml"/>
  </h:panelGroup>
In facebook.xhtml, I have the following JavaScript:
(function() {
    var e = document.createElement('script'); e.async = true;
    e.src = document.location.protocol + '//connect.facebook.net/en_US/all.js';
    document.getElementById('fb-root').appendChild(e);
  }());
This function, borrowed from the Facebook developer documentation, asynchronously loads the Facebook JavaScript file into your page.

III. Register a JavaScript callback handler for Facebook session events
Once the JavaScript library is loaded, the window.fbAsyncInit function is called automatically.
window.fbAsyncInit = function() {    
    FB.init({appId:'#{app.facebookAppId}', status:true, cookie:true, xfbml:true});
    FB.Event.subscribe('auth.sessionChange', function(response) {
      if (response.session) {
        // Login successful
        var uid = response.session.uid;
        FB.api('/me', function(resp) {
          onFbLogin(uid, resp.email, resp.name, resp.first_name, resp.last_name, resp.gender);
        });
      } else {
        // The user has logged out, and the cookie has been cleared
        onFbLogout();
      }
    });
  };
After initializing the library (see FB.init), the application registers a listener for auth.sessionChange events (login or logout). On login, I use Facebook's Graph API to get some basic information about the current user (see FB.api). In the response callback handler for FB.api('/me'), I invoke a JavaScript function onFbLogin that executes the #{guest.onFbLogin()} action:
<a4j:form prependId="false">
  <s:token/>
  <a4j:jsFunction immediate="true" name="onFbLogin" ajaxSingle="true" action="#{guest.onFbLogin()}">
    ... params here ...
  </a4j:jsFunction>
</a4j:form>
Notice that I'm using Seam's <s:token/> tag to prevent cross-site request forgery since the #{guest.onFbLogin()} action simply trusts that the user was authenticated by Facebook on the client-side. For an overview of the thinking behind the <s:token/> tag, I refer you to Dan Allen's post at http://seamframework.org/Community/NewComponentTagStokenAimedToGuardAgainstCSRF. However, please realize that you must set javax.faces.STATE_SAVING_METHOD to "server" in web.xml for this method to secure your forms:
<context-param>
    <param-name>javax.faces.STATE_SAVING_METHOD</param-name>
    <param-value>server</param-value>
  </context-param>
The action handler on the server side is straightforward because Seam supports alternative authentication mechanisms out-of-the-box. Specifically, all you need to do is invoke the acceptExternallyAuthenticatedPrincipal method of the org.jboss.seam.security.Identity object. I utilized the existing guest component because it handles other guest-related actions such as register and login (see src/hot/org/vinwiki/user/GuestSupport.java):
Identity.instance().acceptExternallyAuthenticatedPrincipal(new FacebookPrincipal(user.getUserName()));
    Contexts.getSessionContext().set("currentUser", user);
    Events.instance().raiseEvent(Identity.EVENT_POST_AUTHENTICATE, Identity.instance());
I also raise the Identity.EVENT_POST_AUTHENTICATE event manually so that my nav component can re-configure the default view for the authenticated user instead of showing the guest view.

IV. Show Facebook Connect Button on Login Panel
Lastly, we need to let users know that they can log in (or register) using their Facebook credentials. This is accomplished with the <fb:login-button> tag; see view/WEB-INF/facelets/guestSupport.xhtml.
<fb:login-button perms="email,publish_stream">
    <fb:intl>Login with Facebook</fb:intl>
  </fb:login-button>
The new RichFaces Login dialog looks like:
Login Dialog with Facebook Connect
Notice that VinWiki requests access to the email and publish_stream extended permissions. The publish_stream permission allows users to share wines of interest found on VinWiki with their friends on Facebook. When accessing VinWiki for the first time, users will see a dialog that allows them to grant permissions to the application:
Facebook Extended Permissions

V. Logout
It's doubtful whether many users will ever explicitly log out of your site unless they are accessing it from a public computer. Consequently, you'll want to keep your session timeout value as low as possible. That said, you still need to offer the ability to log out. In view/WEB-INF/facelets/headerControls.xhtml, the logout action is implemented using a simple JSF commandLink:
<h:commandLink rendered="#{nav.isFbSession()}" onclick="FB.logout();" styleClass="hdrLink">
  <h:outputText value="Logout"/>
</h:commandLink>
The magic is in our sessionChange event listener discussed above. The Facebook JavaScript function FB.logout() triggers an auth.sessionChange event, which in turn calls onFbLogout() to execute the #{guest.onFbLogout()} action on the server.

So that covers authentication using Facebook Connect. There are many other ways to integrate your application with Facebook. In the next section, I'll implement a way for users to share content in your application with their friends on Facebook. For this feature, it is helpful to have bookmarkable URLs.

Sharing Content with Friends
In VinWiki, users may want to share specific wines of interest with their friends on Facebook. For this, I used Facebook's Like social plug-in. There are other options, including just posting a shared link to the user's activity stream. There's not much to integrating the Like social plug-in into your page once you've accounted for bookmarkable URLs (see posting 1). The <fb:like> tag will use the URL of the current page if you don't specify an href attribute. However, I want to make sure the URL that is shared with Facebook is as clean as possible. Thus, I introduced a new setting for the org.vinwiki.App component named baseUrl. You should change this to match your server in resources/WEB-INF/components.xml:
<component name="app" auto-create="true" scope="application" class="org.vinwiki.App">
    ...
    <property name="baseUrl">http://192.168.1.2:8080/vinwiki/</property>
  </component>
I also decided to add the Open Graph meta tags to the header of my wine details page, view/wine.xhtml:
<meta property="og:title" content="#{currentWine.fullName}"/>
  <meta property="og:type" content="drink"/>
  <meta property="og:url" content="#{viewWine.openGraphUrl}"/>
  <meta property="og:site_name" content="VinWiki"/>
  <meta property="og:description" content="#{jsf.truncate(currentWine.description,100,'...')}"/>
Presumably, when you share a page that supports the Open Graph protocol, Facebook is able to present this additional metadata to your friends. Of course, the URL needs to be public before this will actually work ;-)

What's Next?
In the next and final post in this series, I'll integrate Mahout for making wine recommendations and discuss some considerations for scaling the application.

Monday, June 14, 2010

VinWiki Part 2: Full-text Search with Hibernate Search and Lucene

Introduction

This is the second post in a four part series about a wine recommendation Web application built using open source Java technology. The purpose of this series is to document key design and implementation decisions that can be applied to other Web applications. Please read the first post for an overview of the project. You can download the project (with source) from here.

In this posting, I leverage Hibernate Search and Lucene to provide full-text search for wines of interest. This post is merely an attempt to complement the already existing documentation on Lucene and Hibernate Search. If you are familiar with Hibernate and are new to Lucene, then I recommend starting out with the online manual for Hibernate Search 3.1.1 GA. In addition, I highly recommend reading Hibernate Search in Action and Lucene in Action (Second Edition); both are extremely well-written books by experts in each field.

Basic Requirements
Full-text search allows users to find wines using simple keywords, such as "zinfandel" or "dry creek valley". Users have come to expect certain features from a full-text search engine. The following list summarizes the key features our wine search engine will support:

  • Paginated search results (see first post for solution): Return the first 5-10 results, ordered by relevance, on the first page of the search results. Allow users to retrieve more results by advancing the page navigator, and let them set a bounded preference for how many items to show on each page.
  • Query term highlighting (view solution): Highlight query terms in search results to allow users to quickly scan the results for the most relevant items.
  • Spell correction (view solution): Detect and correct misspelled terms in the query.
  • Type-ahead suggestions (view solution): Show a drop-down selection box containing suggested search terms after the user has typed at least 3 characters.
  • Advanced search form (view solution): Allow users to fine-tune the search with an advanced search form.

Setup
From the previous post, you'll recall that I'm using Hibernate to manage my persistent objects. As such, I've chosen to leverage Hibernate Search (referred to as HS hereafter) to integrate my Hibernate-based object model with Lucene. For now, I'm using Hibernate Search v. 3.1.1 GA with Hibernate Core 3.3.1 GA. On the Lucene side, I'm using version 2.9.2 and a few classes from Solr 1.4. In the last posting in this series, we'll upgrade to the latest Hibernate Search and Core, which rely on JPA 2 and thus will require a move up to JBoss 6.x. As far as I know, the latest HS and Core classes have trouble on JBoss 4.2 and 5.1 because of their dependency on JPA 2 (let me know if you get it working).

Why not use the database to search?
Some may be wondering why I'm using Lucene instead of doing full-text search in my database? While possible, I feel the database is not the best tool for full-text searching. For an excellent treatment of the mis-match between relational databases and full-text search, I recommend reading Hibernate Search in Action. The first chapter provides a strong case for using Lucene instead of your database for full-text search. Just in terms of scalability, the database is the most expensive and complex layer to scale in most applications. Offloading searches to a local Lucene running on each node in your cluster reduces the amount of work your database is performing and ensures searches return quickly. Moreover, distributing a Lucene index across a cluster of app servers is almost trivial since Hibernate Search has built-in clustering support.

Project Configuration Changes
The following JAR files need to be included in your Web application's LIB directory (/WEB-INF/lib):
hibernate-search.jar
    lucene-core-2.9.2.jar
    lucene-highlighter-2.9.2.jar
    lucene-misc-2.9.2.jar
    lucene-snowball-2.9.2.jar
    lucene-spatial-2.9.2.jar
    lucene-spellchecker-2.9.2.jar
    lucene-memory-2.9.2.jar
    solr-core-1.4.0.jar
    solr-solrj-1.4.0.jar

How does Seam know to use Hibernate Search's FullTextEntityManager?
Seam's org.jboss.seam.persistence.HibernatePersistenceProvider automatically detects if Hibernate Search is available on the classpath. If HS is available, then Seam uses the org.jboss.seam.persistence.FullTextEntityManagerProxy instead of the default EntityManagerProxy, meaning that you will have access to a FullTextEntityManager wherever you have a Seam-injected EntityManager. You also need to add a few more properties to your persistence deployment descriptor (resources/META-INF/persistence-*.xml):
<property name="hibernate.search.default.directory_provider" value="org.hibernate.search.store.FSDirectoryProvider"/>
  <property name="hibernate.search.default.indexBase" value="lucene_index"/>
  <property name="hibernate.search.reader.strategy" value="shared"/>
  <property name="hibernate.search.worker.execution" value="sync"/>
We'll make some adjustments to these settings as the project progresses, but these will suffice for now.
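To make this concrete, here is a minimal sketch, using a hypothetical component of my own (not from the project), of how any Seam component can get a FullTextEntityManager from the managed EntityManager:
import javax.persistence.EntityManager;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.hibernate.search.jpa.FullTextEntityManager;
import org.hibernate.search.jpa.Search;
import org.jboss.seam.annotations.In;
import org.jboss.seam.annotations.Name;

@Name("indexInfo")
public class IndexInfo {

    @In private EntityManager entityManager; // Seam-managed persistence context

    public int countIndexedWines() {
        // Seam hands back a FullTextEntityManagerProxy when HS is on the classpath,
        // so a direct cast also works; Search.getFullTextEntityManager is the portable route.
        FullTextEntityManager ftem = Search.getFullTextEntityManager(entityManager);
        return ftem.createFullTextQuery(new MatchAllDocsQuery(), Wine.class).getResultSize();
    }
}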

Indexing
The first step in providing full-text search capabilities is to index the content you want to search. In most cases, our users will want to find Wine objects and to a lesser extent Winery objects. Let's start by indexing Wine objects.

Indexing Wine Objects
I'll tackle indexing Wine objects in a few passes, progressively adding features, so let's start with the basics. First, we tell HS to index Wine objects using the @Indexed annotation on the class. As for what to search, I tend to favor using a single field to hold all searchable text for each object because it simplifies working with other Lucene extensions, such as the More Like This and term highlighting features.
@Field(name=DEFAULT_SEARCH_FIELD, index=Index.TOKENIZED)
    @Analyzer(definition="wine_stem_en")
    public String getContent() {
        // return one string containing all searchable text for this object
    }
Notice that the content field is Index.TOKENIZED during indexing. This means that the String value returned by the getContent method will be broken into a stream of tokens using a Lucene Analyzer, and each token returned by the Analyzer will be searchable. Your choice of Analyzer depends on the type of text you are processing, which in our case is English text provided by our users when creating Wine objects. Hibernate Search leverages the text analysis framework provided by Solr. You specify your Analyzer using the @Analyzer annotation; behind the scenes, Hibernate Search creates the Analyzer using the factory defined by an @AnalyzerDef class annotation. In our case:
@AnalyzerDef(name = "wine_stem_en",
        tokenizer = @TokenizerDef(factory = org.vinwiki.search.Lucene29StandardTokenizerFactory.class),
        filters = {
          @TokenFilterDef(factory = StandardFilterFactory.class),
          @TokenFilterDef(factory = LowerCaseFilterFactory.class),
          @TokenFilterDef(factory = StopFilterFactory.class),
          @TokenFilterDef(factory = EnglishPorterFilterFactory.class)
    })
This looks more complicated than it is ... Let's take a closer look at the @AnalyzerDef annotation:
  name = "wine_stem_en"
This gives our analyzer definition a name so we can refer to it in the @Analyzer annotation. This will also come in handy when we start parsing user queries using Lucene's QueryParser, which also requires an Analyzer.
  tokenizer = 
  @TokenizerDef(factory = org.vinwiki.search.Lucene29StandardTokenizerFactory.class)
This specifies a factory for Tokenizer instances. In this case, I'm supplying a custom factory class that creates a Lucene 2.9.x StandardTokenizer, because the default factory provided by Solr creates a Lucene 2.4 StandardTokenizer (most likely for backwards compatibility).
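The project ships its own Lucene29StandardTokenizerFactory; I won't reproduce the original source here, but a minimal version built on Solr 1.4's BaseTokenizerFactory could look roughly like this:
package org.vinwiki.search;

import java.io.Reader;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.standard.StandardTokenizer;
import org.apache.lucene.util.Version;
import org.apache.solr.analysis.BaseTokenizerFactory;

// Produces Lucene 2.9 StandardTokenizer instances instead of the
// backwards-compatible 2.4 tokenizer created by Solr's default factory.
public class Lucene29StandardTokenizerFactory extends BaseTokenizerFactory {
    public Tokenizer create(Reader input) {
        return new StandardTokenizer(Version.LUCENE_29, input);
    }
}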
  filters = {
    @TokenFilterDef(factory = StandardFilterFactory.class),
    @TokenFilterDef(factory = LowerCaseFilterFactory.class),
    @TokenFilterDef(factory = StopFilterFactory.class),
    @TokenFilterDef(factory = EnglishPorterFilterFactory.class)
  }
A tokenizer can pass tokens through a filter-chain to perform various transformations on each token. In this case, we're passing the tokens through four filters, in the order listed above.
  1. StandardFilter: Removes dots from acronyms and 's from the end of tokens. Works only on typed tokens, i.e., those produced by StandardTokenizer or equivalent.
  2. LowerCaseFilter: Lowercases letters in tokens
  3. StopFilter: Discards common English words like "an", "the", and "of".
  4. EnglishPorterFilter: Extracts the stem for each token using Porter Stemmer implemented by the Lucene Snowball extension.
You can index other fields, such as the wine type and style using the @Field annotation.
@Field(name="type",index=Index.UN_TOKENIZED,store=Store.YES)    
    public WineType getType() {
        return this.type;
    }

Building the Index
If you worked through the first post, then you should already have a database containing 2,025 wines. Hibernate Search will automatically index new Wine objects for us when the insert transaction commits, but we need to manually index the existing wines. During startup, the org.vinwiki.search.IndexHelper component counts the number of documents in the index and re-builds the index from the objects in the database if needed. You should see log output similar to:
[IndexHelper] Observed event org.vinwiki.event.REBUILD_INDEX from Thread QuartzScheduler1_Worker-8
[IndexHelper] Re-building Wine index for 2025 objects.
[IndexHelper] Flushed index update 300 from Thread Quartz ...
[IndexHelper] Flushed index update 600 from Thread Quartz ...
[IndexHelper] Flushed index update 900 from Thread Quartz ...
[IndexHelper] Flushed index update 1200 from Thread Quartz ...
[IndexHelper] Flushed index update 1500 from Thread Quartz ...
[IndexHelper] Flushed index update 1800 from Thread Quartz ...
[IndexHelper] Took 4094 (ms) to re-build the index containing 2025 documents.
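For reference, the core of a rebuild loop like this might look like the following sketch, which indexes in batches of 300 and flushes between batches (the real logic lives in IndexHelper; entity loading, transactions, and Seam event wiring are omitted):
FullTextEntityManager ftem = org.hibernate.search.jpa.Search.getFullTextEntityManager(entityManager);
List<Wine> wines = entityManager.createQuery("select w from Wine w").getResultList();
int count = 0;
for (Wine wine : wines) {
    ftem.index(wine); // add/update the Lucene document for this entity
    if (++count % 300 == 0) {
        ftem.flushToIndexes(); // push the current batch to the index
        ftem.clear();          // release memory held by the persistence context
        log.info("Flushed index update " + count);
    }
}
ftem.flushToIndexes();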
Alright, with a few lines of code and some clever annotations, we now have a full-text search index for Wine objects. Let's do some searching!

Basic Search Box
From the first post, we already have a server-side pagination framework in place for displaying Wine objects. To integrate full-text search capabilities, we simply need to implement the org.vinwiki.action.PagedDataFetcher interface in terms of Hibernate Search. Here is a simple implementation (we'll add more features later):
Search.java source listing
This is sufficient to get us searching quickly.
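Since the full listing isn't reproduced in this post, the following sketch shows the general shape of such an implementation; note that the PagedDataFetcher method names (fetchPage, getTotalCount) and the constructor are my guesses here, not the actual interface from the first post:
import java.util.List;
import javax.persistence.EntityManager;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.util.Version;
import org.hibernate.search.jpa.FullTextEntityManager;
import org.hibernate.search.jpa.FullTextQuery;

public class Search implements PagedDataFetcher {

    private final FullTextEntityManager ftem;
    private final org.apache.lucene.search.Query luceneQuery;

    public Search(EntityManager em, String userQuery) throws ParseException {
        this.ftem = org.hibernate.search.jpa.Search.getFullTextEntityManager(em);
        // Parse the user's query with the same analyzer used at index time
        Analyzer analyzer = ftem.getSearchFactory().getAnalyzer("wine_stem_en");
        QueryParser parser = new QueryParser(Version.LUCENE_29, Wine.DEFAULT_SEARCH_FIELD, analyzer);
        this.luceneQuery = parser.parse(userQuery);
    }

    public List<?> fetchPage(int first, int pageSize) {
        FullTextQuery query = ftem.createFullTextQuery(luceneQuery, Wine.class);
        query.setFirstResult(first);
        query.setMaxResults(pageSize);
        return query.getResultList(); // managed Wine entities, ordered by relevance
    }

    public int getTotalCount() {
        return ftem.createFullTextQuery(luceneQuery, Wine.class).getResultSize();
    }
}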

Next, we need to pass the user's query from the search box to our nav component. In view/WEB-INF/facelets/searchControls.xhtml, update the JSF inputText tag to bind the user's query to an Event-scoped component named searchCriteria:

<h:inputText id="basicSearchQuery" styleClass="searchField" value="#{searchCriteria.basicSearchQuery}"/>
The searchCriteria component is injected into nav, which passes it on to an instance of org.vinwiki.search.Search, which implements the PagedDataFetcher interface. You may be thinking that a separate component for a single input field is overkill, but the searchCriteria component will come in very handy once we add an advanced search form. Here is what happens in nav:
@In(required=false) private SearchCriteria searchCriteria;
    ...
    public void doSearch() {
        cleanup();
        dataFetcher = new FeatureRichSearch(log, searchCriteria);
        initFirstPageOfItems(); // fetches the first page of Items
    }

Execute Search on Enter
There is the obligatory Search button next to my search input field, but most users will just hit enter to execute the search. RichFaces makes this trivial to support using the <rich:hotKey> tag:
<rich:hotKey key="return" selector="#basicSearchQuery" 
          handler="#{rich:element('searchBtn')}.onclick();return false;"/>
The <rich:hotKey> tag binds a JavaScript event listener for the search box to submit the form when the user hits "enter". Be sure to use the selector attribute to limit the listener to the search box and not all input text boxes on the page! Any valid jQuery selector will do ...

At this point, re-compile and re-deploy these changes (at the command-line: seam clean reexplode). A search for "zinfandel" results in:
Search results for zinfandel
Hopefully, you get something similar in your environment ;-) Yes, those images are terribly ugly right now ... They were pulled dynamically from Freebase; if this were a real production application, then I'd work on getting better images. Next, we'll add some more features to the search engine, such as query term highlighting, spell correction, more like this, and filters. These additional features are implemented in the org.vinwiki.search.FeatureRichSearch class.

 

Highlighting Query Terms in Results
Highlighting query terms in results is a common and very useful feature of most search engines, and the contrib/highlighter module in Lucene makes it easy to implement. There are two aspects to consider when displaying results to the user. First, we want to display the best "fragment" of the document text for the specific query; in other words, rather than just showing the first X characters of the description, we should show the section of the description that best matches the user's query. Second, we want to highlight the query terms within the chosen fragment. In practice, most of the description text for our Wine objects is short enough to display in full for each result. Based on my current UI layout, 390 characters is a safe maximum fragment length: long enough to provide useful information about each wine, yet short enough to keep from cluttering up the screen with text. This is something you'll have to work out for your own application.

Highlighting Fragments with Hibernate Search
Recall that we stuff all the searchable text information about a Wine into a single searchable field "content". While good for searching, it's probably not something you want to display to your users directly. Instead, I've chosen to highlight terms in the wine description only. Here is how to construct a Highlighter (from the Lucene org.apache.lucene.search.highlight package):
// Pass the Lucene Query to a Scorer that extracts matching spans for the query
    // and then uses these spans to score each fragment    
    QueryScorer scorer = new QueryScorer(luceneQuery, Wine.DEFAULT_SEARCH_FIELD);
    // Highlight using a CSS style    
    SimpleHTMLFormatter formatter = new SimpleHTMLFormatter("<span class='termHL'>", "</span>");
    Highlighter highlighter = new Highlighter(formatter, scorer);
    highlighter.setTextFragmenter(new SimpleSpanFragmenter(scorer, Nav.MAX_FRAGMENT_LEN));
Also notice that I get a reference to the "wine_stem_en" Analyzer using:
Analyzer analyzer = searchFactory.getAnalyzer("wine_stem_en");
While iterating over the results, we pass the Analyzer and the actual description text to the Highlighter. The observant reader will notice that I didn't "stem" the description text during indexing, but now I am stemming the description text for highlighting. You'll see why I'm taking this approach in the next section when I add spell correction.
highlightedText = highlighter.getBestFragment(analyzer, Wine.SPELL_CHECK_SEARCH_FIELD, description);
FeatureRichSearch saves the dynamically highlighted fragments in a Map because fragments are specific to each query. If the query changes, then so must the fragments for the results. The getResultDisplayText method on the nav component interfaces with the FeatureRichSearch dataFetcher to get fragments for search results.
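As a rough illustration of that per-query caching (the map key and getters here are my assumptions, not the project's exact code), the loop over the results might look like:
Map<Long, String> fragmentCache = new HashMap<Long, String>();
for (Wine wine : results) {
    String description = wine.getDescription();
    String fragment = description; // fall back to the raw description
    try {
        String best = highlighter.getBestFragment(analyzer, Wine.SPELL_CHECK_SEARCH_FIELD, description);
        if (best != null) {
            fragment = best;
        }
    } catch (Exception e) {
        log.warn("Highlighting failed for wine " + wine.getId(), e);
    }
    fragmentCache.put(wine.getId(), fragment);
}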

 

Spell Correction
For spell correction, I'm using the contrib/spellchecker module in Lucene (see http://wiki.apache.org/lucene-java/SpellChecker), which is a good place to start, but you should realize that handling spelling mistakes in search is a complex problem so this is by no means the "best" solution.

Spell Correction Dictionary
To begin, you need a dictionary of correctly spelled terms on which to base spell corrections. The spellchecker module builds a supplementary index from the terms in your main index using fields based on n-grams of each term. Internally, SpellChecker creates several n-gram fields for each word depending on word length. Here is a screenshot of the word "zinfandel" in the SpellChecker index, courtesy of Luke:
SpellChecker Index in Luke
Recall that we're stemming terms in our default search field "content". If you build the spell checker index from this field, the dictionary will only contain stemmed terms. This has the undesired side effect that the corrected term will look misspelled to the user. For example, if you search for "strawbarry", the spell checker will probably recommend "strawberri", which is close, but we really want to show the user "Did you mean 'strawberry'?". Thus, we need to base our spell checker index on non-stemmed text. When we ask the spell checker to suggest a term for "strawbarry", it will then return "strawberry". When we query the search index, we still need to search for the stemmed form "strawberri", which is why I pass the "wine_stem_en" Analyzer to the QueryParser after applying the spell correction process.
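Building the supplementary index is straightforward. Here is a sketch, assuming the non-stemmed text is indexed in Wine.SPELL_CHECK_SEARCH_FIELD, the spell index lives under lucene_index/spellcheck, and searchFactory comes from the FullTextEntityManager (exception handling omitted):
// Open (or create) the spell checker index next to the main index
Directory spellDir = FSDirectory.open(new java.io.File("lucene_index/spellcheck"));
SpellChecker spellChecker = new SpellChecker(spellDir);

// Feed it every term from the non-stemmed field of the main Wine index
DirectoryProvider[] providers = searchFactory.getDirectoryProviders(Wine.class);
IndexReader reader = IndexReader.open(providers[0].getDirectory(), true);
try {
    spellChecker.indexDictionary(new LuceneDictionary(reader, Wine.SPELL_CHECK_SEARCH_FIELD));
} finally {
    reader.close();
}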

Hibernate Search Automatic Indexing and Spell Correction
Lucene's SpellChecker builds a supplementary index from the terms in your main index. If your main index changes, e.g. after adding a new entity, then you need to update the spell correction index. From discussions within the HS community, there's nothing built into HS to tell you when to update the spell checker index, especially if you're using hibernate.search.worker.execution=async; in other words, you don't know when Hibernate Search has finished updating the Lucene index. You have a couple of options to consider here: 1) update the spell index incrementally as new content is added (or updated), or 2) update the index periodically in a background job. Which you choose depends on your requirements and how much you trust the source of the changes to the main index. For now, I'm using Seam Events to incrementally update the spell index after a new Wine is added or an existing Wine is updated (see src/hot/org/vinwiki/search/IndexHelper.java for details).

Edit Distance as Measure of Similarity Between Terms
Lucene's SpellChecker uses the notion of "edit distance" to measure the similarity between terms. When you call spellChecker.suggestSimilar(wordToRespell, ...), the checker consults an instance of org.apache.lucene.search.spell.StringDistance to score hits from the spell checker index. Here's how a SpellChecker is constructed:
Directory dir = FSDirectory.open(new java.io.File("lucene_index/spellcheck"));
    SpellChecker spell = new SpellChecker(dir);
    spell.setStringDistance(new LevensteinDistance());
    spell.setAccuracy(0.8f);
    String[] suggestions = spell.suggestSimilar(wordToRespell, 10,
                             indexReader, Wine.SPELL_CHECK_SEARCH_FIELD, true);
The LevensteinDistance class implements StringDistance by calculating the minimum number of edits needed to transform one string into the other, where the allowable edit operations are insertion, deletion, or substitution of a single character. For example, transforming "tanins" into "tannins" requires a single insertion, so the edit distance is 1.

Spell Correction Heuristics
The spellchecker module does a fine job of suggesting possible terms, but we still need to decide how to handle these suggestions depending on the structure of the user's query. To keep things simple, our first heuristic applies to single term queries where the mis-spelled term does not exist in our spell checker index. In this case, we simply re-issue the search using the best suggestion from the spell checker. Here is a screen shot of how we present the results to the user:
Spell correction for single term
However, we can't be sure that a term is mis-spelled if it exists in the spell checker index. Thus, if a single mis-spelled term does exist in the spell correction index, you need to decide whether to simply "OR" the mis-spelled and suggested terms together or let the user decide (it seems like Google doesn't always do one or the other, so their implementation is a bit more advanced than the one I'll provide here).

In the screenshot above, the user queried for "velvety tanins"; both terms exist in the spell correction index, but "tanins" is mis-spelled. During query processing, the SpellChecker suggested "tannins" for the mis-spelled word "tanins", which is correct. Thus, we search for "velvety tanins" but also suggest a spell-corrected query, "velvety tannins", allowing the user to click on the suggested correction to see (hopefully) better results. Please refer to the checkSpelling method in src/hot/org/vinwiki/search/FeatureRichSearch.java for details on how these simple heuristics are implemented; a simplified sketch of the first heuristic follows.
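Here is a simplified sketch covering only the first heuristic (terms missing from the dictionary entirely); the variable names are illustrative and the real implementation is FeatureRichSearch.checkSpelling:
// Simplified spell-correction sketch: correct only terms that are not in the
// dictionary at all; phrase handling and the "Did you mean?" case are left to the caller.
private String suggestCorrectedQuery(String userQuery, SpellChecker spellChecker, IndexReader indexReader)
        throws IOException {
    StringBuilder corrected = new StringBuilder();
    boolean correctionMade = false;
    for (String term : userQuery.toLowerCase().split("\\s+")) {
        String replacement = term;
        if (!spellChecker.exist(term)) {
            // Unknown term: take the best suggestion, if any
            String[] suggestions = spellChecker.suggestSimilar(
                    term, 1, indexReader, Wine.SPELL_CHECK_SEARCH_FIELD, true);
            if (suggestions.length > 0) {
                replacement = suggestions[0];
                correctionMade = true;
            }
        }
        if (corrected.length() > 0) corrected.append(' ');
        corrected.append(replacement);
    }
    // null means nothing to correct; otherwise the caller either re-issues the search
    // with the corrected query or renders it as a "Did you mean ...?" suggestion.
    return correctionMade ? corrected.toString() : null;
}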

 

Type-ahead Search Suggestions
Most users appreciate when an application spares them unnecessary effort, such as typing common phrases into the search box. Thus, we'll use the RichFaces <rich:suggestionbox> tag to provide a drop-down suggestion list of known search terms after the user types 3 or more characters. While trivial, this feature can help users be more productive with your search interface and helps minimize spelling mistakes.
Type-ahead Suggestion List
In /WEB-INF/facelets/searchControls.xhtml, we attach the <rich:suggestionbox> to the search input text field using:
<rich:suggestionbox id="suggestionBox" for="basicSearchQuery" ignoreDupResponses="true"
                    immediate="true" limitToList="true" height="100" width="250" minChars="3"
                    usingSuggestObjects="true" suggestionAction="#{nav.suggestSearchTerms}"
                    selfRendered="true" first="0" var="suggestion">
  <h:column><h:outputText value="#{suggestion}"/></h:column>
</rich:suggestionbox>
In the first posting, I discussed queue and traffic flood protection using RichFaces' Global Default Queue. You should definitely activate an AJAX request queue if you are using type-ahead suggestions. I've also added an animated GIF and loading message using the <a4j:status> tag.

On the Java side, I've resorted to a simple LIKE query to the database, but you can also query a field analyzed with Lucene's org.apache.lucene.analysis.ngram.EdgeNGramTokenFilter in your full-text index.
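For the record, a LIKE-based suggestion action could be as simple as the following sketch (the method takes the typed prefix as an Object and returns a List, as <rich:suggestionbox> expects; the JPQL query and the column being matched are my assumptions):
@SuppressWarnings("unchecked")
public List<String> suggestSearchTerms(Object suggest) {
    String prefix = (suggest != null) ? suggest.toString().trim().toLowerCase() : "";
    if (prefix.length() < 3) {
        return new ArrayList<String>(); // mirrors minChars="3" on the suggestionbox
    }
    return entityManager.createQuery(
            "select distinct w.name from Wine w where lower(w.name) like :prefix order by w.name")
        .setParameter("prefix", prefix + "%")
        .setMaxResults(10)
        .getResultList();
}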

 

Advanced Search
Sometimes a single search field isn't enough to pinpoint the information you're looking for; advanced search addresses this admittedly uncommon need to formulate complex queries. The advanced search form is application-specific, but most allow the user to construct a query composed of AND, OR, exact phrase, and NOT clauses, as shown in the following screenshot:
Advanced Search Form
As mentioned above, the org.vinwiki.search.SearchCriteria class manages the advanced search form data. I'll refer you to the source code for further details about advanced search; a rough sketch of how the form's clauses can be combined into a single Lucene query is shown below. Notice that I'm using a RangeQuery to implement the Date field on the search form.
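To give a flavor of what the query-building code involves, here is a rough sketch of combining the form's clauses with a BooleanQuery; the SearchCriteria getter names below are illustrative, not the project's actual API:
BooleanQuery advanced = new BooleanQuery();
QueryParser parser = new QueryParser(Version.LUCENE_29, Wine.DEFAULT_SEARCH_FIELD, analyzer);

// "all of these words" -> every term must match
if (criteria.getAllOfTheseWords() != null) {
    parser.setDefaultOperator(QueryParser.AND_OPERATOR);
    advanced.add(parser.parse(criteria.getAllOfTheseWords()), BooleanClause.Occur.MUST);
}
// "this exact phrase" -> quoted phrase query
if (criteria.getExactPhrase() != null) {
    advanced.add(parser.parse("\"" + criteria.getExactPhrase() + "\""), BooleanClause.Occur.MUST);
}
// "any of these words" -> at least one should match
if (criteria.getAnyOfTheseWords() != null) {
    parser.setDefaultOperator(QueryParser.OR_OPERATOR);
    advanced.add(parser.parse(criteria.getAnyOfTheseWords()), BooleanClause.Occur.SHOULD);
}
// "none of these words" -> exclude matches
if (criteria.getNoneOfTheseWords() != null) {
    advanced.add(parser.parse(criteria.getNoneOfTheseWords()), BooleanClause.Occur.MUST_NOT);
}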

Testing Search
Be careful when building tests involving Lucene because lib/test/thirdparty-all.jar contains an older version of Lucene. To remedy this, I added the following list of JARs to the top of the test.path path in build.xml:
<fileset dir="${lib.dir}">
    <include name="lucene-core-2.9.2.jar"/>
    <include name="lucene-highlighter-2.9.2.jar"/>
    <include name="lucene-misc-2.9.2.jar"/>
    <include name="lucene-snowball-2.9.2.jar"/>
    <include name="lucene-spatial-2.9.2.jar"/>
    <include name="lucene-spellchecker-2.9.2.jar"/>
    <include name="lucene-memory-2.9.2.jar"/>
    <include name="solr-core-1.4.0.jar"/>
    <include name="solr-solrj-1.4.0.jar"/>
  </fileset>
A new test case was added for testing search; see src/test/org/vinwiki/test/SearchTest.java. While fairly trivial, this class helped me root out a few issues (like updating the spell correction index after an update) while developing the code for this post. So test-driven development shows its worth once again!

Future Enhancements to the Search Engine
There are still a few search-related features I think this application needs, including tagging, phrase handling, and synonyms. Specifically, users should be able to add tags like "jammy" or "flabby" when rating wines. The application should be able to render a tag cloud from these user-supplied tags as another form of navigation. User tags should also be fed into the search index (with caution, of course). Phrase detection complements user-supplied tagging by recognizing multi-term tags during text analysis. How you handle stop words also affects exact phrase matching. The contrib/shingles module helps speed up phrase searches involving common terms, so I'd definitely like to investigate its applicability for this application. Lastly, synonyms supply the search index (and/or search queries) with additional terms that mean the same thing as other words in your documents. If time allows, I'll try to add a fifth post dealing with tags, phrases, and synonyms. I'd also like to hear from the community on other features that might be helpful.

TODO: Evaluating Search Quality with contrib/benchmark
The contrib/benchmark module is one extension all Lucene developers should be familiar with; benchmark allows you to run repeatable performance tests against your index. I'll refer you to the JavaDoc for using benchmark for performance testing. In the future, I'd like to use benchmark's quality package to measure relevance of search results. The classes in the quality package allow us to measure precision and recall for our search engine. Precision is a measure of "exactness" that tells us the proportion of the top N results that are relevant to the search. Recall is a measure of "completeness" that tells us the proportion of all relevant documents included in the top results. In other words, if there are 100 relevant documents for a query and the results return 80, then recall is 0.8. Sometimes, the two metrics conflict with each other in that as you return more results (to increase recall), you can introduce irrelevant documents, which decreases precision. The quality package will give us a way to benchmark precision and recall and then tune Lucene to improve these as much as possible. If I get time, then I'll post my results (and source).

A last word about scalability
While Lucene is very fast, you can run into search performance issues if you are constantly updating the index, which forces Hibernate Search to close and re-open its shared IndexReader. Hibernate Search has built-in support for a Master / Slave architecture where updates occur on a single master server and searches are executed against replicated, read-only slave indexes in your cluster; the slave-side configuration is sketched below. I use this configuration in my day job and it scales well. However, you'll need a JMS queue and a Message-driven EJB to allow slave nodes to send updates to the master node, so you'll have to deploy your application as an EAR instead of a WAR (to deploy the MDB). Please refer to the online documentation provided with Hibernate Search for a good discussion of how to set up a master / slave cluster.
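For reference, the slave-side persistence properties for such a setup look roughly like the following (directory paths and queue names are placeholders; consult the Hibernate Search documentation for the authoritative list, and use FSMasterDirectoryProvider on the master node):
<!-- Slave nodes: search a local copy of the index, send writes to the master over JMS -->
<property name="hibernate.search.default.directory_provider" value="org.hibernate.search.store.FSSlaveDirectoryProvider"/>
<property name="hibernate.search.default.sourceBase" value="/shared/lucene_index"/>
<property name="hibernate.search.default.indexBase" value="/local/lucene_index"/>
<property name="hibernate.search.default.refresh" value="1800"/>
<property name="hibernate.search.worker.backend" value="jms"/>
<property name="hibernate.search.worker.jms.connection_factory" value="ConnectionFactory"/>
<property name="hibernate.search.worker.jms.queue" value="queue/hibernatesearch"/>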

What's Next?
In the next post, I'll add authentication and registration using Facebook Connect.