TF-IDF in Hadoop Part 3: Documents in Corpus and TFIDF Computation

The previous two parts of this post did a small part of the job of calculating the TF-IDF for each “term” in the different documents of the “corpus”. Since the implementation depends on concepts of Information Retrieval, newcomers to the field especially should take a look at the book by Christopher D. Manning, Prabhakar Raghavan and Hinrich Schütze, Introduction to Information Retrieval, Cambridge University Press, 2008. The authors are professors at Stanford and Stuttgart Universities and provide exercises on the subject, and I found good resources in Chapter 7 covering the basic concepts of the TF-IDF algorithm. As I mentioned before, I first came across the term 7 years ago, when I was writing my BS in Computer Science degree report (in Portuguese) on the problem of user profile matching and clustering. Interestingly enough, I started learning Hadoop about 2 weeks ago and I was stoked about it, because my first contact with MapReduce was actually through mongoDB during my MS in Computer Science thesis, when I needed to generate a report over a collection of data gathered from an environmental sensor network, using mongoDB’s MapReduce API over distributed mongoDB shards. All in all, it seems the time to put this into practice is now :).

Job 3: Documents in Corpus and TF-IDF computation

In order to summarize the idea of scoring words based on their occurrence in the corpus, I will use graphical and textual examples of the algorithm, taking advantage of this post to make clear what the exercises really are. Ricky Ho has implemented TF-IDF using Apache PIG and documented exactly the steps described by the algorithm in the very nice diagram shown below. Ricky also wrote a good summary of the terms “term frequency” and “inverse document frequency”, so check out his website in case you need it.

The TF-IDF MapReduce Phases by Ricky Ho

This post implements the third round of the implementation, where the number of documents containing each word is obtained by counting the size of the array that brings together all the documents for that word, taking into consideration the output of the previous phase. Let’s take a look at the data format from the previous job and see if it matches the description in the diagram (note that the terms are not sorted; I selected the term “therefore” at random):

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 2-word-counts/part-r-00000 | less
therefore@all-shakespeare       652/738781
therefore@leornardo-davinci-all.txt     124/149612
therefore@the-outline-of-science-vol1.txt       36/70650

The output shows each term in each document and the number of its occurrences in the given document, accompanied by the total number of terms in the document. So, the final Mapper and Reducer were defined as follows:

  • Map:
    • Input: ((term@document), n/N)
    • Re-arrange the mapper to have the word as the key, since we need to count the number of documents where it occurs
    • Output: (term, document=n/N)
  • Reducer:
    • D = total number of documents in corpus. This can be passed by the driver as a constant;
    • d = number of documents in corpus where the term appears. It is a counter over the reduced values for each term;
    • TFIDF = n/N * log(D/d);
    • Output: ((word@document), d/D, (n/N), TFIDF)
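The map and reduce steps listed above can be sketched in plain Java, without Hadoop, to check the arithmetic on a few lines of Job 2 output. This is only an illustration under assumptions: the class `Job3Sketch` and its methods `map` and `reduce` are names made up for this example, the grouping that Hadoop's shuffle performs is simulated with a `TreeMap`, and the base-10 logarithm is assumed to match the reducer shown later in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free sketch of Job 3; all names here are illustrative.
class Job3Sketch {

    // Map step: "term@doc \t n/N"  ->  key "term", value "doc=n/N"
    static String[] map(String line) {
        String[] keyAndCounts = line.split("\t");
        String[] termAndDoc = keyAndCounts[0].split("@");
        return new String[] { termAndDoc[0], termAndDoc[1] + "=" + keyAndCounts[1] };
    }

    // Reduce step: d is the number of values grouped under the term.
    static Map<String, Double> reduce(String term, List<String> values, int totalDocs) {
        int docsWithTerm = values.size();
        Map<String, Double> scores = new LinkedHashMap<String, Double>();
        for (String value : values) {
            String[] docAndCounts = value.split("=");
            String[] counts = docAndCounts[1].split("/");
            double tf = Double.parseDouble(counts[0]) / Double.parseDouble(counts[1]);
            double idf = Math.log10((double) totalDocs / docsWithTerm);
            scores.put(term + "@" + docAndCounts[0], tf * idf);
        }
        return scores;
    }

    public static void main(String[] args) {
        String[] job2Output = {
            "therefore@all-shakespeare\t652/738781",
            "therefore@leornardo-davinci-all.txt\t124/149612",
            "therefore@the-outline-of-science-vol1.txt\t36/70650"
        };
        // Simulate the shuffle: group the mapper values by term.
        Map<String, List<String>> grouped = new TreeMap<String, List<String>>();
        for (String line : job2Output) {
            String[] kv = map(line);
            if (!grouped.containsKey(kv[0])) {
                grouped.put(kv[0], new ArrayList<String>());
            }
            grouped.get(kv[0]).add(kv[1]);
        }
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            System.out.println(reduce(e.getKey(), e.getValue(), 3));
        }
    }
}
```

With D = 3 and “therefore” present in all three documents, log10(3/3) is 0, so all three scores come out as 0.0 when the formula is applied literally.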

This post shows the implementation of the third step, which counts the number of documents in the corpus in which each “term” appears and calculates the TF-IDF. I have made some assumptions for the final output to better present the results and, of course, to stay within the scope of the example.

  • The first problem was keeping this step as the last one needed to complete the exercise. The number of documents in the corpus could be calculated by another MapReduce phase, as described in Cloudera’s documentation. However, the class slides conclude by mentioning that this can be done without such an additional phase. I remembered from the classes that the JobConf is used to pass parameters to the jobs. So, I used the FileSystem class to count the number of documents 😀 in the original input directory, since that number is constant. I tried using the Context/Configuration classes of the Hadoop 0.20.1 API to pass that number to the last Reducer, but get(key) returned null. So, the only way I found to pass the number of documents was through the JobName 🙂 I know, it is a dirty hack, but it works;
  • Since the number of documents in the corpus is small, the chances that a word appears in all documents are much higher than when the algorithm is applied to web indexing over thousands or millions of documents. In that case the term log(totalDocs/docsPerWord) “nulls” the result (log(3/3) = 0), so I simplified the calculation by using tfIdf = tf whenever a word occurs in 100% of the documents in the corpus (you could implement it as tfIdf = tf * 1 as well);
  • I decided to add more information to the output purely for documentation purposes. It shows [word@document, documentsFrequency/documentsCorpus, wordFrequency/totalWordsInDocument, TF-IDF];
  • The final result is formatted to show only a few decimal places, for the purposes of displaying the values in this exercise. In production, however, the full precision of these values matters.
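To make the second assumption concrete, here is the formula from the reducer description written out with the same symbols (n, N, D, d), followed by the numbers for two terms of this corpus. The base-10 logarithm is an assumption taken from the reducer code in this post, and the rounded values are approximations.

```latex
% General form, with n, N, D and d as defined in the reducer description
\mathrm{tfidf} = \frac{n}{N} \cdot \log_{10}\frac{D}{d}

% "therefore" in all-shakespeare: n = 652, N = 738781, D = 3, d = 3
\frac{652}{738781} \cdot \log_{10}\frac{3}{3} \approx 0.00088253 \cdot 0 = 0

% With the simplification tfIdf = tf when d = D
\mathrm{tfidf} = \frac{652}{738781} \approx 0.00088253

% "abook" in leornardo-davinci-all.txt: n = 3, N = 149612, D = 3, d = 1
\frac{3}{149612} \cdot \log_{10}\frac{3}{1} \approx 0.00002005 \cdot 0.4771 \approx 0.00000957
```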

Job3, Mapper

package index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * WordsInCorpusTFIDFMapper implements the Job 3 specification for the TF-IDF algorithm
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordsInCorpusTFIDFMapper extends Mapper<LongWritable, Text, Text, Text> {

    public WordsInCorpusTFIDFMapper() {
    }

    /**
     * @param key is the byte offset of the current line in the file;
     * @param value is the line from the file
     * @param context is used to write the output (key, value) pair and gives access to the job information
     *
     *     PRE-CONDITION: marcello@book.txt  \t  3/1500
     *     POST-CONDITION: marcello, book.txt=3/1500
     */
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // wordAndCounters[0] = "marcello@book.txt", wordAndCounters[1] = "3/1500"
        String[] wordAndCounters = value.toString().split("\t");
        // wordAndDoc[0] = "marcello", wordAndDoc[1] = "book.txt"
        String[] wordAndDoc = wordAndCounters[0].split("@");
        context.write(new Text(wordAndDoc[0]), new Text(wordAndDoc[1] + "=" + wordAndCounters[1]));
    }
}

Job3, Reducer

package index;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * WordsInCorpusTFIDFReducer calculates the number of documents in corpus in which a given key occurs and the TF-IDF computation.
 * The total number of D is acquired from the job name 🙂 It is a dirty hack, but the only way I could communicate the number from
 * the driver.
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordsInCorpusTFIDFReducer extends Reducer<Text, Text, Text, Text> {

    private static final DecimalFormat DF = new DecimalFormat("###.########");

    public WordsInCorpusTFIDFReducer() {
    }

    /**
     * @param key is the key of the mapper
     * @param values are all the values aggregated during the mapping phase
     * @param context contains the context of the job run
     *
     *             PRECONDITION: receive a list of <word, ["doc1=n1/N1", "doc2=n2/N2"]>
     *             POSTCONDITION: <"word@doc1,  [d/D, n/N, TF-IDF]">
     */
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // get the number of documents indirectly from the file-system (stored in the job name on purpose)
        int numberOfDocumentsInCorpus = Integer.parseInt(context.getJobName());
        // number of documents in the corpus in which this word appears
        int numberOfDocumentsInCorpusWhereKeyAppears = 0;
        Map<String, String> tempFrequencies = new HashMap<String, String>();
        for (Text val : values) {
            String[] documentAndFrequencies = val.toString().split("=");
            // each value comes from a different document, so count it
            numberOfDocumentsInCorpusWhereKeyAppears++;
            tempFrequencies.put(documentAndFrequencies[0], documentAndFrequencies[1]);
        }
        for (String document : tempFrequencies.keySet()) {
            String[] wordFrequenceAndTotalWords = tempFrequencies.get(document).split("/");

            //Term frequency is the quotient of the number of occurrences of the term in the document
            //and the total number of terms in the document
            double tf = Double.parseDouble(wordFrequenceAndTotalWords[0])
                    / Double.parseDouble(wordFrequenceAndTotalWords[1]);

            //quotient between the number of docs in corpus and the number of docs where the term appears;
            //the logarithm of the inverse document frequency is applied below
            double idf = (double) numberOfDocumentsInCorpus / (double) numberOfDocumentsInCorpusWhereKeyAppears;

            //given that log10(1) = 0, just consider the term frequency when the term appears in all documents
            double tfIdf = numberOfDocumentsInCorpus == numberOfDocumentsInCorpusWhereKeyAppears ?
                    tf : tf * Math.log10(idf);

            context.write(new Text(key + "@" + document), new Text("[" + numberOfDocumentsInCorpusWhereKeyAppears + "/"
                    + numberOfDocumentsInCorpus + " , " + wordFrequenceAndTotalWords[0] + "/"
                    + wordFrequenceAndTotalWords[1] + " , " + DF.format(tfIdf) + "]"));
        }
    }
}

I have implemented the TestCases for both the Mapper and Reducer classes, but to keep this post simple I will skip those. Let’s take a look at the driver, since it captures the total number of documents directly from the filesystem using the built-in Hadoop API. There is definitely no need for another MapReduce phase for that. As described in Cloudera’s training, the fewer phases the better, since we save on resource utilization :). Anyway, let’s go to the Driver implementation.

Job3, Driver
package index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * WordsInCorpusTFIDF is the driver of Job 3: it counts the documents in the original input
 * directory and runs the job that computes the TF-IDF of each word in each document.
 * @author Marcello de Sales (marcello.desales@gmail.com)
 * @version "Hadoop 0.20.1"
 */
public class WordsInCorpusTFIDF extends Configured implements Tool {

    // where to put the data in hdfs when we're done
    private static final String OUTPUT_PATH = "3-tf-idf";

    // where to read the data from.
    private static final String INPUT_PATH = "2-word-counts";

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job job = new Job(conf, "Word in Corpus, TF-IDF");

        job.setJarByClass(WordsInCorpusTFIDF.class);
        job.setMapperClass(WordsInCorpusTFIDFMapper.class);
        job.setReducerClass(WordsInCorpusTFIDFReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        //Getting the number of documents from the original input directory.
        Path inputPath = new Path("input");
        FileSystem fs = inputPath.getFileSystem(conf);
        FileStatus[] stat = fs.listStatus(inputPath);

        //Dirty hack to pass the total number of documents as the job name.
        //The call to context.getConfiguration().get("docsInCorpus") returned null when I tried to pass
        //conf.set("docsInCorpus", String.valueOf(stat.length)) or even
        //conf.setInt("docsInCorpus", stat.length)
        job.setJobName(String.valueOf(stat.length));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordsInCorpusTFIDF(), args);
        System.exit(res);
    }
}

Continuing with the implementation, the only thing left to do is to compile and run the final driver. Note that the input directory is the one containing the partial counts of job 2, that is, “2-word-counts”. The output is the directory reserved for step 3, or “3-tf-idf”. Again, the only way I managed to send the total number of documents in the corpus was through the jobName. In my attempts, the Hadoop 0.20.1 API did not pass the configuration values along: as documented in the driver, I tried using the context reference in the reducer class, but the call context.getConfiguration().get("docsInCorpus") only returned null. I gave up and looked for an alternative, and the JobName was the only one I could find :).
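One possible explanation for the null, offered as an assumption rather than something verified on this setup: the `Job` constructor takes its own copy of the `Configuration`, so values set on the original object afterwards never reach the tasks. The sketch below imitates that behaviour with `java.util.Properties`; `FakeJob` and `ConfigCopySketch` are made-up stand-ins, not Hadoop classes. In the real driver, the equivalent change would be to call `conf.setInt("docsInCorpus", stat.length)` before creating the `Job`, or to set the value through `job.getConfiguration()`.

```java
import java.util.Properties;

// Hypothetical stand-in for a job that copies its configuration on construction.
class FakeJob {
    private final Properties conf = new Properties();

    FakeJob(Properties original) {
        // Copy the entries: later changes to 'original' are not seen here.
        conf.putAll(original);
    }

    Properties getConfiguration() {
        return conf;
    }
}

class ConfigCopySketch {
    public static void main(String[] args) {
        Properties conf = new Properties();
        FakeJob job = new FakeJob(conf);

        // Set after the copy was taken: the job never sees this value.
        conf.setProperty("docsInCorpus", "3");
        System.out.println(job.getConfiguration().getProperty("docsInCorpus")); // prints null

        // Set on the job's own copy: the value is visible.
        job.getConfiguration().setProperty("docsInCorpus", "3");
        System.out.println(job.getConfiguration().getProperty("docsInCorpus")); // prints 3
    }
}
```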

I skipped the test session and compiled everything and ran the driver as follows:

training@training-vm:~/git/exercises/shakespeare$ ant
Buildfile: build.xml

    [javac] Compiling 11 source files to /home/training/git/exercises/shakespeare/bin

      [jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar


Then, finally running the calculator of words:

training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.WordsInCorpusTFIDF
10/01/09 21:41:40 INFO input.FileInputFormat: Total input paths to process : 1
10/01/09 21:41:41 INFO mapred.JobClient: Running job: job_200912301017_0115
10/01/09 21:41:42 INFO mapred.JobClient:  map 0% reduce 0%
10/01/09 21:41:51 INFO mapred.JobClient:  map 100% reduce 0%
10/01/09 21:42:00 INFO mapred.JobClient:  map 100% reduce 100%
10/01/09 21:42:02 INFO mapred.JobClient: Job complete: job_200912301017_0115
10/01/09 21:42:02 INFO mapred.JobClient: Counters: 17
10/01/09 21:42:02 INFO mapred.JobClient:   Job Counters
10/01/09 21:42:02 INFO mapred.JobClient:     Launched reduce tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:     Launched map tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:     Data-local map tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:   FileSystemCounters
10/01/09 21:42:02 INFO mapred.JobClient:     FILE_BYTES_READ=2017995
10/01/09 21:42:02 INFO mapred.JobClient:     HDFS_BYTES_READ=1920431
10/01/09 21:42:02 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=4036022
10/01/09 21:42:02 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=2943390
10/01/09 21:42:02 INFO mapred.JobClient:   Map-Reduce Framework
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce input groups=0
10/01/09 21:42:02 INFO mapred.JobClient:     Combine output records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Map input records=48779
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce shuffle bytes=2017995
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce output records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Spilled Records=97558
10/01/09 21:42:02 INFO mapred.JobClient:     Map output bytes=1920431
10/01/09 21:42:02 INFO mapred.JobClient:     Combine input records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Map output records=48779
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce input records=48779

Note that the final number of values is the same as in the previous job step, so everything went as expected. Taking a look at the file in the output directory:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -ls 3-tf-idf
Found 2 items
drwxr-xr-x   - training supergroup          0 2010-01-09 21:41 /user/training/3-tf-idf/_logs
-rw-r--r--   1 training supergroup    2943390 2010-01-09 21:41 /user/training/3-tf-idf/part-r-00000

I decided to take a look at the same word I mentioned above, “therefore”. Here’s the result for it; this time the output was automatically sorted by Hadoop.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 3-tf-idf/part-r-00000 | less
abook@leornardo-davinci-all.txt [1/3 , 3/149612 , 0.00000957]
aboriginal@the-outline-of-science-vol1.txt      [1/3 , 1/70650 , 0.00000675]
abortive@all-shakespeare        [2/3 , 4/738781 , 0.00000095]
therefore@all-shakespeare       [3/3 , 652/738781 , 0.00088253]
therefore@the-outline-of-science-vol1.txt       [3/3 , 36/70650 , 0.00050955]
therefore@leornardo-davinci-all.txt     [3/3 , 124/149612 , 0.00082881]

Taking a look at chapter 7 of the book, it is possible to draw the following conclusions about the output presented:

  • The term “therefore” is more relevant in the document “all-shakespeare”, since it is more likely to occur there than in the other documents. Keep in mind that the scores for “therefore” come from the tfIdf = tf simplification described earlier, so they are not directly comparable with the log-weighted scores of the other terms;
  • Other terms that do not appear in all documents, such as “abook”, “aboriginal” and “abortive”, have very small relevance for the given corpus of documents.

What’s Next?

I had so much fun with my first contact with Hadoop that I am going to the Cloudera Training here in the Bay Area in about 10 days. Although the training covers the Hadoop 0.18 API, I decided to use the Hadoop 0.20.1 API because I just wanted to try a “cleaner” API.

As for next steps from this exercise, one could do the categorized classification of the terms per document in a data-centric way (document -> term -> tf-idf), or whatever your need is. Time to go play with Apache PIG and HBase.
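As a starting point for the document -> term -> tf-idf view mentioned above, the Job 3 output lines could be regrouped along these lines. It is a plain-Java sketch under assumptions: `TfIdfByDocument` and `group` are names invented here, the input is taken to be tab-separated lines in the `[d/D , n/N , tfidf]` layout shown earlier, and a real corpus would call for another MapReduce job rather than an in-memory map.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: regroups "term@doc \t [d/D , n/N , tfidf]" lines by document.
class TfIdfByDocument {

    static Map<String, Map<String, Double>> group(String[] lines) {
        Map<String, Map<String, Double>> byDocument = new TreeMap<String, Map<String, Double>>();
        for (String line : lines) {
            String[] keyAndValue = line.split("\t");
            String[] termAndDoc = keyAndValue[0].split("@");
            // Strip the surrounding brackets, then take the third comma-separated field.
            String inner = keyAndValue[1].trim();
            inner = inner.substring(1, inner.length() - 1);
            double tfIdf = Double.parseDouble(inner.split(",")[2].trim());

            Map<String, Double> terms = byDocument.get(termAndDoc[1]);
            if (terms == null) {
                terms = new TreeMap<String, Double>();
                byDocument.put(termAndDoc[1], terms);
            }
            terms.put(termAndDoc[0], tfIdf);
        }
        return byDocument;
    }

    public static void main(String[] args) {
        String[] job3Output = {
            "abortive@all-shakespeare\t[2/3 , 4/738781 , 0.00000095]",
            "therefore@all-shakespeare\t[3/3 , 652/738781 , 0.00088253]",
            "therefore@leornardo-davinci-all.txt\t[3/3 , 124/149612 , 0.00082881]"
        };
        // Prints a nested map: document -> (term -> tf-idf)
        System.out.println(group(job3Output));
    }
}
```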
