mongoDB Shards, Cluster and MapReduce: experiments for Sensor Networks

This document describes the use of mongoDB as the persistence layer for the data collected from NetBEAMS. It is divided into sections covering the setup and the CRUD (Create, Retrieve, Update, Delete) operations, as well as advanced topics such as data replication and the use of MapReduce. This document is a copy of the experiments performed for my Master’s Thesis report entitled “A Key-Value-Based Persistence Layer for Sensor Networks“. The original wiki documentation can be found at MongoDBShardsClusterAndMapReduce.

The setup of the mongoDB shards must be performed on each cluster node. First, the relevant processes are started, and then the cluster must be configured with each of the shards, as well as with indexes for the collections to be used. Before continuing with this section, refer to the official mongoDB sharding documentation.

In order to start collecting data, the mongoDB server must be set up in a single-node or distributed fashion. Using the distributed cluster version requires starting the processes shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ ps aux | grep mongo
marcello  3391  0.0  0.2  67336  3328 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-1/ --port 20001
marcello  3397  0.0  0.2  59140  3280 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-2/ --port 20002
marcello  3402  0.0  0.2  59140  3276 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-3/ --port 20003
marcello  3406  0.0  0.3 157452  3980 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/config --port 10000
marcello  3431  0.4  0.2  62004  3332 pts/1    Sl   12:38   0:35 mongos -vvv --configdb localhost:10000
marcello  3432  0.0  0.0   5196   704 pts/1    S    12:38   0:00 tee logs/mongos-cluster-head.log
In summary, these processes are defined as follows:
  • Shards Node: each shard process “mongod” is responsible for managing its own “chunks” of data in a given “dbpath” directory, on a given port number. These processes are used by the cluster head “mongos”;
  • Cluster Metadata Server Node: the main metadata server of the cluster can be located on a local or foreign host. The listing above shows the metadata server “config” located on the same server, managed by a “mongod” process. It carries information about the databases, the list of shards, and the list of “chunks” of each database, including their “IP_address:port” location;
  • Cluster Head Server: the orchestration of the cluster is performed by the “mongos” process. It connects to the metadata server to decide which shard to use, to keep statistics and counters, etc. This is the main process that accepts the client requests.

Make sure to proxy the output of the processes to log files. As shown in the listing above, the process “tee” is capturing the output of the “mongos” process. The “mongod” and “mongos” processes also accept logging options such as “--logpath” for the same purpose.

Assuming the proper processes are running, especially the metadata server and the main cluster head, the client process can be started to issue the commands that enable shards on a given database. Since the mongoDB client interface uses JavaScript as the main programming-language abstraction to manipulate data, a script can be used to automate the process of setting up the server. Before continuing, make sure you have covered mongoDB’s documentation on how to set up database shards.

First, connect to the server using the client process “mongo”, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo
MongoDB shell version: 1.2.0
url: test
connecting to: netbeams
Sun Dec 20 14:22:49 connection accepted from 127.0.0.1:39899 #5
type "help" for help

After connecting to the server through the client, get references to 2 important databases: “admin” and “config”. The “admin” database is used to run administrative commands against the cluster, while “config” is the reference to the metadata server. The following listing shows the use of the method “db.getSisterDB()” to retrieve those references:

> admin = db.getSisterDB("admin")
admin
> config = db.getSisterDB("config")
config
> 

Once the references are available, these names can be used as shortcuts for easier access. Let’s add each shard running on the local and on the foreign server (192.168.1.2) on different communication ports. It is important to note that the issued commands are executed against the metadata server “config”.

> admin.runCommand( { addshard: "192.168.1.2:20001" } )
Sun Dec 20 16:04:02 Request::process ns: admin.$cmd msg id:-2097268492 attempt: 0
Sun Dec 20 16:04:02 single query: admin.$cmd  { addshard: "192.168.1.2:20001" }  ntoreturn: -1

> admin.runCommand( { addshard: "192.168.1.2:20002" } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268491 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { addshard: "192.168.1.2:20002" }  ntoreturn: -1

> admin.runCommand( { addshard: "localhost:20001", allowLocal: true } )
> 

In order to be added to the list, a shard server must be running. If the shard is down at this point, it will not be added to the list of available shards. On the other hand, if it is added and then goes down, mongos keeps sending heartbeats to verify whether the shard has come back. Either way, use the command “listshards” to list the existing shards that the cluster head can use.

> admin.runCommand( { listshards:1 } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268490 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { addshard: "localhost:20001", allowLocal: true }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268489 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { listshards: 1.0 }  ntoreturn: -1
{
        "shards" : [
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6ea"),
                        "host" : "192.168.1.2:20001"
                },
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6eb"),
                        "host" : "192.168.1.2:20002"
                },
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6ec"),
                        "host" : "localhost:20001"
                }
        ],
        "ok" : 1
}
> 

Enabling sharding means giving the metadata server “config” the name of the database to be sharded, as well as the definition of the shard key. The command “enablesharding” receives the name of the database. The following listing shows the database “netbeams” being enabled; then the shard key must be defined, and the key “observation.pH” is chosen as the shard key:

> admin.runCommand({enablesharding:"netbeams"})
{"ok" : 1}
admin.runCommand( { shardcollection: "netbeams.SondeDataContainer", key: { "observation.pH" : 1} } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268488 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { enablesharding: "netbeams" }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268487 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { shardcollection: "netbeams.SondeDataContainer", key: { observation.pH: 1.0 } }  ntoreturn: -1
{"collectionsharded" : "netbeams.SondeDataContainer" , "ok" : 1}
> 

Chunks represent the different sections of the data. Using the reference to the metadata server, “config.chunks.find()” lists the chunk documents.

> config.chunks.find()
{ "lastmod" : { "t" : 1261341503000, "i" : 1 }, "ns" : "netbeams.SondeDataContainer", "min" : { "observation" : { "pH" : { $minKey : 1 } } },
"minDotted" : { "observation.pH" : { $minKey : 1 } }, "max" : { "observation" : { "pH" : { $maxKey : 1 } } }, "maxDotted" : { "observation.pH" : { $maxKey : 1 } },
"shard" : "192.168.1.2:20002", "_id" : ObjectId("4b2e8b3fb342bcd910b62ec9") }
> 

The next step is to create the indexes for the expected keys. This procedure can also be performed after the documents are inserted. In general, defining indexes slows down “Create” operations but speeds up “Retrieve” ones. In order to proceed, make sure you have covered the documentation on mongoDB’s indexes.

  • mongoDB Indexes: this is the documentation regarding indexes of keys on mongoDB.

Note, in the following listing, that the index creation requests are issued through the cluster head. A reference to the database “netbeams” is acquired using the function “db.getSisterDB()”, just as it was for the databases “config” and “admin”, and then the method “db.collection.ensureIndex()” is used.

> netbeams = db.getSisterDB("netbeams")
netbeams
> netbeams.SondeDataContainer.ensureIndex( { "message_id":1 } )
Sun Dec 20 16:04:03 Request::process ns: netbeams.system.indexes msg id:-2097268486 attempt: 0
Sun Dec 20 16:04:03  .system.indexes write for: netbeams.system.indexes
Sun Dec 20 16:04:03 Request::process ns: netbeams.$cmd msg id:-2097268485 attempt: 0
Sun Dec 20 16:04:03 single query: netbeams.$cmd  { getlasterror: 1.0 }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: test.$cmd msg id:-2097268484 attempt: 0
Sun Dec 20 16:04:03 single query: test.$cmd  { getlasterror: 1.0 }  ntoreturn: -1

netbeams.SondeDataContainer.ensureIndex( { "sensor.ip_address":1 } )
netbeams.SondeDataContainer.ensureIndex( { "sensor.location.latitude":1 } )
netbeams.SondeDataContainer.ensureIndex( { "sensor.location.longitude":1 } )
netbeams.SondeDataContainer.ensureIndex( { "time.valid":1 } )
netbeams.SondeDataContainer.ensureIndex( { "time.transaction":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.WaterTemperature":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.SpecificConductivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Conductivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Resistivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Salinity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Pressure":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Depth":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.pH":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.pHmV":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Turbidity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.ODOSaturation":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.ODO":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Battery":1 } )

You can verify the setup by inspecting each of the collections of the config server. Using a client in a different shell, you can directly access and modify (NOT RECOMMENDED) the settings of the metadata server, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo config
MongoDB shell version: 1.2.0
url: config
connecting to: config
type "help" for help
> Sun Dec 20 16:31:57 connection accepted from 127.0.0.1:48589 #7
show collections
Sun Dec 20 16:32:01 Request::process ns: config.system.namespaces msg id:-128400130 attempt: 0
Sun Dec 20 16:32:01 single query: config.system.namespaces  { query: {}, orderby: { name: 1.0 } }  ntoreturn: 0
chunks
databases
shards
system.indexes
version

The method “find()” can then be used to list the contents of each of these collections. An example is listing the configured databases, showing the properties of each of them (partitioned or not, primary server host, etc.), as shown in the following listing.

> db.databases.find()
Sun Dec 20 16:47:48 Request::process ns: config.databases msg id:-128400129 attempt: 0
Sun Dec 20 16:47:48 single query: config.databases  {}  ntoreturn: 0
{ "name" : "admin", "partitioned" : false, "primary" : "localhost:10000", "_id" : ObjectId("4b2e8b3fb342bcd910b62ec7") }
{ "name" : "netbeams", "partitioned" : true, "primary" : "192.168.1.2:20002",
                  "sharded" : { "netbeams.SondeDataContainer" : { "key" : { "observation" : { "pH" : 1 } }, "unique" : false } },
                  "_id" : ObjectId("4b2e8b3fb342bcd910b62ec8") }
{ "name" : "test", "partitioned" : false, "primary" : "192.168.1.2:20002", "_id" : ObjectId("4b2e8b3fb342bcd910b62eca") }

Before proceeding, make sure you have covered the basics of mongoDB use.

Using the mongoDB client process “mongo”, access a given “mongos” or “mongod” server. Connecting to a “mongos” process executes the commands in the context of the entire cluster through the metadata server “config”, while connecting to a “mongod” accesses a single shard server, which is useful for debugging. Specify the server location and the database to use. The following listing shows the command to access a given shard on a given port, using the database “netbeams”.

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo 192.168.1.2:20001/netbeams
MongoDB shell version: 1.2.0
url: 192.168.1.2:20001/netbeams
connecting to: 192.168.1.2:20001/netbeams
type "help" for help

In order to verify the stats of a collection, use the function “collection.stats()”. This function reports the counters stored in the metadata server.

> db.SondeDataContainer.stats()
Sun Dec 20 14:54:24 Request::process ns: netbeams.$cmd msg id:-1701410104 attempt: 0
Sun Dec 20 14:54:24 single query: netbeams.$cmd  { collstats: "SondeDataContainer" }  ntoreturn: -1
Sun Dec 20 14:54:24 passing through unknown command: collstats { collstats: "SondeDataContainer" }
{
        "ns" : "netbeams.SondeDataContainer",
        "count" : 2364851,
        "size" : 1155567036,
        "storageSize" : 1416246240,
        "nindexes" : 40,
        "ok" : 1
}

A sample document can be retrieved with the function “collection.findOne()”, which returns a document chosen from one of the shards. It is a quick way to inspect an example of the collected data.

> db.SondeDataContainer.findOne()
Sun Dec 20 14:59:08 Request::process ns: netbeams.SondeDataContainer msg id:-1701410103 attempt: 0
Sun Dec 20 14:59:08 shard query: netbeams.SondeDataContainer  {}
Sun Dec 20 14:59:08  have to set shard version for conn: 0x2909de0 ns:netbeams.SondeDataContainer my last seq: 0  current: 4
Sun Dec 20 14:59:08     setShardVersion  192.168.1.2:20002  netbeams.SondeDataContainer  { setShardVersion: "netbeams.SondeDataContainer",
configdb: "localhost:10000", version: Timestamp 1261341503000|1, serverID: ObjId(4b2e8b3eb342bcd910b62ec6) } 0x2909de0
Sun Dec 20 14:59:08       setShardVersion success!
{
        "_id" : ObjectId("e26f40072f68234b6af3d600"),
        "message_id" : "b405e634-fd4b-450c-9466-82dc0555ea06",
        "sensor" : {
                "ip_address" : "192.168.0.178",
                "location" : {
                        "latitude" : 37.89155,
                        "longitude" : -122.4464
                }
        },
        "time" : {
                "valid" : "Sun Dec 06 2009 10:18:22 GMT-0800 (PST)",
                "transaction" : "Sat Dec 12 2009 01:52:42 GMT-0800 (PST)"
        },
        "observation" : {
                "WaterTemperature" : 23.45,
                "SpecificConductivity" : 35.4,
                "Conductivity" : 139.6,
                "Resistivity" : 899.07,
                "Salinity" : 0.02,
                "Pressure" : 0.693,
                "Depth" : 2.224,
                "pH" : 6.25,
                "pHmV" : -76,
                "Turbidity" : 0.2,
                "ODOSaturation" : 31.3,
                "ODO" : 54.83,
                "Battery" : 1.1
        }
}
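
Beyond “findOne()”, regular queries can be issued against the sharded collection in the same session. Since “observation.pH” is the shard key, a range query on it can be routed by “mongos” to only the relevant shards. The following is a hypothetical sketch (the pH range is arbitrary and the output is omitted):

> db.SondeDataContainer.find( { "observation.pH" : { $gt : 6, $lt : 7 } } ).count()
> db.SondeDataContainer.find( { "sensor.ip_address" : "192.168.0.178" } ).limit(5)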

MapReduce

In order to proceed with this section, make sure you have the necessary background in the MapReduce programming model. The recommended documentation and tutorials are as follows:

  • Introduction to MapReduce: this training video class describes the MapReduce concepts using Hadoop and the Hadoop Distributed File System, which can be directly related to mongoDB’s implementation; a must-watch before proceeding;
  • mongoDB’s MapReduce HowTo: this is the main documentation of the MapReduce implementation and use in mongoDB. It covers the basics and how the “map” and “reduce” functions can be implemented for a given collection of documents.

The first basic example of the use of MapReduce in distributed systems is counting. In my opinion, it is a good example of how the counting process can be spread out over different machines. Using the regular client process “mongo”, access the database “netbeams”, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo netbeams
MongoDB shell version: 1.2.0
url: netbeams
connecting to: netbeams
Sun Dec 20 14:22:49 connection accepted from 127.0.0.1:39899 #5
type "help" for help

At this point, you’re connected to the server running on the main host. Refer to the setup process described at the beginning of this documentation for more details. Our goal is to report the number of observations collected from each server, identified by its IP address. In this case, our strategy is to define a map function that emits the value 1 as the counter, and a reduce function that sums the consolidated results after mongoDB’s MapReduce engine returns the intermediate results to be reduced.

  • The Map function: the following listing defines the map function, which emits the IP address of the sensor as the key and a count of 1 as the value. Note that mongoDB’s implementation differs from the Hadoop implementation: it does not pass the document as a parameter to the map function, but uses the concept of “this”, which refers to the current document during the execution.
> m1 = function () {
    emit(this.sensor.ip_address, {count:1});
}
  • The Reduce function: the following defines the reduce function, which receives the consolidated results mapping the given keys (IP addresses) to the counting values found. The function iterates over the returned values and increments the total variable with the value of the “count” field, which in this case is equal to 1 for each of the elements. The “…” shown in the shell listings is just formatting from the mongoDB client shell. The result is returned using the key “count”.
> r1 = function (key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++) {
        total += values[i].count;
    }
    return {count:total};
}

Having defined the “map” and “reduce” functions, you can use the collection function “db.collection.mapReduce()”, passing the function references as parameters. The following listing shows the execution of the command using the mongoDB shell, which displays the definition of each of the “map” and “reduce” functions before the execution:

> res = db.SondeDataContainer.mapReduce(m1, r1);
Sun Dec 20 14:26:02 Request::process ns: netbeams.$cmd msg id:-1701410106 attempt: 0
Sun Dec 20 14:26:02 single query: netbeams.$cmd  { mapreduce: "SondeDataContainer", map: function () {
    emit(this.sensor.ip_address, {count:1});
}, reduce: function (key, values) {
    var total = 0;
    for (var i = 0; i < va... }  ntoreturn: -1

After processing the execution of the function on each of the shards, the cluster head process “mongos” returns the values and consolidates the results. The output is temporarily stored in a collection whose name is given in “res.result”, saving the values in a separate chunk. The output is shown as follows:

Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
Sun Dec 20 14:33:15 creating new connection for pool to:192.168.1.2:20002
Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
{
        "result" : "tmp.mr.mapreduce_1261348395_10",
        "shardCounts" : {
                "192.168.1.2:20002" : {
                        "input" : 2364851,
                        "emit" : 2364851,
                        "output" : 254
                }
        },
        "counts" : {
                "emit" : 2364851,
                "input" : 2364851,
                "output" : 254
        },
        "ok" : 1,
        "timeMillis" : 433282,
        "timing" : {
                "shards" : 433193,
                "final" : 89
        },
        "ok" : 1,
}

As shown in this output, the MapReduce result reports the counts of emitted, input, and final output documents. The output count corresponds to the roughly 253 distinct IP addresses in use on the 192.168.0.x sensor network (0 being the subnet address and 255 the broadcast address). The values relate to the total number of observations inserted during the Create operation; the Retrieve section showed the total number of documents as about 2.36 million. Again, the output of the function “db.collection.stats()” shows the total number of documents:

> db.SondeDataContainer.stats()
Sun Dec 20 14:54:24 Request::process ns: netbeams.$cmd msg id:-1701410104 attempt: 0
Sun Dec 20 14:54:24 single query: netbeams.$cmd  { collstats: "SondeDataContainer" }  ntoreturn: -1
Sun Dec 20 14:54:24 passing through unknown command: collstats { collstats: "SondeDataContainer" }
{
        "ns" : "netbeams.SondeDataContainer",
        "count" : 2364851,
        "size" : 1155567036,
        "storageSize" : 1416246240,
        "nindexes" : 40,
        "ok" : 1
}

The number of “emits” is the total number of documents visited by the “map” function, while the output count refers to the number of reduced keys. In order to see the result, access the collection referenced by “res.result” and use the function “find()” to list the results, as shown in the following listing (only the first 20 items are shown):

> db[res.result].find()                        
Sun Dec 20 14:34:43 Request::process ns: netbeams.tmp.mr.mapreduce_1261348395_10 msg id:-1701410105 attempt: 0
Sun Dec 20 14:34:43 single query: netbeams.tmp.mr.mapreduce_1261348395_10  {}  ntoreturn: 0
Sun Dec 20 14:34:43 creating new connection for pool to:192.168.1.2:20002
{ "_id" : "192.168.0.10", "value" : { "count" : 9408 } }
{ "_id" : "192.168.0.100", "value" : { "count" : 9371 } }
{ "_id" : "192.168.0.101", "value" : { "count" : 9408 } }
{ "_id" : "192.168.0.102", "value" : { "count" : 9500 } }
{ "_id" : "192.168.0.103", "value" : { "count" : 9363 } }
{ "_id" : "192.168.0.104", "value" : { "count" : 9355 } }
{ "_id" : "192.168.0.105", "value" : { "count" : 9281 } }
{ "_id" : "192.168.0.106", "value" : { "count" : 9320 } }
{ "_id" : "192.168.0.107", "value" : { "count" : 9341 } }
{ "_id" : "192.168.0.108", "value" : { "count" : 9464 } }
{ "_id" : "192.168.0.109", "value" : { "count" : 9285 } }
{ "_id" : "192.168.0.11", "value" : { "count" : 9201 } }
{ "_id" : "192.168.0.110", "value" : { "count" : 9397 } }
{ "_id" : "192.168.0.111", "value" : { "count" : 9258 } }
{ "_id" : "192.168.0.112", "value" : { "count" : 9242 } }
{ "_id" : "192.168.0.113", "value" : { "count" : 9231 } }
{ "_id" : "192.168.0.114", "value" : { "count" : 9446 } }
{ "_id" : "192.168.0.115", "value" : { "count" : 9550 } }
{ "_id" : "192.168.0.116", "value" : { "count" : 9409 } }
{ "_id" : "192.168.0.117", "value" : { "count" : 9256 } }
has more

Note that the final result shows the key “_id” as the IP address, as defined in the “map” function, and the result under “value.count”, since “value” is the default output field of the MapReduce engine and “count” was the key used in the “reduce” function.
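
To look up the count of a single sensor instead of scanning the whole result, the temporary collection can also be queried by its “_id”. A small sketch, using the IP address of the sample document shown earlier:

> db[res.result].findOne( { _id : "192.168.0.178" } )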

Other use cases can be implemented in the same way. The execution of this map-reduce was not fast because a single shard held all the data; MapReduce is designed to scale with the number of servers available, so if the load is distributed over more shards, the results are returned faster.
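
As a sketch of another possible use case (not part of the original experiment), the same pattern can compute the average pH per sensor: the map function emits a partial sum and a count, and the reduce function combines them; dividing sum by count afterwards gives the average.

> m2 = function () {
    emit(this.sensor.ip_address, {sum: this.observation.pH, count: 1});
}
> r2 = function (key, values) {
    var result = {sum: 0, count: 0};
    for (var i = 0; i < values.length; i++) {
        result.sum += values[i].sum;
        result.count += values[i].count;
    }
    // same shape as the emitted values, so partial re-reduces remain correct
    return result;
}
> res2 = db.SondeDataContainer.mapReduce(m2, r2);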

The shard logs reveal the details of the map and reduce operations. The following listing is from the log of the “mongod” server process, showing the creation of the temporary collections for intermediate results. First, the request is received and both the map and reduce functions are set up to be executed.

Sun Dec 20 14:26:02 query netbeams.$cmd ntoreturn:1 reslen:179 nscanned:0 { mapreduce: "SondeDataContainer", map: function () {
    emit(this.sensor.ip_address, {count:1});
}, reduce: function (key, values) {
    var total = 0;
    for (var i = 0; i < va..., out: "tmp.mrs.SondeDataContainer_1261347962_5" }  nreturned:1 433257ms
Sun Dec 20 14:26:02 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9
Sun Dec 20 14:26:02 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9_inc

The “map phase” is executed first, and it must complete before the “reduce phase” takes place. In the scenario used to count the number of documents per IP address, the phases happen at different instants, as shown in the following listing. It also shows the indexing of the intermediate results during the “map phase”, saving the data into the temporary collection “netbeams.tmp.mr.mapreduce_1261347962_9_inc”:

                43700/2364851   1%
                96000/2364851   4%
                148300/2364851  6%
                200300/2364851  8%
                250900/2364851  10%
                300600/2364851  12%
                351600/2364851  14%
                403800/2364851  17%
                455800/2364851  19%
                508000/2364851  21%
                560500/2364851  23%
                601100/2364851  25%
                647500/2364851  27%
                699900/2364851  29%
                752300/2364851  31%
                804300/2364851  34%
                856100/2364851  36%
                907900/2364851  38%
                959000/2364851  40%
                1009800/2364851 42%
                1060800/2364851 44%
                1112800/2364851 47%
                1164100/2364851 49%
                1209400/2364851 51%
                1253700/2364851 53%
                1305400/2364851 55%
                1350900/2364851 57%
                1401700/2364851 59%
                1453100/2364851 61%
                1503100/2364851 63%
                1551500/2364851 65%
                1602600/2364851 67%
                1637100/2364851 69%
                1687600/2364851 71%
                1736800/2364851 73%
                1787600/2364851 75%
                1839900/2364851 77%
                1891100/2364851 79%
                1941400/2364851 82%
                1989900/2364851 84%
                2041800/2364851 86%
                2094300/2364851 88%
                2145500/2364851 90%
                2193500/2364851 92%
                2245100/2364851 94%
                2296200/2364851 97%
                2341700/2364851 99%
Sun Dec 20 14:28:24 building new index on { 0: 1 } for netbeams.tmp.mr.mapreduce_1261347962_9_inc...
Sun Dec 20 14:28:24 Buildindex netbeams.tmp.mr.mapreduce_1261347962_9_inc idxNo:0
       { ns: "netbeams.tmp.mr.mapreduce_1261347962_9_inc", key: { 0: 1 }, name: "0_1" }
Sun Dec 20 14:28:40      external sort used : 0 files  in 16 secs
Sun Dec 20 14:28:46 done for 1796343 records 22.486secs
Sun Dec 20 14:28:24 insert netbeams.system.indexes 22486ms
Sun Dec 20 14:28:47 building new index on { _id: ObjId(000000000000000000000000) } for netbeams.tmp.mr.mapreduce_1261347962_9...
Sun Dec 20 14:28:47 Buildindex netbeams.tmp.mr.mapreduce_1261347962_9 idxNo:0
      { name: "_id_", ns: "netbeams.tmp.mr.mapreduce_1261347962_9", key: { _id: ObjId(000000000000000000000000) } }
Sun Dec 20 14:28:47 done for 0 records 0.02secs

The execution of the “reduce phase” starts and processes the intermediate results of the “map phase”, saving the final results in the new temporary collection “netbeams.tmp.mr.mapreduce_1261348395_10”.

                100/1796343     0%
                200/1796343     0%
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9_inc
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mrs.SondeDataContainer_1261347962_5
Sun Dec 20 14:33:15 end connection 192.168.1.10:38231
Sun Dec 20 14:33:15 connection accepted from 192.168.1.10:44062 #15
Sun Dec 20 14:33:15 connection accepted from 192.168.1.2:60641 #16
Sun Dec 20 14:33:15 building new index on { _id: ObjId(000000000000000000000000) } for netbeams.tmp.mr.mapreduce_1261348395_10...
Sun Dec 20 14:33:15 Buildindex netbeams.tmp.mr.mapreduce_1261348395_10 idxNo:0
         { name: "_id_", ns: "netbeams.tmp.mr.mapreduce_1261348395_10", key: { _id: ObjId(000000000000000000000000) } }
Sun Dec 20 14:33:15 done for 0 records 0secs
Sun Dec 20 14:33:15  mapreducefinishcommand netbeams.tmp.mr.mapreduce_1261348395_10 253
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mrs.SondeDataContainer_1261347962_5
Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
Sun Dec 20 14:33:15 end connection 192.168.1.2:60641
Sun Dec 20 14:33:15 end connection 192.168.1.10:44062
Sun Dec 20 14:34:43 connection accepted from 192.168.1.10:44063 #17

NOTE: If the results are important, make sure to save them into a permanent collection, since the temporary collection returned by a map-reduce call is purged on a new access to the server through the mongoDB client.
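
A minimal sketch of saving the temporary output into a permanent collection (the collection name “ip_counts” is arbitrary):

> db[res.result].find().forEach( function (doc) { db.ip_counts.insert(doc); } )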

TF-IDF in Hadoop Part 3: Documents in Corpus and TFIDF Computation

The previous 2 parts of this post did a small part of the job of calculating the TF-IDF for each “term” in the different documents in the “corpus”. Since the implementation depends on concepts of Information Retrieval, especially for starters in the field, take a look at the book by Christopher D. Manning, Prabhakar Raghavan and Hinrich Schütze, Introduction to Information Retrieval, Cambridge University Press, 2008. The authors are professors at Stanford and Stuttgart, the book has different exercises on the subject, and I found good material in Chapter 7 covering the basic concepts of the TF-IDF algorithm. As I mentioned before, I had first read about the term 7 years ago when I was writing my BS in Computer Science degree report (in Portuguese) on the problem of user profile matching and clustering. Interestingly enough, I started learning Hadoop about 2 weeks ago and I was stoked about it, because my first contact with MapReduce was actually with mongoDB during my MS in Computer Science thesis, when I needed to generate a report over a data-centric collection gathered from an environmental sensor network using mongoDB’s MapReduce API over distributed mongoDB shards. All in all, it seems the time to put this into practice is now :).

Job 3: Documents in Corpus and TF-IDF computation

In order to summarize the idea of scoring words based on their occurrence in the corpus, I will use graphical and textual examples of the algorithm to take advantage of this post and make clear what the exercises really are. Ricky Ho has implemented TF-IDF using Apache PIG and documented exactly the steps described by the algorithm in the very nice diagram shown below. Ricky also made a good summary of the terms “term frequency” and “inverse document frequency”, so check his website out in case you need it.

The TF-IDF MapReduce Phases by Ricky Ho

This post implements the third round of the implementation, where the document count for each word is obtained from the size of the list of documents associated with that word, taking into consideration the output of the previous phase. Let’s take a look at the data format from the previous job and see if it matches the description in the diagram presented (note that the terms are not sorted; I selected the term “therefore” at random):

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 2-word-counts/part-r-00000 | less
...
therefore@all-shakespeare       652/738781
therefore@leornardo-davinci-all.txt     124/149612
therefore@the-outline-of-science-vol1.txt       36/70650
...

The output shows each term in each document, the number of its occurrences in that document, and the total number of terms in the document. So, the final Mapper and Reducer were defined as follows:

  • Map:
    • Input: ((term@document), n/N)
    • Re-arrange the mapper to have the word as the key, since we need to count the number of documents where it occurs
    • Output: (term, document=n/N)
  • Reducer:
    • D = total number of documents in corpus. This can be passed by the driver as a constant;
    • d = number of documents in corpus where the term appears. It is a counter over the reduced values for each term;
    • TFIDF = n/N * log(D/d);
    • Output: ((word@document), d/D, (n/N), TFIDF)

This post shows the implementation of the third step, which counts the number of documents in the corpus in which each “term” appears and calculates the TF-IDF. I have made some assumptions for the final output to better present the results and, of course, to keep the scope of the example manageable.

  • The first problem was to keep this step as the last one needed to complete the exercise. The calculation of the number of documents in the corpus could be done by another MapReduce phase, as described in Cloudera’s documentation; however, the class slides conclude by mentioning that it can be done without such an additional phase. I remembered from the classes that the JobConf is used to pass parameters to jobs. So, I used the FileSystem class to count the number of documents 😀 in the original input directory, since that is a constant number. I tried using the Context/Configuration classes of the Hadoop 0.20.1 API to pass that number to the last Reducer, but get(key) returned null. So, the only way I could pass the number of documents was using the JobName 🙂 I know, it is a dirty hack, but it works;
  • Since the number of documents in the corpus is small, the chance that a word appears in all documents is much higher than when applying the algorithm for web indexing over thousands or millions of documents. Therefore, the term log(totalDocs/docsPerWord) can “null out” the result (log(3/3) = 0), so I simplified the calculation to tfIdf = tf whenever a term occurs in 100% of the documents in the corpus, instead of letting the logarithm zero it out (you could implement it as tfIdf = tf * 1 as well); see the worked example after this list;
  • I decided to add more information to the output purely for documentation purposes. It shows [word@document, documentsFrequency/documentsCorpus, wordFrequency/totalWordsInDocument, TF-IDF];
  • The final result is formatted with only a few decimal places, for the purpose of displaying the values in this exercise. In production, the full precision of these values matters.
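
For reference, the computation performed in the reducer, using the variables from the list above, is:

\[
\mathrm{tf} = \frac{n}{N}, \qquad \mathrm{idf} = \log_{10}\frac{D}{d}, \qquad \mathrm{tfidf} = \mathrm{tf} \cdot \mathrm{idf} = \frac{n}{N}\,\log_{10}\frac{D}{d}
\]

As a worked check against the output shown further below: for “therefore@all-shakespeare”, d = D = 3, so log10(3/3) = 0 and the simplified value tf = 652/738781 ≈ 0.00088253 is reported instead of zero.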

Job3, Mapper

package index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * WordsInCorpusTFIDFMapper implements the Job 3 specification for the TF-IDF algorithm
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordsInCorpusTFIDFMapper extends Mapper<LongWritable, Text, Text, Text> {

    public WordsInCorpusTFIDFMapper() {
    }

    /**
     * @param key is the byte offset of the current line in the file;
     * @param value is the line from the file
     * @param output has the method "collect()" to output the key,value pair
     * @param reporter allows us to retrieve some information about the job (like the current filename)
     *
     *     PRE-CONDITION: marcello@book.txt  \t  3/1500
     *     POST-CONDITION: marcello, book.txt=3/1500
     */
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] wordAndCounters = value.toString().split("\t");    // e.g. ["therefore@all-shakespeare", "652/738781"]
        String[] wordAndDoc = wordAndCounters[0].split("@");         // e.g. ["therefore", "all-shakespeare"]
        context.write(new Text(wordAndDoc[0]), new Text(wordAndDoc[1] + "=" + wordAndCounters[1]));
    }
}
Job3, Reducer
package index;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * WordsInCorpusTFIDFReducer calculates the number of documents in corpus that a given key occurs and the TF-IDF computation.
 * The total number of D is acquired from the job name 🙂 It is a dirty hack, but the only way I could communicate the number from
 * the driver.
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordsInCorpusTFIDFReducer extends Reducer<Text, Text, Text, Text> {

    private static final DecimalFormat DF = new DecimalFormat("###.########");

    public WordsInCorpusTFIDFReducer() {
    }

    /**
     * @param key is the key of the mapper
     * @param values are all the values aggregated during the mapping phase
     * @param context contains the context of the job run
     *
     *             PRECONDITION: receive a list of <word, ["doc1=n1/N1", "doc2=n2/N2"]>
     *             POSTCONDITION: <"word@doc1,  [d/D, n/N, TF-IDF]">
     */
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // get the number of documents indirectly from the file-system (stored in the job name on purpose)
        int numberOfDocumentsInCorpus = Integer.parseInt(context.getJobName());
        // total frequency of this word
        int numberOfDocumentsInCorpusWhereKeyAppears = 0;
        Map<String, String> tempFrequencies = new HashMap<String, String>();
        for (Text val : values) {
            String[] documentAndFrequencies = val.toString().split("=");
            numberOfDocumentsInCorpusWhereKeyAppears++;
            tempFrequencies.put(documentAndFrequencies[0], documentAndFrequencies[1]);
        }
        for (String document : tempFrequencies.keySet()) {
            String[] wordFrequenceAndTotalWords = tempFrequencies.get(document).split("/");

            //Term frequency: quotient of the number of occurrences of the term in the document and the total number of terms in the document
            double tf = Double.valueOf(Double.valueOf(wordFrequenceAndTotalWords[0])
                    / Double.valueOf(wordFrequenceAndTotalWords[1]));

            //inverse document frequency: quotient between the number of docs in corpus and the number of docs where the term appears
            double idf = (double) numberOfDocumentsInCorpus / (double) numberOfDocumentsInCorpusWhereKeyAppears;

            //given that log10(1) = 0, just use the term frequency when the term appears in every document
            double tfIdf = numberOfDocumentsInCorpus == numberOfDocumentsInCorpusWhereKeyAppears ?
                    tf : tf * Math.log10(idf);

            context.write(new Text(key + "@" + document), new Text("[" + numberOfDocumentsInCorpusWhereKeyAppears + "/"
                    + numberOfDocumentsInCorpus + " , " + wordFrequenceAndTotalWords[0] + "/"
                    + wordFrequenceAndTotalWords[1] + " , " + DF.format(tfIdf) + "]"));
        }
    }
}

I have implemented the test cases for both the Mapper and Reducer classes, but to keep this post shorter, I will skip those. Let’s take a look at the driver, since it captures the total number of documents directly from the filesystem using the built-in Hadoop API. There is definitely no need for another MapReduce phase for that. As described in the Cloudera training, the fewer phases the better, since we save resource utilization :). Anyway, let’s go to the Driver implementation.

Job3, Driver
package index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * WordFrequenceInDocument Creates the index of the words in documents,
 * mapping each of them to their frequency.
 * @author Marcello de Sales (marcello.desales@gmail.com)
 * @version "Hadoop 0.20.1"
 */
public class WordsInCorpusTFIDF extends Configured implements Tool {

    // where to put the data in hdfs when we're done
    private static final String OUTPUT_PATH = "3-tf-idf";

    // where to read the data from.
    private static final String INPUT_PATH = "2-word-counts";

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job job = new Job(conf, "Word in Corpus, TF-IDF");

        job.setJarByClass(WordsInCorpusTFIDF.class);
        job.setMapperClass(WordsInCorpusTFIDFMapper.class);
        job.setReducerClass(WordsInCorpusTFIDFReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        //Getting the number of documents from the original input directory.
        Path inputPath = new Path("input");
        FileSystem fs = inputPath.getFileSystem(conf);
        FileStatus[] stat = fs.listStatus(inputPath);

        //Dirty hack to pass the total number of documents as the job name.
        //The call to context.getConfiguration.get("docsInCorpus") returns null when I tried to pass
        //conf.set("docsInCorpus", String.valueOf(stat.length)) Or even
        //conf.setInt("docsInCorpus", stat.length)
        job.setJobName(String.valueOf(stat.length));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordsInCorpusTFIDF(), args);
        System.exit(res);
    }
}

Continuing with the implementation, the only thing left to do is to compile and run the final driver. Note that the input directory is the one containing the partial counts per document from job 2, that is, “2-word-counts”. The output is the directory reserved for step 3, “3-tf-idf”. The only way I could send the total number of documents in the corpus was through the job name; I could not get the Hadoop 0.20.1 API to pass the value through the configuration. As documented, I tried using the context reference in the reducer class, but the call “context.getConfiguration().get(“docsInCorpus”)” only returned “null”. I gave up looking for another option, and the job name was the only way I found :).

I skipped the test session and compiled everything and ran the driver as follows:

training@training-vm:~/git/exercises/shakespeare$ ant
Buildfile: build.xml

compile:
    [javac] Compiling 11 source files to /home/training/git/exercises/shakespeare/bin

jar:
      [jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar

BUILD SUCCESSFUL

Then, finally running the calculator of words:

training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.WordsInCorpusTFIDF
10/01/09 21:41:40 INFO input.FileInputFormat: Total input paths to process : 1
10/01/09 21:41:41 INFO mapred.JobClient: Running job: job_200912301017_0115
10/01/09 21:41:42 INFO mapred.JobClient:  map 0% reduce 0%
10/01/09 21:41:51 INFO mapred.JobClient:  map 100% reduce 0%
10/01/09 21:42:00 INFO mapred.JobClient:  map 100% reduce 100%
10/01/09 21:42:02 INFO mapred.JobClient: Job complete: job_200912301017_0115
10/01/09 21:42:02 INFO mapred.JobClient: Counters: 17
10/01/09 21:42:02 INFO mapred.JobClient:   Job Counters
10/01/09 21:42:02 INFO mapred.JobClient:     Launched reduce tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:     Launched map tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:     Data-local map tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:   FileSystemCounters
10/01/09 21:42:02 INFO mapred.JobClient:     FILE_BYTES_READ=2017995
10/01/09 21:42:02 INFO mapred.JobClient:     HDFS_BYTES_READ=1920431
10/01/09 21:42:02 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=4036022
10/01/09 21:42:02 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=2943390
10/01/09 21:42:02 INFO mapred.JobClient:   Map-Reduce Framework
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce input groups=0
10/01/09 21:42:02 INFO mapred.JobClient:     Combine output records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Map input records=48779
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce shuffle bytes=2017995
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce output records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Spilled Records=97558
10/01/09 21:42:02 INFO mapred.JobClient:     Map output bytes=1920431
10/01/09 21:42:02 INFO mapred.JobClient:     Combine input records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Map output records=48779
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce input records=48779

Note that the final number of records is the same as in the previous job step, so everything went as expected. Taking a look at the file in the output directory:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -ls 3-tf-idf
Found 2 items
drwxr-xr-x   - training supergroup          0 2010-01-09 21:41 /user/training/3-tf-idf/_logs
-rw-r--r--   1 training supergroup    2943390 2010-01-09 21:41 /user/training/3-tf-idf/part-r-00000

I decided to take a look at the same word I mentioned above, “therefore”. Here are the results; this time the output was automatically sorted by Hadoop.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 3-tf-idf/part-r-00000 | less
...
abook@leornardo-davinci-all.txt [1/3 , 3/149612 , 0.00000957]
aboriginal@the-outline-of-science-vol1.txt      [1/3 , 1/70650 , 0.00000675]
abortive@all-shakespeare        [2/3 , 4/738781 , 0.00000095]
...
therefore@all-shakespeare       [3/3 , 652/738781 , 0.00088253]
therefore@the-outline-of-science-vol1.txt       [3/3 , 36/70650 , 0.00050955]
therefore@leornardo-davinci-all.txt     [3/3 , 124/149612 , 0.00082881]
...

Taking a look at Chapter 7 of the book, the following conclusions can be drawn from the output presented:

  • The term “therefore” is more relevant in the document “all-shakespeare”, since its occurrence is more likely there than in the other documents;
  • Terms that do not appear in all documents, such as “abook”, “aboriginal” and “abortive”, have very small relevance for the given corpus of documents.

What’s Next?

I had so much fun with my first contact with Hadoop that I am going to the Cloudera Training here in the Bay Area in about 10 days. Although the training covers the Hadoop 0.18 API, I decided to use the Hadoop 0.20.1 API because I just wanted to try a “cleaner” API.

As for next steps from this exercise, one could do the categorized classification of the terms per document in a data-centric way (document -> term -> tf-idf), or whatever your need is. Time to go play with Apache PIG and HBase.

TF-IDF in Hadoop Part 2: Word Counts For Docs

January 6, 2010

The TF-IDF algorithm can be implemented in different ways. The Cloudera Hadoop training implements each of the steps as a different Job. I decided to take the approach of persisting the intermediate data before the execution of each subsequent step. This part documents the implementation of Job 2, as the second part of my experiments with Hadoop.

Part 1’s goal was to generate the word frequency for each of the documents in the input path provided, persisted in the “1-word-freq” output directory, as shown below:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 1-word-freq/part-r-00000 | less
…
therefore@all-shakespeare 652
therefore@leornardo-davinci-all.txt 124
therefore@the-outline-of-science-vol1.txt 36

The definition of Job 2 will take into account the structure of this data in the creation of the Mapper and Reducer classes.

Job 2: Word Counts for Docs

The goal of this job is to count the total number of words in each document, so that each word’s count can be compared against that total. I tried to implement a custom InputFormat but couldn’t find examples related to it. As I understood it, the values could be read in the same format they were saved in (Text, IntWritable), but I will keep it simple and use the same default InputFormat as before. Following the same structure as in Part 1, the specification of the Map and Reduce is as follows:

  • Map:
    • Input: ((word@document), n)
    • Re-arrange the mapper to have the key based on each document
    • Output: (document, word=n)
  • Reducer
    • N = totalWordsInDoc = sum [word=n]) for each document
    • Output: ((word@document), (n/N))

Note that the format used for the input of the mapper is the output of the previous job. The delimiters “@” and “/” were picked arbitrarily to better represent the intent of the data, so feel free to pick anything you prefer. The reducer just needs to sum the total number of words in a document and pass this value over to the next step, along with each word’s previous count, as necessary data for the next step.

I have learned that the Iterable of values in the Reducer class can’t be iterated more than once: the loop body was simply not entered when a second for-each pass was attempted, so I implemented it using a temporary map.
Job2, Mapper
package index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * WordCountsForDocsMapper rearranges each (word@document, n) pair into a (document, word=n) pair.
 * Hadoop 0.20.1 API
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordCountsForDocsMapper extends Mapper<LongWritable, Text, Text, Text> {

    public WordCountsForDocsMapper() {
    }

    /**
     * @param key is the byte offset of the current line in the file;
     * @param value is the line from the file
     * @param context
     *
     *     PRE-CONDITION: aa@leornardo-davinci-all.txt    1
     *                    aaron@all-shakespeare   98
     *                    ab@leornardo-davinci-all.txt    3
     *
     *     POST-CONDITION: Output <"all-shakespeare", "aaron=98"> pairs
     */
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] wordAndDocCounter = value.toString().split("\t");
        String[] wordAndDoc = wordAndDocCounter[0].split("@");
        context.write(new Text(wordAndDoc[1]), new Text(wordAndDoc[0] + "=" + wordAndDocCounter[1]));
    }
}
Job2, Mapper Unit Test
I have just simplified the unit test to verify if the test Mapper generates the format needed for the Reducer.
package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * Test cases for the word count mapper.
 * Hadoop 0.20.1 API
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordCountsForDocsMapperTest extends TestCase {

    private Mapper<LongWritable, Text, Text, Text> mapper;
    private MapDriver<LongWritable, Text, Text, Text> driver;

    @Before
    public void setUp() {
        mapper = new WordCountsForDocsMapper();
        driver = new MapDriver<LongWritable, Text, Text, Text>(mapper);
    }

    @Test
    public void testMultiWords() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("crazy@all-shakespeare\t25")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("all-shakespeare"), new Text("crazy=25")));
        assertListEquals(expected, out);
    }
}
Job 2, Reducer
package index;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * WordCountsForDocsReducer sums the word counts of each document and emits each word's relative frequency (n/N).
 * Hadoop 0.20.1 API
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordCountsForDocsReducer extends Reducer<Text, Text, Text, Text> {

    public WordCountsForDocsReducer() {
    }

    /**
     * @param key is the key of the mapper
     * @param values are all the values aggregated during the mapping phase
     * @param context contains the context of the job run
     *
     *        PRE-CONDITION: receive a list of <document, ["word=n", "word-b=x"]>
     *            pairs <"a.txt", ["word1=3", "word2=5", "word3=5"]>
     *
     *       POST-CONDITION: <"word1@a.txt, 3/13">,
     *            <"word2@a.txt, 5/13">
     */
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int sumOfWordsInDocument = 0;
        Map<String, Integer> tempCounter = new HashMap<String, Integer>();
        for (Text val : values) {
            String[] wordCounter = val.toString().split("=");
            tempCounter.put(wordCounter[0], Integer.valueOf(wordCounter[1]));
            sumOfWordsInDocument += Integer.parseInt(wordCounter[1]);
        }
        for (String wordKey : tempCounter.keySet()) {
            context.write(new Text(wordKey + "@" + key.toString()), new Text(tempCounter.get(wordKey) + "/"
                    + sumOfWordsInDocument));
        }
    }
}
Job 2, Reducer Unit Test
package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * Test cases for the reducer of the word counts.
 * Hadoop 0.20.1 API
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordCountsForDocsReducerTest extends TestCase {

    private Reducer<Text, Text, Text, Text> reducer;
    private ReduceDriver<Text, Text, Text, Text> driver;

    @Before
    public void setUp() {
        reducer = new WordCountsForDocsReducer();
        driver = new ReduceDriver<Text, Text, Text, Text>(reducer);
    }

    @Test
    public void testMultiWords() {
        List<Pair<Text, Text>> out = null;

        try {
            List<Text> values = new ArrayList<Text>();
            values.add(new Text("car=50"));
            values.add(new Text("hadoop=15"));
            values.add(new Text("algorithms=25"));
            out = driver.withInput(new Text("document"), values).run();

        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("car@document"), new Text("50/90")));
        expected.add(new Pair<Text, Text>(new Text("hadoop@document"), new Text("15/90")));
        expected.add(new Pair<Text, Text>(new Text("algorithms@document"), new Text("25/90")));
        assertListEquals(expected, out);
    }

}

Once again, following our test-driven development approach, let’s test our Mapper and Reducer classes in order to verify the “correctness” of the generated data. The JUnit test suite is updated as follows:

Tests Suit
package index;

import junit.framework.Test;
import junit.framework.TestSuite;

/**
 * All tests for the TF-IDF algorithm
 * Hadoop 0.20.1 API
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public final class AllTests  {

  private AllTests() { }

  public static Test suite() {
    TestSuite suite = new TestSuite("Tests for the TF-IDF algorithm");

    suite.addTestSuite(WordFreqMapperTest.class);
    suite.addTestSuite(WordFreqReducerTest.class);
    suite.addTestSuite(WordCountsForDocsMapperTest.class);
    suite.addTestSuite(WordCountsForDocsReducerTest.class);

    return suite;
  }
}

Just run it with the ANT “test” task, defined in the build.xml artifact.

training@training-vm:~/git/exercises/shakespeare$ ant test
Buildfile: build.xml

compile:
[javac] Compiling 12 source files to /home/training/git/exercises/shakespeare/bin

test:
[junit] Running index.AllTests
[junit] Testsuite: index.AllTests
[junit] Tests run: 7, Failures: 0, Errors: 0, Time elapsed: 0.424 sec
[junit] Tests run: 7, Failures: 0, Errors: 0, Time elapsed: 0.424 sec
[junit]

BUILD SUCCESSFUL
Similar to Part 1, it is safer to proceed to the execution of the Driver once the classes are tested. The Driver defines the mapper and reducer classes; unlike Job 1, no combiner is set here, since this reducer builds relative frequencies and cannot be applied to partial groups of values. Also, note that the outputKeyClass and outputValueClass must be the same as the output types declared by the Reducer class!!! Once again, Hadoop complains when they are different 🙂
Job2, Driver
package index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * WordCountsInDocuments counts the total number of words in each document and
 * produces data with the relative and total number of words for each document.
 * Hadoop 0.20.1 API
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordCountsInDocuments extends Configured implements Tool {

    // where to put the data in hdfs when we're done
    private static final String OUTPUT_PATH = "2-word-counts";

    // where to read the data from.
    private static final String INPUT_PATH = "1-word-freq";

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job job = new Job(conf, "Words Counts");

        job.setJarByClass(WordCountsInDocuments.class);
        job.setMapperClass(WordCountsForDocsMapper.class);
        job.setReducerClass(WordCountsForDocsReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCountsInDocuments(), args);
        System.exit(res);
    }
}

The input data comes from the output directory of the first step, “1-word-freq”, and the output is persisted in the directory “2-word-counts”, both under the main training directory in HDFS. If you need to take a look at the ANT build and other classes, go to my personal resources at my Google Code Library Project. Recompile the project and generate the updated jar with the driver.

training@training-vm:~/git/exercises/shakespeare$ ant
Buildfile: build.xml

compile:
  [javac] Compiling 5 source files to /home/training/git/exercises/shakespeare/bin
  [javac] Note: Some input files use or override a deprecated API.
  [javac] Note: Recompile with -Xlint:deprecation for details.

jar:
  [jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar

BUILD SUCCESSFUL
Total time: 1 second

Now, executing the driver…

 training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.WordCountsInDocuments
10/01/06 16:28:04 INFO input.FileInputFormat: Total input paths to process : 1
10/01/06 16:28:04 INFO mapred.JobClient: Running job: job_200912301017_0048
10/01/06 16:28:05 INFO mapred.JobClient: map 0% reduce 0%
10/01/06 16:28:12 INFO mapred.JobClient: map 100% reduce 0%
10/01/06 16:28:18 INFO mapred.JobClient: map 100% reduce 100%
10/01/06 16:28:20 INFO mapred.JobClient: Job complete: job_200912301017_0048
10/01/06 16:28:20 INFO mapred.JobClient: Counters: 17
10/01/06 16:28:20 INFO mapred.JobClient: Job Counters
10/01/06 16:28:20 INFO mapred.JobClient: Launched reduce tasks=1
10/01/06 16:28:20 INFO mapred.JobClient: Launched map tasks=1
10/01/06 16:28:20 INFO mapred.JobClient: Data-local map tasks=1
10/01/06 16:28:20 INFO mapred.JobClient: FileSystemCounters
10/01/06 16:28:20 INFO mapred.JobClient: FILE_BYTES_READ=1685803
10/01/06 16:28:20 INFO mapred.JobClient: HDFS_BYTES_READ=1588239
10/01/06 16:28:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=3371638
10/01/06 16:28:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=1920431
10/01/06 16:28:20 INFO mapred.JobClient: Map-Reduce Framework
10/01/06 16:28:20 INFO mapred.JobClient: Reduce input groups=0
10/01/06 16:28:20 INFO mapred.JobClient: Combine output records=0
10/01/06 16:28:20 INFO mapred.JobClient: Map input records=48779
10/01/06 16:28:20 INFO mapred.JobClient: Reduce shuffle bytes=1685803
10/01/06 16:28:20 INFO mapred.JobClient: Reduce output records=0
10/01/06 16:28:20 INFO mapred.JobClient: Spilled Records=97558
10/01/06 16:28:20 INFO mapred.JobClient: Map output bytes=1588239
10/01/06 16:28:20 INFO mapred.JobClient: Combine input records=0
10/01/06 16:28:20 INFO mapred.JobClient: Map output records=48779
10/01/06 16:28:20 INFO mapred.JobClient: Reduce input records=48779

Note that the execution shuffles roughly 1.6 MB of intermediate data for about 48,000 word@document records. Let’s check the result using the hadoop fs -cat command once again and navigate through it. The most important thing to note is that the n/N relation is maintained throughout the results: each word’s count n is paired with the total number of words N of its document.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 2-word-counts/part-r-00000 | less
....
relished@all-shakespeare 1/738781
therefore@all-shakespeare 652/738781
eastward@all-shakespeare 1/738781
....
irrespective@leornardo-davinci-all.txt 1/149612
ignorance@leornardo-davinci-all.txt 12/149612
drawing@leornardo-davinci-all.txt 174/149612
relief@leornardo-davinci-all.txt 36/149612
...
answer@the-outline-of-science-vol1.txt 25/70650
sleeve@the-outline-of-science-vol1.txt 1/70650
regard@the-outline-of-science-vol1.txt 22/70650
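
To make the n/N encoding concrete, here is a tiny stand-alone sketch (my own illustration, not part of the tutorial’s code base; the class name TermFrequencyParser is made up) that parses one line of the “2-word-counts” output and turns the fraction into a relative term frequency:

public class TermFrequencyParser {

    /** Parses a line like "therefore@all-shakespeare\t652/738781" into n/N as a double. */
    public static double termFrequency(String line) {
        String[] keyValue = line.split("\\s+");      // [0] = "word@document", [1] = "n/N"
        String[] fraction = keyValue[1].split("/");  // [0] = n, [1] = N
        double n = Double.parseDouble(fraction[0]);
        double total = Double.parseDouble(fraction[1]);
        return n / total;
    }

    public static void main(String[] args) {
        // prints roughly 8.8E-4: "therefore" makes up about 0.09% of all-shakespeare
        System.out.println(termFrequency("therefore@all-shakespeare\t652/738781"));
    }
}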

Part 3 will conclude this series by combining two different steps. I’m still following the original basic tutorial from Cloudera, but using the Hadoop 0.20.1 API. Any suggestions for improvement are welcome:

  • How to write data pipes between 2 different jobs?
  • How to write a custom input format?

Those questions might be answered during the Hadoop Training in Sunnyvale on January 19-21, which I’m excited to attend. Suggestions are also welcome!!! 😀

Hadoop 0.20.1 API: refactoring the InvertedLine example from Cloudera Training, removing deprecated classes (JobConf, others)

January 5, 2010

I’ve been learning Hadoop for the past 15 days and I have found lots of examples of source code. The basic training offered by Cloudera uses the 0.18 API, as does the Yahoo developer’s tutorial that describes the Inverted Line Index example. The input of this example is a list of one or more text files containing books, and the output is an index of the words appearing in each of the files, in the format “<word, fileName@offset>”, where the word is found on a given line of the given fileName at the given byte offset. Although the example works without a problem, I’ve read documentation about the Pig application where the majority of the warnings are caused by the API change. I’m particularly in favour of clean code without warnings, whenever possible. So, I started dissecting the API and re-implemented the examples using Hadoop 0.20.1. Furthermore, MRUnit must also be refactored in order to make use of the new API.

Both the Yahoo Hadoop Tutorial and the Cloudera Basic Training documentation “Writing MapReduce Programs” give the example of the InvertedIndex application. I used the Cloudera VMWare implementation and source-code as a starting point.

The first major change was the inclusion of the mapreduce package, containing the new implementations of the Mapper and Reducer classes, which were interfaces in the previous API in the package “mapred”.

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

class MyMapper extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    ...
    ...
}

class MyReducer extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    ...
    ...
}

Also, note that these classes use Java generics, so the “map()” and “reduce()” methods must follow the signature convention dictated by the type parameters of your implementation. Both methods replace the OutputCollector and Reporter parameters with a Context object, a nested class of the Mapper and Reducer classes respectively.

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

class MyMapper extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    ...
    protected void map(KEYIN key, VALUEIN value, Context context) {
       ...
    }
}

class MyReducer extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    ...
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) {
       ...
    }
}

Consider KEYIN, VALUEIN, KEYOUT, and VALUEOUT as Writable classes from the Hadoop API; the exact same types must appear in the method signatures of your implementation. For instance, I once had the wrong type for the values parameter in the reducer, and the reduce method was never called because the signature did not override the framework’s version. So, it is important to verify that you’re using the same Iterable class for the values.
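
A minimal sketch of this pitfall (the class names BrokenReducer and WorkingReducer are made up for illustration): the first reduce() takes an Iterator instead of an Iterable, so it never overrides the framework’s method and is never called, while the second one matches the base signature. Annotating the method with @Override turns this kind of mistake into a compile-time error.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class BrokenReducer extends Reducer<Text, Text, Text, Text> {
    // Wrong parameter type: Iterator instead of Iterable. This declares a brand-new method,
    // so the framework falls back to the inherited identity reduce() and this code never runs.
    protected void reduce(Text key, Iterator<Text> values, Context context) {
        // never called by Hadoop
    }
}

class WorkingReducer extends Reducer<Text, Text, Text, Text> {
    @Override // matches Reducer.reduce() exactly, so Hadoop calls it
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            context.write(key, val);
        }
    }
}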

The mapper class just needs the new API from the new package. The new imported classes are highlighted in the mapper and reducer code.

Mapper Class

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * LineIndexMapper Maps each observed word in a line to a (filename@offset) string.
 */
public class LineIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    public LineIndexMapper() {
    }

    /**
     * Google's search Stopwords
     */
    private static Set<String> googleStopwords;

    static {
        googleStopwords = new HashSet<String>();
        googleStopwords.add("I"); googleStopwords.add("a"); 
        googleStopwords.add("about"); googleStopwords.add("an"); 
        googleStopwords.add("are"); googleStopwords.add("as");
        googleStopwords.add("at"); googleStopwords.add("be"); 
        googleStopwords.add("by"); googleStopwords.add("com"); 
        googleStopwords.add("de"); googleStopwords.add("en");
        googleStopwords.add("for"); googleStopwords.add("from"); 
        googleStopwords.add("how"); googleStopwords.add("in"); 
        googleStopwords.add("is"); googleStopwords.add("it");
        googleStopwords.add("la"); googleStopwords.add("of"); 
        googleStopwords.add("on"); googleStopwords.add("or"); 
        googleStopwords.add("that"); googleStopwords.add("the");
        googleStopwords.add("this"); googleStopwords.add("to"); 
        googleStopwords.add("was"); googleStopwords.add("what"); 
        googleStopwords.add("when"); googleStopwords.add("where");
        googleStopwords.add("who"); googleStopwords.add("will"); 
        googleStopwords.add("with"); googleStopwords.add("and"); 
        googleStopwords.add("the"); googleStopwords.add("www");
    }

    /**
     * @param key is the byte offset of the current line in the file
     * @param value is the line from the file
     * @param context provides the write() method to emit the <key, value> pairs and gives
     *                access to job information, such as the current filename via the InputSplit
     *
     *     POST-CONDITION: Output <"word", "filename@offset"> pairs
     */
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Compile all the words using regex
        Pattern p = Pattern.compile("\\w+");
        Matcher m = p.matcher(value.toString());

        // Get the name of the file from the inputsplit in the context
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();

        // build the values and write <k,v> pairs through the context
        StringBuilder valueBuilder = new StringBuilder();
        while (m.find()) {
            String matchedKey = m.group().toLowerCase();
            // remove names starting with non letters, digits, considered stopwords or containing other chars
            if (!Character.isLetter(matchedKey.charAt(0)) || Character.isDigit(matchedKey.charAt(0))
                    || googleStopwords.contains(matchedKey) || matchedKey.contains("_")) {
                continue;
            }
            valueBuilder.append(fileName);
            valueBuilder.append("@");
            valueBuilder.append(key.get());
            // emit the partial <k,v>
            context.write(new Text(matchedKey), new Text(valueBuilder.toString()));
            valueBuilder.setLength(0);
        }
    }
}

Reducer Class

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * LineIndexReducer Takes a list of filename@offset entries for a single word and concatenates them into a list.
 */
public class LineIndexReducer extends Reducer<Text, Text, Text, Text> {

    public LineIndexReducer() {
    }

    /**
     * @param key is the key of the mapper
     * @param values are all the values aggregated during the mapping phase
     * @param context contains the context of the job run
     *
     *      PRE-CONDITION: receive a list of <"word", "filename@offset"> pairs
     *        <"marcello", ["a.txt@3345", "b.txt@344", "c.txt@785"]>
     *
     *      POST-CONDITION: emit the output a single key-value where all the file names
     *        are separated by a comma ",".
     *        <"marcello", "a.txt@3345,b.txt@344,c.txt@785">
     */
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder valueBuilder = new StringBuilder();

        for (Text val : values) {
            valueBuilder.append(val);
            valueBuilder.append(",");
        }
        //write the key and the adjusted value (removing the last comma)
        context.write(key, new Text(valueBuilder.substring(0, valueBuilder.length() - 1)));
        valueBuilder.setLength(0);
    }
}

These are the changes necessary for the Mapper and Reducer classes, without the need to extend the base classes. In order to unit test these classes, changes on the MRUnit are also necessary. The drivers were also added a new “mapreduce” package with the same counterparts.

Instead of the mrunit.MapDriver, use the mrunit.mapreduce.MapDriver; the same applies to the ReduceDriver. The rest of the code stays the same.

// old, tied to the deprecated "mapred" API:
import org.apache.hadoop.mrunit.MapDriver;

// new, for the "mapreduce" API:
import org.apache.hadoop.mrunit.mapreduce.MapDriver;

JUnit’s MapperTest

Some changes are also required in the MRUnit API classes, following the same pattern as the main API: the addition of the package “mapreduce” and new implementing classes.

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mock.MockInputSplit;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * Test cases for the inverted index mapper.
 */
public class MapperTest extends TestCase {

    private Mapper<LongWritable, Text, Text, Text> mapper;
    private MapDriver<LongWritable, Text, Text, Text> driver;

    /** We expect pathname@offset for the key from each of these */
    private final Text EXPECTED_OFFSET = new Text(MockInputSplit.getMockPath().toString() + "@0");

    @Before
    public void setUp() {
        mapper = new LineIndexMapper();
        driver = new MapDriver<LongWritable, Text, Text, Text>(mapper);
    }

    @Test
    public void testEmpty() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();

        assertListEquals(expected, out);
    }

    @Test
    public void testOneWord() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("foo")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("foo"), EXPECTED_OFFSET));

        assertListEquals(expected, out);
    }

    @Test
    public void testMultiWords() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("foo bar baz!!!! ????")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("foo"), EXPECTED_OFFSET));
        expected.add(new Pair<Text, Text>(new Text("bar"), EXPECTED_OFFSET));
        expected.add(new Pair<Text, Text>(new Text("baz"), EXPECTED_OFFSET));

        assertListEquals(expected, out);
    }
}

JUnit’s ReducerTest

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * Test cases for the inverted index reducer.
 */
public class ReducerTest extends TestCase {

    private Reducer<Text, Text, Text, Text> reducer;
    private ReduceDriver<Text, Text, Text, Text> driver;

    @Before
    public void setUp() {
        reducer = new LineIndexReducer();
        driver = new ReduceDriver<Text, Text, Text, Text>(reducer);
    }

    @Test
    public void testOneOffset() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInputKey(new Text("word")).withInputValue(new Text("offset")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("word"), new Text("offset")));

        assertListEquals(expected, out);
    }

    @Test
    public void testMultiOffset() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInputKey(new Text("word")).withInputValue(new Text("offset1")).withInputValue(
                    new Text("offset2")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("word"), new Text("offset1,offset2")));

        assertListEquals(expected, out);
    }
}

You can test them using the command “ant test” in the source-code directory, as usual, to confirm that the implementation is correct:

training@training-vm:~/git/exercises/shakespeare$ ant test
Buildfile: build.xml

compile:
[javac] Compiling 4 source files to /home/training/git/exercises/shakespeare/bin

test:
[junit] Running index.AllTests
[junit] Testsuite: index.AllTests
[junit] Tests run: 5, Failures: 0, Errors: 0, Time elapsed: 0.418 sec
[junit] Tests run: 5, Failures: 0, Errors: 0, Time elapsed: 0.418 sec
[junit]

BUILD SUCCESSFUL
Total time: 2 seconds

Replacing JobConf and other deprecated classes

Other changes related to the API concern the configuration and execution of the jobs. The class “JobConf” was deprecated, but most of the tutorials have not been updated accordingly. So, here’s the updated version of the main example driver using the Configuration and Job classes. Note that the job is configured and executed with a default Configuration, the class responsible for configuring the execution of the tasks. Once again, replacing the classes located in the package “mapred” is important, since the new classes are located in the package “mapreduce”. The following code highlights the new classes imported and how they are used throughout the Driver.
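
For contrast only, here is a minimal sketch of what the deprecated style looked like (this is not code from this project; it wires a pass-through job with the stock IdentityMapper and IdentityReducer from the old “mapred.lib” package, configured through JobConf and submitted with JobClient.runJob()):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class DeprecatedStyleDriver {

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DeprecatedStyleDriver.class);
        conf.setJobName("Deprecated JobConf style");

        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        // pass-through of the default TextInputFormat records: <LongWritable offset, Text line>
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path("input"));
        FileOutputFormat.setOutputPath(conf, new Path("output"));

        JobClient.runJob(conf); // blocks until the job finishes
    }
}

The new driver below replaces all of this with Configuration, Job, and the Tool/ToolRunner pair.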

InvertedIndex driver

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * LineIndexer Creates an inverted index over all the words in a document corpus, mapping each observed word to a list
 * of filename@offset locations where it occurs.
 */
public class LineIndexer extends Configured implements Tool {

    // where to put the data in hdfs when we're done
    private static final String OUTPUT_PATH = "output";

    // where to read the data from.
    private static final String INPUT_PATH = "input";

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job job = new Job(conf, "Line Indexer 1");

        job.setJarByClass(LineIndexer.class);
        job.setMapperClass(LineIndexMapper.class);
        job.setReducerClass(LineIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new LineIndexer(), args);
        System.exit(res);
    }
}

After updating, make sure to generate a new jar, remove anything under the directory “output” (since the program does not clean that up), and execute the new version.
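
If you prefer to have the driver do this clean-up itself, a small helper along these lines could be called at the top of run() before the job is submitted (this is my own hypothetical addition, not part of the original exercise code):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical helper: recursively deletes a previous run's output directory, if present. */
public final class OutputCleaner {

    private OutputCleaner() { }

    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(dir);
        if (fs.exists(path)) {
            fs.delete(path, true); // true = recursive delete
        }
    }
}

Calling OutputCleaner.deleteIfExists(conf, OUTPUT_PATH) before job.waitForCompletion(true) would then make re-runs repeatable without removing the “output” directory by hand.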

training@training-vm:~/git/exercises/shakespeare$ ant jar
Buildfile: build.xml

compile:
[javac] Compiling 4 source files to /home/training/git/exercises/shakespeare/bin

jar:
[jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar

BUILD SUCCESSFUL
Total time: 1 second

I have added 2 ASCII books to the input directory: the works of Leonardo Da Vinci and the first volume of the book “The Outline of Science”.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -ls input
Found 3 items
-rw-r--r--   1 training supergroup    5342761 2009-12-30 11:57 /user/training/input/all-shakespeare
-rw-r--r--   1 training supergroup    1427769 2010-01-04 17:42 /user/training/input/leornardo-davinci-all.txt
-rw-r--r--   1 training supergroup     674762 2010-01-04 17:42 /user/training/input/the-outline-of-science-vol1.txt

The execution and output of running this example are shown as follows.

training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.LineIndexer
10/01/04 21:11:55 INFO input.FileInputFormat: Total input paths to process : 3
10/01/04 21:11:56 INFO mapred.JobClient: Running job: job_200912301017_0017
10/01/04 21:11:57 INFO mapred.JobClient:  map 0% reduce 0%
10/01/04 21:12:07 INFO mapred.JobClient:  map 33% reduce 0%
10/01/04 21:12:10 INFO mapred.JobClient:  map 58% reduce 0%
10/01/04 21:12:13 INFO mapred.JobClient:  map 63% reduce 0%
10/01/04 21:12:16 INFO mapred.JobClient:  map 100% reduce 11%
10/01/04 21:12:28 INFO mapred.JobClient:  map 100% reduce 77%
10/01/04 21:12:34 INFO mapred.JobClient:  map 100% reduce 100%
10/01/04 21:12:36 INFO mapred.JobClient: Job complete: job_200912301017_0017
10/01/04 21:12:36 INFO mapred.JobClient: Counters: 17
10/01/04 21:12:36 INFO mapred.JobClient:   Job Counters
10/01/04 21:12:36 INFO mapred.JobClient:     Launched reduce tasks=1
10/01/04 21:12:36 INFO mapred.JobClient:     Launched map tasks=3
10/01/04 21:12:36 INFO mapred.JobClient:     Data-local map tasks=3
10/01/04 21:12:36 INFO mapred.JobClient:   FileSystemCounters
10/01/04 21:12:36 INFO mapred.JobClient:     FILE_BYTES_READ=58068623
10/01/04 21:12:36 INFO mapred.JobClient:     HDFS_BYTES_READ=7445292
10/01/04 21:12:36 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=92132872
10/01/04 21:12:36 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=26638259
10/01/04 21:12:36 INFO mapred.JobClient:   Map-Reduce Framework
10/01/04 21:12:36 INFO mapred.JobClient:     Reduce input groups=0
10/01/04 21:12:36 INFO mapred.JobClient:     Combine output records=0
10/01/04 21:12:36 INFO mapred.JobClient:     Map input records=220255
10/01/04 21:12:36 INFO mapred.JobClient:     Reduce shuffle bytes=34064153
10/01/04 21:12:36 INFO mapred.JobClient:     Reduce output records=0
10/01/04 21:12:36 INFO mapred.JobClient:     Spilled Records=2762272
10/01/04 21:12:36 INFO mapred.JobClient:     Map output bytes=32068217
10/01/04 21:12:36 INFO mapred.JobClient:     Combine input records=0
10/01/04 21:12:36 INFO mapred.JobClient:     Map output records=997959
10/01/04 21:12:36 INFO mapred.JobClient:     Reduce input records=997959

The index entry for the word “abandoned” is an example of one present in all of the books:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat output/part-r-00000 | less
 ...
 ...
abandoned       leornardo-davinci-all.txt@1257995,leornardo-davinci-all.txt@652992,all-shakespeare@4657862,all-shakespeare@738818,the-outline-of-science-vol1.txt@642211,the-outline-of-science-vol1.txt@606442,the-outline-of-science-vol1.txt@641585
...
...

TF-IDF in Hadoop Part 1: Word Frequency in Doc

December 31, 2009

My interest in parallel computing dates back to my undergraduate years, just one or two years after Google’s paper on efficient data processing was published. From that time on, I wondered how they managed to index “the web”. So I started learning the API and the HDFS, as well as exploring the implementation of the TF-IDF algorithm, as explained by the Cloudera training. I started this implementation after implementing the InvertedIndex example using both the Hadoop 0.18 and the 0.20.1 APIs. The parts of my experience are organized as follows:

This code uses the Hadoop 0.20.1 API.

Seven years passed, and while writing my thesis project I started dealing with the same questions regarding large datasets… How to process them at the database level? I mean, how to process them efficiently with the computational resources you’ve got? Interestingly enough, my first contact with MapReduce processing was with mongoDB’s MapReduce API, used to access data in parallel across the different shards of a database cluster, where the data is distributed over the shards depending on properties of the data. And of course, one of the tools to process the distributed data is a MapReduce API. I learned how to use that API thanks to Cloudera’s Basic Training on MapReduce and HDFS. This first documentation was produced after studying and completing the first exercises of Cloudera’s InvertedIndex example using Hadoop, where I downloaded the VMPlayer image and played with the initial examples, driven by the PDF explaining the exercises. Although the source code works without a problem, it uses the Hadoop 0.18 API, and if you are bothered by the warnings in Eclipse, I have updated and documented the necessary changes to remove them and use the refactored version of InvertedIndex with the Hadoop 0.20.1 API.

I finally found the Cloudera basic introduction training on MapReduce and Hadoop… and let me tell you, they made the nicest introduction to MapReduce I’ve ever seen 🙂 The slides and documentation are very well structured and easy to follow (considering you come from the academic world)… They actually worked closely with Google and the University of Washington to get to that level… I was very pleased to read and understand the concepts… My only need at the time was to apply that knowledge to the MapReduce engine from mongoDB… I did a simple application and it proved to be interesting…

So, I’ve been studying the Cloudera basic training in Hadoop, and that was the only way I could learn MapReduce! If you have a good background in Java 5/6, Linux, operating systems, shell, etc., you can definitely move on… If you don’t have experience with Hadoop, I definitely suggest following the basic training from sessions 1 – 5, including the InvertedIndex exercise. You will find the exercises describing the TF-IDF algorithm in one of the PDFs.

The first implementation I did with Hadoop was the indexing of words in the complete Shakespeare collection. However, I was intrigued and could not resist downloading more e-books from the Gutenberg project (all the Da Vinci books and The Outline of Science Vol. 1). The input directory already includes the Shakespeare collection, but I had to put the new ones into the filesystem. You can add the downloaded files to the Hadoop File System by using the “copyFromLocal” command:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -copyFromLocal the-outline-of-science-vol1.txt input
training@training-vm:~/git/exercises/shakespeare$ hadoop fs -copyFromLocal leornardo-davinci-all.txt input

You can verify if the files were added by listing the contents of the “input” directory.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -ls input
Found 3 items
-rw-r--r--   1 training supergroup    5342761 2009-12-30 11:57 /user/training/input/all-shakespeare
-rw-r--r--   1 training supergroup    1427769 2010-01-04 17:42 /user/training/input/leornardo-davinci-all.txt
-rw-r--r--   1 training supergroup     674762 2010-01-04 17:42 /user/training/input/the-outline-of-science-vol1.txt

Note that the command “hadoop fs” proxies the usual Unix file commands (“-ls”, “-cat”, among others) to its filesystem. Following the suggestion of the documentation, the approach I took to easily understand the concepts was divide-and-conquer: each job is executed separately as an exercise, saving the generated reduced values into the HDFS.

Job 1: Word Frequency in Doc

As mentioned before, the word frequency phase is designed in a Job whose task is to count the number of words in each of the documents in the input directory. In this case, the specification of the Map and Reduce are as follows:

  • Map:
    • Input: (document, each line contents)
    • Output: (word@document, 1)
  • Reducer:
    • n = sum of the values for each key “word@document”
    • Output: (word@document, n)

In order to decrease the payload received by the reducers, very-high-frequency words such as “the” are discarded using Google’s search stopwords list. Also, the result of each job (the intermediate values for the next job) is saved to a regular HDFS file before the next MapReduce pass. In general, the strategy is:

  1. Lower-case the words in the map phase, so that equal words are aggregated before the reduce phase;
  2. Skip unnecessary words by checking them against the stopwords dictionary (Google search stopwords);
  3. Use a RegEx to select only words, removing punctuation and other data anomalies.

Job1, Mapper

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * WordFrequenceInDocMapper implements the Job 1 specification for the TF-IDF algorithm
 */
public class WordFrequenceInDocMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    public WordFrequenceInDocMapper() {
    }

    /**
     * Google's search Stopwords
     */
    private static Set<String> googleStopwords;

    static {
        googleStopwords = new HashSet<String>();
        googleStopwords.add("I"); googleStopwords.add("a");
        googleStopwords.add("about"); googleStopwords.add("an");
        googleStopwords.add("are"); googleStopwords.add("as");
        googleStopwords.add("at"); googleStopwords.add("be");
        googleStopwords.add("by"); googleStopwords.add("com");
        googleStopwords.add("de"); googleStopwords.add("en");
        googleStopwords.add("for"); googleStopwords.add("from");
        googleStopwords.add("how"); googleStopwords.add("in");
        googleStopwords.add("is"); googleStopwords.add("it");
        googleStopwords.add("la"); googleStopwords.add("of");
        googleStopwords.add("on"); googleStopwords.add("or");
        googleStopwords.add("that"); googleStopwords.add("the");
        googleStopwords.add("this"); googleStopwords.add("to");
        googleStopwords.add("was"); googleStopwords.add("what");
        googleStopwords.add("when"); googleStopwords.add("where");
        googleStopwords.add("who"); googleStopwords.add("will");
        googleStopwords.add("with"); googleStopwords.add("and");
        googleStopwords.add("the"); googleStopwords.add("www");
    }

    /**
     * @param key is the byte offset of the current line in the file
     * @param value is the line from the file
     * @param context provides the write() method to emit the <key, value> pairs and gives
     *                access to job information, such as the current filename via the InputSplit
     *
     *     POST-CONDITION: Output <"word@filename", 1> pairs
     */
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Compile all the words using regex
        Pattern p = Pattern.compile("\\w+");
        Matcher m = p.matcher(value.toString());

        // Get the name of the file from the inputsplit in the context
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();

        // build the values and write <k,v> pairs through the context
        StringBuilder valueBuilder = new StringBuilder();
        while (m.find()) {
            String matchedKey = m.group().toLowerCase();
            // remove names starting with non letters, digits, considered stopwords or containing other chars
            if (!Character.isLetter(matchedKey.charAt(0)) || Character.isDigit(matchedKey.charAt(0))
                    || googleStopwords.contains(matchedKey) || matchedKey.contains("_")) {
                continue;
            }
            valueBuilder.append(matchedKey);
            valueBuilder.append("@");
            valueBuilder.append(fileName);
            // emit the partial <k,v>
            context.write(new Text(valueBuilder.toString()), new IntWritable(1));
        }
    }
}

Job1, Mapper Unit Test

Note that the unit tests use the JUnit 4 API. The MRUnit API is also updated to use the Hadoop 0.20.1 API for the Mapper and the respective MapDriver. Generics are used to emulate the actual implementation as well.

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mock.MockInputSplit;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * Test cases for the word frequency mapper.
 */
public class WordFreqMapperTest extends TestCase {

    private Mapper<LongWritable, Text, Text, IntWritable> mapper;
    private MapDriver<LongWritable, Text, Text, IntWritable> driver;

    /** We expect pathname@offset for the key from each of these */
    private final Text KEY_SUFIX = new Text("@" + MockInputSplit.getMockPath().toString());

    @Before
    public void setUp() {
        mapper = new WordFrequenceInDocMapper();
        driver = new MapDriver<LongWritable, Text, Text, IntWritable>(mapper);
    }

    @Test
    public void testEmpty() {
        List<Pair<Text, IntWritable>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();

        assertListEquals(expected, out);
    }

    @Test
    public void testOneWord() {
        List<Pair<Text, IntWritable>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("foo")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();
        expected.add(new Pair<Text, IntWritable>(new Text("foo" + KEY_SUFIX), new IntWritable(1)));

        assertListEquals(expected, out);
    }

    @Test
    public void testMultiWords() {
        List<Pair<Text, IntWritable>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("foo bar baz!!!! ????")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();
        expected.add(new Pair<Text, IntWritable>(new Text("foo" + KEY_SUFIX), new IntWritable(1)));
        expected.add(new Pair<Text, IntWritable>(new Text("bar" + KEY_SUFIX), new IntWritable(1)));
        expected.add(new Pair<Text, IntWritable>(new Text("baz" + KEY_SUFIX), new IntWritable(1)));

        assertListEquals(expected, out);
    }
}

Job1, Reducer

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * WordFrequenceInDocReducer sums the partial counts for each "word@document" key emitted by the mapper (and combiner).
 */
public class WordFrequenceInDocReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public WordFrequenceInDocReducer() {
    }

    /**
     * @param key is the key of the mapper
     * @param values are all the values aggregated during the mapping phase
     * @param context contains the context of the job run
     *
     *      PRE-CONDITION: receive a list of <"word@filename",[1, 1, 1, ...]> pairs
     *        <"marcello@a.txt", [1, 1]>
     *
     *      POST-CONDITION: emit the output a single key-value where the sum of the occurrences.
     *        <"marcello@a.txt", 2>
     */
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // write the key and the summed number of occurrences
        context.write(key, new IntWritable(sum));
    }
}

Job1, Reducer Unit Test

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * Test cases for the word frequency reducer.
 */
public class WordFreqReducerTest extends TestCase {

    private Reducer<Text, IntWritable, Text, IntWritable> reducer;
    private ReduceDriver<Text, IntWritable, Text, IntWritable> driver;

    @Before
    public void setUp() {
        reducer = new WordFrequenceInDocReducer();
        driver = new ReduceDriver<Text, IntWritable, Text, IntWritable>(reducer);
    }

    @Test
    public void testOneItem() {
        List<Pair<Text, IntWritable>> out = null;

        try {
            out = driver.withInputKey(new Text("word")).withInputValue(new IntWritable(1)).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();
        expected.add(new Pair<Text, IntWritable>(new Text("word"), new IntWritable(1)));

        assertListEquals(expected, out);
    }

    @Test
    public void testMultiWords() {
        List<Pair<Text, IntWritable>> out = null;

        try {
            List<IntWritable> values = new ArrayList<IntWritable>();
            values.add(new IntWritable(2));
            values.add(new IntWritable(5));
            values.add(new IntWritable(8));
            out = driver.withInput(new Text("word1"), values).run();

        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();
        expected.add(new Pair<Text, IntWritable>(new Text("word1"), new IntWritable(15)));

        assertListEquals(expected, out);
    }
}

Before executing the Hadoop application, make sure that the Mapper and Reducer classes are passing their unit tests. Test-Driven Development helps during the development of the Mappers and Reducers by identifying problems related to incorrectly inherited methods (especially with Generics), where wrong “map” or “reduce” method signatures lead to the designed phases being silently skipped. Therefore, it is safer to run the test cases before the actual execution of the driver classes.

training@training-vm:~/git/exercises/shakespeare$ ant test
Buildfile: build.xml

compile:
[javac] Compiling 5 source files to /home/training/git/exercises/shakespeare/bin
[javac] Note: Some input files use or override a deprecated API.
[javac] Note: Recompile with -Xlint:deprecation for details.

test:
[junit] Running index.AllTests
[junit] Testsuite: index.AllTests
[junit] Tests run: 4, Failures: 0, Errors: 0, Time elapsed: 0.279 sec
[junit] Tests run: 4, Failures: 0, Errors: 0, Time elapsed: 0.279 sec
[junit]

BUILD SUCCESSFUL
Total time: 2 seconds

Then, the execution of the Driver can proceed. It includes the definitions of the mapper and reducer classes, and defines the combiner class to be the same as the reducer class (valid here because summing counts is associative and commutative). Also, note that the outputKeyClass and outputValueClass must be the same as the output types declared by the Reducer class!!! If not, Hadoop will complain! 🙂

Job1, Driver
// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * WordFrequenceInDocument Creates the index of the words in documents,
 * mapping each of them to their frequency.
 */
public class WordFrequenceInDocument extends Configured implements Tool {

    // where to put the data in hdfs when we're done
    private static final String OUTPUT_PATH = "1-word-freq";

    // where to read the data from.
    private static final String INPUT_PATH = "input";

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job job = new Job(conf, "Word Frequence In Document");

        job.setJarByClass(WordFrequenceInDocument.class);
        job.setMapperClass(WordFrequenceInDocMapper.class);
        job.setReducerClass(WordFrequenceInDocReducer.class);
        job.setCombinerClass(WordFrequenceInDocReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordFrequenceInDocument(), args);
        System.exit(res);
    }
}
As specified by the Driver class, the data is read from the books listed in the “input” directory in HDFS, and the output goes to the directory of this first step, “1-word-freq”. The training virtual machine contains the necessary build scripts to compile and generate the jars for the execution of the MapReduce application, as well as to run the unit tests for each of the Mapper and Reducer classes.
training@training-vm:~/git/exercises/shakespeare$ ant
Buildfile: build.xml

compile:
[javac] Compiling 5 source files to /home/training/git/exercises/shakespeare/bin
[javac] Note: Some input files use or override a deprecated API.
[javac] Note: Recompile with -Xlint:deprecation for details.

jar:
[jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar

BUILD SUCCESSFUL
Total time: 1 second

After making sure that everything is working according to the tests, it is time to execute the main driver.

training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.WordFrequenceInDocument
10/01/05 16:34:54 INFO input.FileInputFormat: Total input paths to process : 3
10/01/05 16:34:54 INFO mapred.JobClient: Running job: job_200912301017_0046
10/01/05 16:34:55 INFO mapred.JobClient:  map 0% reduce 0%
10/01/05 16:35:10 INFO mapred.JobClient:  map 50% reduce 0%
10/01/05 16:35:13 INFO mapred.JobClient:  map 66% reduce 0%
10/01/05 16:35:16 INFO mapred.JobClient:  map 100% reduce 0%
10/01/05 16:35:19 INFO mapred.JobClient:  map 100% reduce 33%
10/01/05 16:35:25 INFO mapred.JobClient:  map 100% reduce 100%
10/01/05 16:35:27 INFO mapred.JobClient: Job complete: job_200912301017_0046
10/01/05 16:35:27 INFO mapred.JobClient: Counters: 17
10/01/05 16:35:27 INFO mapred.JobClient:   Job Counters
10/01/05 16:35:27 INFO mapred.JobClient:     Launched reduce tasks=1
10/01/05 16:35:27 INFO mapred.JobClient:     Launched map tasks=3
10/01/05 16:35:27 INFO mapred.JobClient:     Data-local map tasks=3
10/01/05 16:35:27 INFO mapred.JobClient:   FileSystemCounters
10/01/05 16:35:27 INFO mapred.JobClient:     FILE_BYTES_READ=3129067
10/01/05 16:35:27 INFO mapred.JobClient:     HDFS_BYTES_READ=7445292
10/01/05 16:35:27 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=4901739
10/01/05 16:35:27 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=1588239
10/01/05 16:35:27 INFO mapred.JobClient:   Map-Reduce Framework
10/01/05 16:35:27 INFO mapred.JobClient:     Reduce input groups=0
10/01/05 16:35:27 INFO mapred.JobClient:     Combine output records=94108
10/01/05 16:35:27 INFO mapred.JobClient:     Map input records=220255
10/01/05 16:35:27 INFO mapred.JobClient:     Reduce shuffle bytes=1772576
10/01/05 16:35:27 INFO mapred.JobClient:     Reduce output records=0
10/01/05 16:35:27 INFO mapred.JobClient:     Spilled Records=142887
10/01/05 16:35:27 INFO mapred.JobClient:     Map output bytes=27375962
10/01/05 16:35:27 INFO mapred.JobClient:     Combine input records=1004372
10/01/05 16:35:27 INFO mapred.JobClient:     Map output records=959043
10/01/05 16:35:27 INFO mapred.JobClient:     Reduce input records=48779

The execution generates the output shown in the following listing (note that I piped the cat process into less so you can navigate through the stream). Searching for the word “therefore” shows its use in the different documents.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 1-word-freq/part-r-00000 | less
...
therefore@all-shakespeare       652
therefore@leornardo-davinci-all.txt     124
therefore@the-outline-of-science-vol1.txt       36
...

The results produced are the intermediate data necessary as the input for the execution of the Job 2, specified in the Part 2 of this tutorial.
