mongoDB Shards, Cluster and MapReduce: experiments for Sensor Networks

This post documents the use of mongoDB as the persistence layer for the data collected from NetBEAMS. It is divided into sections covering setup and the CRUD (Create, Retrieve, Update, Delete) operations, as well as advanced topics such as data replication and the use of MapReduce. This document is a copy of the experiments performed for my Masters Thesis report entitled “A Key-Value-Based Persistence Layer for Sensor Networks”. The original wiki documentation can be found at MongoDBShardsClusterAndMapReduce.

The setup of the mongoDB shards must be performed on each cluster node. First, the relevant processes are started, and then the cluster must be configured with each of the shards, as well as with indexes for the collections to be used. Before continuing with this section, refer to mongoDB’s sharding documentation.

In order to start collecting data, the mongoDB server must be set up in either a single-node or a distributed way. Using the distributed cluster version requires starting the processes shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ ps aux | grep mongo
marcello  3391  0.0  0.2  67336  3328 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-1/ --port 20001
marcello  3397  0.0  0.2  59140  3280 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-2/ --port 20002
marcello  3402  0.0  0.2  59140  3276 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-3/ --port 20003
marcello  3406  0.0  0.3 157452  3980 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/config --port 10000
marcello  3431  0.4  0.2  62004  3332 pts/1    Sl   12:38   0:35 mongos -vvv --configdb localhost:10000
marcello  3432  0.0  0.0   5196   704 pts/1    S    12:38   0:00 tee logs/mongos-cluster-head.log
In summary, these processes are defined as follows:
  • Shards Node: each shard process “mongod” is responsible for managing its own “chunks” of data in a given “dbpath” directory, on a given port number. These processes are used by the cluster head “mongos”;
  • Cluster Metadata Server Node: the main metadata server of the cluster can be located on a local or a foreign host. The listing above shows the metadata server “config” located on the same server, managed by a “mongod” process. It carries information about the databases, the list of shards, and the list of “chunks” of each database, including their location (“ip_address:port”);
  • Cluster Head Server: the orchestration of the cluster is performed by the “mongos” process. It connects to the metadata server to select which shard to use, to collect statistics and counters, etc. This is the main process that accepts the client requests.

Make sure to redirect the output of the processes to log files. As shown in the listing above, the process “tee” is capturing the output of the “mongos” process. The mongoDB processes also offer parameters for logging their output for the same purpose.
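
For reference, the processes shown in the listing above were started with commands along the following lines (a sketch reconstructed from the “ps” output; the data paths, ports, and log file name are the ones shown above, and running each command in its own terminal or in the background is assumed):

mongod --dbpath data/shards/shard-1/ --port 20001
mongod --dbpath data/shards/shard-2/ --port 20002
mongod --dbpath data/shards/shard-3/ --port 20003
mongod --dbpath data/shards/config --port 10000
mongos -vvv --configdb localhost:10000 | tee logs/mongos-cluster-head.log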

Considering that the proper processes are running, especially the metadata server and the main cluster head, the client process can be started to issue the commands that enable sharding on a given database. Since mongoDB’s client interface uses Javascript as the main programming language abstraction to manipulate data, a script can be used to automate the process of setting up the server. Before continuing, make sure you have covered mongoDB’s documentation on how to set up database shards.
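
As a sketch of what such an automation script could look like, the commands explained in the rest of this section could be collected into a Javascript file and passed to the “mongo” client (the file name “setup-shards.js” is hypothetical):

// setup-shards.js - hypothetical setup script; run with: mongo setup-shards.js
var admin = db.getSisterDB("admin");
admin.runCommand( { addshard: "192.168.1.2:20001" } );
admin.runCommand( { addshard: "192.168.1.2:20002" } );
admin.runCommand( { addshard: "localhost:20001", allowLocal: true } );
admin.runCommand( { enablesharding: "netbeams" } );
admin.runCommand( { shardcollection: "netbeams.SondeDataContainer", key: { "observation.pH": 1 } } );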

First, connect to the server using the client process “mongo”, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo
MongoDB shell version: 1.2.0
url: test
connecting to: netbeams
Sun Dec 20 14:22:49 connection accepted from 127.0.0.1:39899 #5
type "help" for help

After connecting to the server through the client, get references to 2 important databases: “admin” and “config”. The “admin” database is responsible for running commands against the cluster, while “config” is the reference to the metadata server. The following listing shows the use of the method “db.getSisterDB()” to retrieve those references:

> admin = db.getSisterDB("admin")
admin
> config = db.getSisterDB("config")
config
> 

Once the references are available, using these names as shortcuts makes access more convenient. Let’s add each of the shards running on the local host and on the foreign server (192.168.1.2), on different communication ports. It is important to note that the issued commands are executed against the metadata server “config”.

> admin.runCommand( { addshard: "192.168.1.2:20001" } )
Sun Dec 20 16:04:02 Request::process ns: admin.$cmd msg id:-2097268492 attempt: 0
Sun Dec 20 16:04:02 single query: admin.$cmd  { addshard: "192.168.1.2:20001" }  ntoreturn: -1

> admin.runCommand( { addshard: "192.168.1.2:20002" } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268491 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { addshard: "192.168.1.2:20002" }  ntoreturn: -1

> admin.runCommand( { addshard: "localhost:20001", allowLocal: true } )
> 

In order to be added to the list, a shard server must be running. If a shard is down at this point, it will not be added to the list of available shards. On the other hand, if it is added and later goes down, the “mongos” process keeps sending heartbeats to verify whether the shard has come back. In any case, use the command “listshards” to list the existing shards that the cluster head can use.

> admin.runCommand( { listshards:1 } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268490 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { addshard: "localhost:20001", allowLocal: true }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268489 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { listshards: 1.0 }  ntoreturn: -1
{
        "shards" : [
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6ea"),
                        "host" : "192.168.1.2:20001"
                },
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6eb"),
                        "host" : "192.168.1.2:20002"
                },
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6ec"),
                        "host" : "localhost:20001"
                }
        ],
        "ok" : 1
}
> 

Enabling sharding means giving the metadata server “config” the name of the database to be sharded, as well as the definition of the shard key. The command “enablesharding” receives the name of the database. The following listing shows the database “netbeams” being enabled; then the shard key is defined, with “observation.pH” used as the shard key:

> admin.runCommand({enablesharding:"netbeams"})
{"ok" : 1}
admin.runCommand( { shardcollection: "netbeams.SondeDataContainer", key: { "observation.pH" : 1} } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268488 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { enablesharding: "netbeams" }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268487 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { shardcollection: "netbeams.SondeDataContainer", key: { observation.pH: 1.0 } }  ntoreturn: -1
{"collectionsharded" : "netbeams.SondeDataContainer" , "ok" : 1}
> 

Chunks represent the different sections of the data. Using the reference to the metadata server, run “config.chunks.find()” to list the chunk documents.

> config.chunks.find()
{ "lastmod" : { "t" : 1261341503000, "i" : 1 }, "ns" : "netbeams.SondeDataContainer", "min" : { "observation" : { "pH" : { $minKey : 1 } } },
"minDotted" : { "observation.pH" : { $minKey : 1 } }, "max" : { "observation" : { "pH" : { $maxKey : 1 } } }, "maxDotted" : { "observation.pH" : { $maxKey : 1 } },
"shard" : "192.168.1.2:20002", "_id" : ObjectId("4b2e8b3fb342bcd910b62ec9") }
> 
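
Since each chunk document carries a “shard” field, the same collection can be used to check how the chunks are distributed across the shards. A small sketch, using only the fields shown above (output omitted):

> config.chunks.count()
> config.chunks.find( { shard: "192.168.1.2:20002" } ).count()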

The next step is to create indexes for the expected keys. This procedure can also be performed after documents have been inserted. In general, defining indexes slows down “Create” operations but speeds up “Retrieve” ones. In order to proceed, make sure you have covered the documentation on mongoDB’s Indexes.

  • mongoDB Indexes: this is the documentation regarding indexes of keys on mongoDB.

Note, in the following listing, that the keys are written to the metadata server “config”. A reference to the database “netbeams” is acquired using the function “db.getSisterDB()”, just as it was used for the databases “config” and “admin”. The method “db.collection.ensureIndex()” is then used for each key.

> netbeams = db.getSisterDB("netbeams")
netbeams
> netbeams.SondeDataContainer.ensureIndex( { "message_id":1 } )
Sun Dec 20 16:04:03 Request::process ns: netbeams.system.indexes msg id:-2097268486 attempt: 0
Sun Dec 20 16:04:03  .system.indexes write for: netbeams.system.indexes
Sun Dec 20 16:04:03 Request::process ns: netbeams.$cmd msg id:-2097268485 attempt: 0
Sun Dec 20 16:04:03 single query: netbeams.$cmd  { getlasterror: 1.0 }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: test.$cmd msg id:-2097268484 attempt: 0
Sun Dec 20 16:04:03 single query: test.$cmd  { getlasterror: 1.0 }  ntoreturn: -1

netbeams.SondeDataContainer.ensureIndex( { "sensor.ip_address":1 } )
netbeams.SondeDataContainer.ensureIndex( { "sensor.location.latitude":1 } )
netbeams.SondeDataContainer.ensureIndex( { "sensor.location.longitude":1 } )
netbeams.SondeDataContainer.ensureIndex( { "time.valid":1 } )
netbeams.SondeDataContainer.ensureIndex( { "time.transaction":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.WaterTemperature":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.SpecificConductivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Conductivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Resistivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Salinity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Pressure":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Depth":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.pH":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.pHmV":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Turbidity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.ODOSaturation":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.ODO":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Battery":1 } )
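
Once created, the indexes can be listed back from the collection as a quick sanity check (a small sketch; output omitted):

> netbeams.SondeDataContainer.getIndexes()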

You can verify the setup by accessing each of the collections of the config server. Using a client to access the server in a different shell, you can directly read and modify (NOT RECOMMENDED) the settings of the metadata server, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo config
MongoDB shell version: 1.2.0
url: config
connecting to: config
type "help" for help
> Sun Dec 20 16:31:57 connection accepted from 127.0.0.1:48589 #7
show collections
Sun Dec 20 16:32:01 Request::process ns: config.system.namespaces msg id:-128400130 attempt: 0
Sun Dec 20 16:32:01 single query: config.system.namespaces  { query: {}, orderby: { name: 1.0 } }  ntoreturn: 0
chunks
databases
shards
system.indexes
version

The method “find()” can then be used to list the contents of each of these collections. An example is listing the configured databases, showing the properties of each of them (partitioned or not, primary server host, etc.), as shown in the following listing.

> db.databases.find()
Sun Dec 20 16:47:48 Request::process ns: config.databases msg id:-128400129 attempt: 0
Sun Dec 20 16:47:48 single query: config.databases  {}  ntoreturn: 0
{ "name" : "admin", "partitioned" : false, "primary" : "localhost:10000", "_id" : ObjectId("4b2e8b3fb342bcd910b62ec7") }
{ "name" : "netbeams", "partitioned" : true, "primary" : "192.168.1.2:20002",
                  "sharded" : { "netbeams.SondeDataContainer" : { "key" : { "observation" : { "pH" : 1 } }, "unique" : false } },
                  "_id" : ObjectId("4b2e8b3fb342bcd910b62ec8") }
{ "name" : "test", "partitioned" : false, "primary" : "192.168.1.2:20002", "_id" : ObjectId("4b2e8b3fb342bcd910b62eca") }

Before proceeding, make sure you have covered the basics of mongoDB use.

Using the mongoDB client process “mongo”, access a given “mongos” or “mongod” server. Accessing the “mongos” process executes the commands in the context of the entire cluster through the metadata server “config”, while accessing a “mongod” process targets a single shard server, which is useful for debugging. Specify the server location and which database to use. The following listing shows the command to access a given shard on a given port, using the database “netbeams”.

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo 192.168.1.2:20001/netbeams
MongoDB shell version: 1.2.0
url: 192.168.1.2:20001/netbeams
connecting to: 192.168.1.2:20001/netbeams
type "help" for help

In order to verify the stats of a collection, use the function “collection.stats()”. This function verifies the counters stored in the metadata server.

> db.SondeDataContainer.stats()
Sun Dec 20 14:54:24 Request::process ns: netbeams.$cmd msg id:-1701410104 attempt: 0
Sun Dec 20 14:54:24 single query: netbeams.$cmd  { collstats: "SondeDataContainer" }  ntoreturn: -1
Sun Dec 20 14:54:24 passing through unknown command: collstats { collstats: "SondeDataContainer" }
{
        "ns" : "netbeams.SondeDataContainer",
        "count" : 2364851,
        "size" : 1155567036,
        "storageSize" : 1416246240,
        "nindexes" : 40,
        "ok" : 1
}

A given document, chosen at random from one of the shards, can be fetched with the function “collection.findOne()”. It is a quick way to inspect an example of the collected data.

> db.SondeDataContainer.findOne()
Sun Dec 20 14:59:08 Request::process ns: netbeams.SondeDataContainer msg id:-1701410103 attempt: 0
Sun Dec 20 14:59:08 shard query: netbeams.SondeDataContainer  {}
Sun Dec 20 14:59:08  have to set shard version for conn: 0x2909de0 ns:netbeams.SondeDataContainer my last seq: 0  current: 4
Sun Dec 20 14:59:08     setShardVersion  192.168.1.2:20002  netbeams.SondeDataContainer  { setShardVersion: "netbeams.SondeDataContainer",
configdb: "localhost:10000", version: Timestamp 1261341503000|1, serverID: ObjId(4b2e8b3eb342bcd910b62ec6) } 0x2909de0
Sun Dec 20 14:59:08       setShardVersion success!
{
        "_id" : ObjectId("e26f40072f68234b6af3d600"),
        "message_id" : "b405e634-fd4b-450c-9466-82dc0555ea06",
        "sensor" : {
                "ip_address" : "192.168.0.178",
                "location" : {
                        "latitude" : 37.89155,
                        "longitude" : -122.4464
                }
        },
        "time" : {
                "valid" : "Sun Dec 06 2009 10:18:22 GMT-0800 (PST)",
                "transaction" : "Sat Dec 12 2009 01:52:42 GMT-0800 (PST)"
        },
        "observation" : {
                "WaterTemperature" : 23.45,
                "SpecificConductivity" : 35.4,
                "Conductivity" : 139.6,
                "Resistivity" : 899.07,
                "Salinity" : 0.02,
                "Pressure" : 0.693,
                "Depth" : 2.224,
                "pH" : 6.25,
                "pHmV" : -76,
                "Turbidity" : 0.2,
                "ODOSaturation" : 31.3,
                "ODO" : 54.83,
                "Battery" : 1.1
        }
}
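
With the indexes defined during the setup, targeted queries against specific keys can also be issued through the cluster head. A small sketch, reusing the IP address from the sample document above (output omitted):

> db.SondeDataContainer.find( { "sensor.ip_address": "192.168.0.178" } ).limit(5)
> db.SondeDataContainer.find( { "observation.pH": { $gt: 7 } } ).count()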

MapReduce

In order to proceed with this section, make sure you have the necessary background in the MapReduce programming model. The recommended documentation and tutorials are as follows:

  • Introduction to MapReduce: this training video class describes the MapReduce concepts using Hadoop and the Hadoop Distributed File System, which can be directly related to mongoDB’s implementation; a must-watch before proceeding;
  • mongoDB’s MapReduce HowTo: this is the main documentation of the MapReduce implementation and its use in mongoDB. It covers the basics and how the “map” and “reduce” functions can be implemented for a given collection of documents.

The first basic example of the use of MapReduce in distributed systems is counting. In my opinion, it is a good example of how the counting process can be spread out across different machines. Using the regular client process “mongo”, access the database “netbeams”, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo netbeams
MongoDB shell version: 1.2.0
url: netbeams
connecting to: netbeams
Sun Dec 20 14:22:49 connection accepted from 127.0.0.1:39899 #5
type "help" for help

At this point, you’re connected to the server running on the main host. Refer to the setup process described at the beginning of this documentation for more details. Our goal is to report the number of collected documents per sensor, identified by its IP address. The strategy is to define a map function that emits the value 1 as the counter for each document, and a reduce function that sums the counters once mongoDB’s MapReduce engine delivers the intermediate results to be reduced.

  • The Map function: the following defines the map function, which uses the IP address of the sensor as the key and the count as the value. Note that mongoDB’s implementation differs from the Hadoop one: it does not pass the key as a parameter to the map function, because it uses “this”, which refers to the current document being processed during the execution.
> m1 = function () {
    emit(this.sensor.ip_address, {count:1});
}
  • The Reduce function: the following defines the reduce function, which receives the consolidated results mapping each key (IP address) to the counting values found. The function iterates over the returned values and increments the total variable with the value of the “count” field, which in this case is equal to 1 for each element (the “…” shown in some listings are continuation markers printed by the mongoDB client shell). The result is returned under the key “count”.
> r1 = function (key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++) {
        total += values[i].count;
    }
    return {count:total};
}

Having defined the “map” and “reduce” functions, you can call the collection method “db.collection.mapReduce()”, passing the function references as parameters. The following listing shows the execution of the command using the mongoDB shell, which displays the definition of the “map” and “reduce” functions before the execution:

> res = db.SondeDataContainer.mapReduce(m1, r1);
Sun Dec 20 14:26:02 Request::process ns: netbeams.$cmd msg id:-1701410106 attempt: 0
Sun Dec 20 14:26:02 single query: netbeams.$cmd  { mapreduce: "SondeDataContainer", map: function () {
    emit(this.sensor.ip_address, {count:1});
}, reduce: function (key, values) {
    var total = 0;
    for (var i = 0; i < va... }  ntoreturn: -1

After executing the function on each of the shards, the cluster head process “mongos” collects and consolidates the results. The output is temporarily stored in a collection whose name is returned in “res.result”, keeping the values in a separate chunk. The output is shown as follows:

Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
Sun Dec 20 14:33:15 creating new connection for pool to:192.168.1.2:20002
Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
{
        "result" : "tmp.mr.mapreduce_1261348395_10",
        "shardCounts" : {
                "192.168.1.2:20002" : {
                        "input" : 2364851,
                        "emit" : 2364851,
                        "output" : 254
                }
        },
        "counts" : {
                "emit" : 2364851,
                "input" : 2364851,
                "output" : 254
        },
        "ok" : 1,
        "timeMillis" : 433282,
        "timing" : {
                "shards" : 433193,
                "final" : 89
        }
}

As shown in this output, the MapReduce result reports the counts of “emit”, “input”, and final “output” documents. The output keys correspond to the 253 sensor IP addresses in use on the 192.168.0.x network (.0 is the subnet address and .255 the broadcast address). The “input” and “emit” values correspond to the total number of observations inserted during the Create operation; the Retrieve section showed the same total of about 2.36 million documents. Again, the output of the function “db.collection.stats()” shows the total number of documents:

> db.SondeDataContainer.stats()
Sun Dec 20 14:54:24 Request::process ns: netbeams.$cmd msg id:-1701410104 attempt: 0
Sun Dec 20 14:54:24 single query: netbeams.$cmd  { collstats: "SondeDataContainer" }  ntoreturn: -1
Sun Dec 20 14:54:24 passing through unknown command: collstats { collstats: "SondeDataContainer" }
{
        "ns" : "netbeams.SondeDataContainer",
        "count" : 2364851,
        "size" : 1155567036,
        "storageSize" : 1416246240,
        "nindexes" : 40,
        "ok" : 1
}

The number of “emits” is the total number of documents visited by the “map” function, while the “output” value is the number of reduced keys. In order to see the result, access the collection referenced by “res.result” and use the function “find()” to list the results, as shown in the following listing (only 20 items are shown):

> db[res.result].find()                        
Sun Dec 20 14:34:43 Request::process ns: netbeams.tmp.mr.mapreduce_1261348395_10 msg id:-1701410105 attempt: 0
Sun Dec 20 14:34:43 single query: netbeams.tmp.mr.mapreduce_1261348395_10  {}  ntoreturn: 0
Sun Dec 20 14:34:43 creating new connection for pool to:192.168.1.2:20002
{ "_id" : "192.168.0.10", "value" : { "count" : 9408 } }
{ "_id" : "192.168.0.100", "value" : { "count" : 9371 } }
{ "_id" : "192.168.0.101", "value" : { "count" : 9408 } }
{ "_id" : "192.168.0.102", "value" : { "count" : 9500 } }
{ "_id" : "192.168.0.103", "value" : { "count" : 9363 } }
{ "_id" : "192.168.0.104", "value" : { "count" : 9355 } }
{ "_id" : "192.168.0.105", "value" : { "count" : 9281 } }
{ "_id" : "192.168.0.106", "value" : { "count" : 9320 } }
{ "_id" : "192.168.0.107", "value" : { "count" : 9341 } }
{ "_id" : "192.168.0.108", "value" : { "count" : 9464 } }
{ "_id" : "192.168.0.109", "value" : { "count" : 9285 } }
{ "_id" : "192.168.0.11", "value" : { "count" : 9201 } }
{ "_id" : "192.168.0.110", "value" : { "count" : 9397 } }
{ "_id" : "192.168.0.111", "value" : { "count" : 9258 } }
{ "_id" : "192.168.0.112", "value" : { "count" : 9242 } }
{ "_id" : "192.168.0.113", "value" : { "count" : 9231 } }
{ "_id" : "192.168.0.114", "value" : { "count" : 9446 } }
{ "_id" : "192.168.0.115", "value" : { "count" : 9550 } }
{ "_id" : "192.168.0.116", "value" : { "count" : 9409 } }
{ "_id" : "192.168.0.117", "value" : { "count" : 9256 } }
has more

Note that the final result uses the key “_id” for the IP address, as defined in the “map” function, and the count appears under “value.count”, since “value” is the default output field of the MapReduce engine and “count” was the key used in the “reduce” function.
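
Because the result is stored as a regular collection, it can be queried and sorted like any other one. For example, a small sketch listing the five sensors with the most observations, reusing the same “res.result” reference (output omitted):

> db[res.result].find().sort( { "value.count": -1 } ).limit(5)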

Other use cases can be implemented in the same way (a sketch of one is shown below). The execution of this MapReduce job was not fast because the data resided on a single shard; MapReduce is designed to scale with the number of servers available, so if the load is distributed across more shards, the results are returned faster.
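
As a sketch of another possible use case, written against the same “mapReduce” call used above (the functions “m2” and “r2” are hypothetical and were not part of the original experiment), the following pair accumulates the sum and count of the pH readings per sensor; the average can then be computed afterwards as sum/count:

> m2 = function () {
    emit(this.sensor.ip_address, {sum: this.observation.pH, count: 1});
}
> r2 = function (key, values) {
    var result = {sum: 0, count: 0};
    for (var i = 0; i < values.length; i++) {
        result.sum += values[i].sum;
        result.count += values[i].count;
    }
    return result;
}
> res2 = db.SondeDataContainer.mapReduce(m2, r2);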

The shard logs reveal the details of the map and reduce operations. The following listing is from the log of a “mongod” shard process, showing the creation of the temporary collections for intermediate results. First, the request is received and both the map and reduce functions are set up to be executed.

Sun Dec 20 14:26:02 query netbeams.$cmd ntoreturn:1 reslen:179 nscanned:0 { mapreduce: "SondeDataContainer", map: function () {
    emit(this.sensor.ip_address, {count:1});
}, reduce: function (key, values) {
    var total = 0;
    for (var i = 0; i < va..., out: "tmp.mrs.SondeDataContainer_1261347962_5" }  nreturned:1 433257ms
Sun Dec 20 14:26:02 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9
Sun Dec 20 14:26:02 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9_inc

The “map phase” is executed first and must complete before the “reduce phase” takes place. In the scenario used to count the number of documents per IP address, the two phases happen at different instants, as shown in the following listing. In addition, the log shows the intermediate results being indexed during the “map phase” and saved into the collection “netbeams.tmp.mr.mapreduce_1261347962_9_inc”:

                43700/2364851   1%
                96000/2364851   4%
                148300/2364851  6%
                200300/2364851  8%
                250900/2364851  10%
                300600/2364851  12%
                351600/2364851  14%
                403800/2364851  17%
                455800/2364851  19%
                508000/2364851  21%
                560500/2364851  23%
                601100/2364851  25%
                647500/2364851  27%
                699900/2364851  29%
                752300/2364851  31%
                804300/2364851  34%
                856100/2364851  36%
                907900/2364851  38%
                959000/2364851  40%
                1009800/2364851 42%
                1060800/2364851 44%
                1112800/2364851 47%
                1164100/2364851 49%
                1209400/2364851 51%
                1253700/2364851 53%
                1305400/2364851 55%
                1350900/2364851 57%
                1401700/2364851 59%
                1453100/2364851 61%
                1503100/2364851 63%
                1551500/2364851 65%
                1602600/2364851 67%
                1637100/2364851 69%
                1687600/2364851 71%
                1736800/2364851 73%
                1787600/2364851 75%
                1839900/2364851 77%
                1891100/2364851 79%
                1941400/2364851 82%
                1989900/2364851 84%
                2041800/2364851 86%
                2094300/2364851 88%
                2145500/2364851 90%
                2193500/2364851 92%
                2245100/2364851 94%
                2296200/2364851 97%
                2341700/2364851 99%
Sun Dec 20 14:28:24 building new index on { 0: 1 } for netbeams.tmp.mr.mapreduce_1261347962_9_inc...
Sun Dec 20 14:28:24 Buildindex netbeams.tmp.mr.mapreduce_1261347962_9_inc idxNo:0
       { ns: "netbeams.tmp.mr.mapreduce_1261347962_9_inc", key: { 0: 1 }, name: "0_1" }
Sun Dec 20 14:28:40      external sort used : 0 files  in 16 secs
Sun Dec 20 14:28:46 done for 1796343 records 22.486secs
Sun Dec 20 14:28:24 insert netbeams.system.indexes 22486ms
Sun Dec 20 14:28:47 building new index on { _id: ObjId(000000000000000000000000) } for netbeams.tmp.mr.mapreduce_1261347962_9...
Sun Dec 20 14:28:47 Buildindex netbeams.tmp.mr.mapreduce_1261347962_9 idxNo:0
      { name: "_id_", ns: "netbeams.tmp.mr.mapreduce_1261347962_9", key: { _id: ObjId(000000000000000000000000) } }
Sun Dec 20 14:28:47 done for 0 records 0.02secs

The execution of the “reduce phase” starts and processes the intermediate results of the “map phase”, saving the final results in the new temporary collection “netbeams.tmp.mr.mapreduce_1261348395_10”.

                100/1796343     0%
                200/1796343     0%
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9_inc
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mrs.SondeDataContainer_1261347962_5
Sun Dec 20 14:33:15 end connection 192.168.1.10:38231
Sun Dec 20 14:33:15 connection accepted from 192.168.1.10:44062 #15
Sun Dec 20 14:33:15 connection accepted from 192.168.1.2:60641 #16
Sun Dec 20 14:33:15 building new index on { _id: ObjId(000000000000000000000000) } for netbeams.tmp.mr.mapreduce_1261348395_10...
Sun Dec 20 14:33:15 Buildindex netbeams.tmp.mr.mapreduce_1261348395_10 idxNo:0
         { name: "_id_", ns: "netbeams.tmp.mr.mapreduce_1261348395_10", key: { _id: ObjId(000000000000000000000000) } }
Sun Dec 20 14:33:15 done for 0 records 0secs
Sun Dec 20 14:33:15  mapreducefinishcommand netbeams.tmp.mr.mapreduce_1261348395_10 253
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mrs.SondeDataContainer_1261347962_5
Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
Sun Dec 20 14:33:15 end connection 192.168.1.2:60641
Sun Dec 20 14:33:15 end connection 192.168.1.10:44062
Sun Dec 20 14:34:43 connection accepted from 192.168.1.10:44063 #17

NOTE: If the results are important, make sure to save them into a permanent collection, since the temporary collection returned by a map-reduce run is purged upon a new access to the server through the mongoDB client.
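
A minimal sketch of saving the temporary output into a permanent collection, assuming a target collection name of “sensor_counts” (any name would do) and using only the shell methods already shown above:

> db[res.result].find().forEach( function (doc) { db.sensor_counts.save(doc); } )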

TF-IDF in Hadoop Part 3: Documents in Corpus and TFIDF Computation

The previous 2 parts of this post covered the smaller pieces of the job of calculating the TF-IDF for each “term” across the different documents in the “corpus”. Since the implementation depends on concepts of Information Retrieval, especially for starters in the field, take a look at the book by Christopher D. Manning, Prabhakar Raghavan and Hinrich Schütze, Introduction to Information Retrieval, Cambridge University Press, 2008. The authors are professors at Stanford and Stuttgart Universities, the book has different exercises on the subject, and I found good resources in Chapter 7 showing the basic concepts of the TF-IDF algorithm. As I mentioned before, I had first read about the term 7 years ago when I was writing my BS in Computer Science degree report (in Portuguese) on the problem of user profile matching and clustering. Interestingly enough, I started learning Hadoop about 2 weeks ago and I was stoked about it, because my first contact with MapReduce was actually through mongoDB, during my MS in Computer Science thesis, when I needed to generate a report over a data-centric collection of data collected from an environmental Sensor Network, using mongoDB’s MapReduce API over distributed mongoDB shards. All in all, it seems the time to put this into practice is now :).

Job 3: Documents in Corpus and TF-IDF computation

In order to summarize the idea of scoring words based on their occurrence in the corpus, I will use graphical and textual examples of the algorithm to make it clear what the exercises really do. Ricky Ho has implemented TF-IDF using Apache PIG and documented exactly the steps described by the algorithm in the very nice diagram shown below. Ricky also made a good summary of the terms “term frequency” and “inverse document frequency”, so check his website out in case you need a refresher.

The TF-IDF MapReduce Phases by Ricky Ho

This post implements the third round of the implementation, where the document count for each word is obtained from the size of the list that gathers all the documents in which the word occurs, taking the output of the previous phase as input. Let’s take a look at the data format from the previous job and see if it matches the description in the diagram (note that the terms are not sorted; I selected the term “therefore” at random):

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 2-word-counts/part-r-00000 | less
...
therefore@all-shakespeare       652/738781
therefore@leornardo-davinci-all.txt     124/149612
therefore@the-outline-of-science-vol1.txt       36/70650
...

The output shows each term in each document, the number of its occurrences in that document, and the total number of terms in the document. The final Mapper and Reducer were defined as follows:

  • Map:
    • Input: ((term@document), n/N)
    • Re-arrange the mapper to have the word as the key, since we need to count the number of documents where it occurs
    • Output: (term, document=n/N)
  • Reducer:
    • D = total number of documents in corpus. This can be passed by the driver as a constant;
    • d = number of documents in corpus where the term appears. It is a counter over the reduced values for each term;
    • TFIDF = n/N * log(D/d);
    • Output: ((word@document), d/D, (n/N), TFIDF)

This post shows the implementation of this third step, which counts the number of documents in the corpus in which each “term” appears and calculates the TF-IDF. I have made some assumptions for the final output to better present the results and, of course, to keep within the scope of the example.

  • The first problem was keeping this step as the last one needed to complete the exercise. The number of documents in the corpus could be calculated by yet another MapReduce phase, as described in Cloudera’s documentation. However, the class slides concluded by mentioning that this could be done without such an additional phase. I remembered from the classes that JobConf is used to pass parameters to jobs. So, I used the FileSystem class to count the number of documents 😀 in the original input directory, since that is a constant number. I tried using the Context/Configuration classes of the Hadoop 0.20.1 API to pass that number to the last Reducer, but get(key) returns null. So, the only way I could pass the number of documents was through the job name 🙂 I know, it is a dirty hack, but it works;
  • Since the number of documents in the corpus is small, the chances that a word appears in all documents are much higher than when applying the algorithm to web indexing over thousands or millions of documents. Therefore, the term log(totalDocs/docsPerWord) can “null” the result (log(3/3) = 0), so in that case I simplified the calculation to tfIdf = tf, since the term occurs in 100% of the documents in the corpus (you could implement it as tfIdf = tf * 1 as well); a worked example with the actual numbers from this post follows this list;
  • I decided to add more information to the output purely for documentation purposes. It shows [word@document, documentsFrequency/documentsCorpus, wordFrequency/totalWordsInDocument, TF-IDF];
  • The final result is formatted with only a few decimal places for the purpose of displaying the values in this exercise. In production, the full precision of these values matters.
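
As a worked example of the two cases above, using numbers that appear in the input and output listings of this post:

therefore@all-shakespeare: n/N = 652/738781 ≈ 0.00088253 and d/D = 3/3, so log10(D/d) = log10(1) = 0 and tfIdf falls back to tf ≈ 0.00088253 (the value shown in the final output below);
abortive@all-shakespeare: n/N = 4/738781 ≈ 0.00000541 and d/D = 2/3, so log10(3/2) ≈ 0.17609 and tfIdf ≈ 0.00000541 * 0.17609 ≈ 0.00000095 (also matching the output below).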

Job3, Mapper

package index;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * WordsInCorpusTFIDFMapper implements the Job 3 specification for the TF-IDF algorithm
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordsInCorpusTFIDFMapper extends Mapper<LongWritable, Text, Text, Text> {

    public WordsInCorpusTFIDFMapper() {
    }

    /**
     * @param key is the byte offset of the current line in the file;
     * @param value is the line from the file
     * @param context used to emit the output (key, value) pairs and to access information about the job
     *
     *     PRE-CONDITION: marcello@book.txt  \t  3/1500
     *     POST-CONDITION: marcello, book.txt=3/1500
     */
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] wordAndCounters = value.toString().split("\t");    // e.g. ["marcello@book.txt", "3/1500"]
        String[] wordAndDoc = wordAndCounters[0].split("@");         // e.g. ["marcello", "book.txt"]
        context.write(new Text(wordAndDoc[0]), new Text(wordAndDoc[1] + "=" + wordAndCounters[1]));
    }
}
Job3, Reducer
package index;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * WordsInCorpusTFIDFReducer calculates the number of documents in corpus that a given key occurs and the TF-IDF computation.
 * The total number of D is acquired from the job name 🙂 It is a dirty hack, but the only way I could communicate the number from
 * the driver.
 * @author Marcello de Sales (marcello.desales@gmail.com)
 */
public class WordsInCorpusTFIDFReducer extends Reducer<Text, Text, Text, Text> {

    private static final DecimalFormat DF = new DecimalFormat("###.########");

    public WordsInCorpusTFIDFReducer() {
    }

    /**
     * @param key is the key of the mapper
     * @param values are all the values aggregated during the mapping phase
     * @param context contains the context of the job run
     *
     *             PRECONDITION: receive a list of <word, ["doc1=n1/N1", "doc2=n2/N2"]>
     *             POSTCONDITION: <"word@doc1,  [d/D, n/N, TF-IDF]">
     */
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // get the number of documents indirectly from the file-system (stored in the job name on purpose)
        int numberOfDocumentsInCorpus = Integer.parseInt(context.getJobName());
        // total frequency of this word
        int numberOfDocumentsInCorpusWhereKeyAppears = 0;
        Map<String, String> tempFrequencies = new HashMap<String, String>();
        for (Text val : values) {
            String[] documentAndFrequencies = val.toString().split("=");
            numberOfDocumentsInCorpusWhereKeyAppears++;
            tempFrequencies.put(documentAndFrequencies[0], documentAndFrequencies[1]);
        }
        for (String document : tempFrequencies.keySet()) {
            String[] wordFrequenceAndTotalWords = tempFrequencies.get(document).split("/");

            //Term frequency is the quotient of the number of occurrences of the term in the document and the total number of terms in the doc
            double tf = Double.valueOf(Double.valueOf(wordFrequenceAndTotalWords[0])
                    / Double.valueOf(wordFrequenceAndTotalWords[1]));

            //inverse document frequency: quotient between the number of docs in the corpus and the number of docs in which the term appears
            double idf = (double) numberOfDocumentsInCorpus / (double) numberOfDocumentsInCorpusWhereKeyAppears;

            //given that log10(1) = 0, just use the term frequency when the term appears in all documents in the corpus
            double tfIdf = numberOfDocumentsInCorpus == numberOfDocumentsInCorpusWhereKeyAppears ?
                    tf : tf * Math.log10(idf);

            context.write(new Text(key + "@" + document), new Text("[" + numberOfDocumentsInCorpusWhereKeyAppears + "/"
                    + numberOfDocumentsInCorpus + " , " + wordFrequenceAndTotalWords[0] + "/"
                    + wordFrequenceAndTotalWords[1] + " , " + DF.format(tfIdf) + "]"));
        }
    }
}

I have implemented the TestCases for both the Mapper and Reducer classes, but to keep this post short I will skip those. Let’s take a look at the driver, since it captures the total number of documents directly from the filesystem using the built-in Hadoop API. Definitely no need for another MapReduce phase for that. As described in Cloudera’s training, the fewer phases the better, since we are saving resource utilization :). Anyway, let’s go to the Driver implementation.

Job3, Driver
package index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * WordsInCorpusTFIDF is the driver for Job 3: it counts the documents in the corpus in which
 * each word occurs and computes the TF-IDF for each (word, document) pair.
 * @author Marcello de Sales (marcello.desales@gmail.com)
 * @version "Hadoop 0.20.1"
 */
public class WordsInCorpusTFIDF extends Configured implements Tool {

    // where to put the data in hdfs when we're done
    private static final String OUTPUT_PATH = "3-tf-idf";

    // where to read the data from.
    private static final String INPUT_PATH = "2-word-counts";

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job job = new Job(conf, "Word in Corpus, TF-IDF");

        job.setJarByClass(WordsInCorpusTFIDF.class);
        job.setMapperClass(WordsInCorpusTFIDFMapper.class);
        job.setReducerClass(WordsInCorpusTFIDFReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        //Getting the number of documents from the original input directory.
        Path inputPath = new Path("input");
        FileSystem fs = inputPath.getFileSystem(conf);
        FileStatus[] stat = fs.listStatus(inputPath);

        //Dirty hack to pass the total number of documents as the job name.
        //The call to context.getConfiguration.get("docsInCorpus") returns null when I tried to pass
        //conf.set("docsInCorpus", String.valueOf(stat.length)) Or even
        //conf.setInt("docsInCorpus", stat.length)
        job.setJobName(String.valueOf(stat.length));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordsInCorpusTFIDF(), args);
        System.exit(res);
    }
}

Continuing with the implementation, the only thing left to do is to compile and run the final driver. Note that the input directory is the one containing the partial counts per document from job 2, that is, “2-word-counts”. The output is the directory reserved for step 3, “3-tf-idf”. As mentioned, the only way I could send the total number of documents in the corpus was through the job name. The Hadoop 0.20.1 API would not pass the configuration values at any cost: as documented, I tried using the context reference in the reducer class, but the call “context.getConfiguration().get(“docsInCorpus”)” only returned “null”. I gave up, looked for an alternative, and the job name was the only way I could find :).

I skipped the test session, compiled everything, and ran the driver as follows:

training@training-vm:~/git/exercises/shakespeare$ ant
Buildfile: build.xml

compile:
    [javac] Compiling 11 source files to /home/training/git/exercises/shakespeare/bin

jar:
      [jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar

BUILD SUCCESSFUL

Then, finally running the calculator of words:

training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.WordsInCorpusTFIDF
10/01/09 21:41:40 INFO input.FileInputFormat: Total input paths to process : 1
10/01/09 21:41:41 INFO mapred.JobClient: Running job: job_200912301017_0115
10/01/09 21:41:42 INFO mapred.JobClient:  map 0% reduce 0%
10/01/09 21:41:51 INFO mapred.JobClient:  map 100% reduce 0%
10/01/09 21:42:00 INFO mapred.JobClient:  map 100% reduce 100%
10/01/09 21:42:02 INFO mapred.JobClient: Job complete: job_200912301017_0115
10/01/09 21:42:02 INFO mapred.JobClient: Counters: 17
10/01/09 21:42:02 INFO mapred.JobClient:   Job Counters
10/01/09 21:42:02 INFO mapred.JobClient:     Launched reduce tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:     Launched map tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:     Data-local map tasks=1
10/01/09 21:42:02 INFO mapred.JobClient:   FileSystemCounters
10/01/09 21:42:02 INFO mapred.JobClient:     FILE_BYTES_READ=2017995
10/01/09 21:42:02 INFO mapred.JobClient:     HDFS_BYTES_READ=1920431
10/01/09 21:42:02 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=4036022
10/01/09 21:42:02 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=2943390
10/01/09 21:42:02 INFO mapred.JobClient:   Map-Reduce Framework
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce input groups=0
10/01/09 21:42:02 INFO mapred.JobClient:     Combine output records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Map input records=48779
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce shuffle bytes=2017995
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce output records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Spilled Records=97558
10/01/09 21:42:02 INFO mapred.JobClient:     Map output bytes=1920431
10/01/09 21:42:02 INFO mapred.JobClient:     Combine input records=0
10/01/09 21:42:02 INFO mapred.JobClient:     Map output records=48779
10/01/09 21:42:02 INFO mapred.JobClient:     Reduce input records=48779

Note that the final number of records is the same as in the previous job step, so everything went as expected. Taking a look at the file in the output directory:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -ls 3-tf-idf
Found 2 items
drwxr-xr-x   - training supergroup          0 2010-01-09 21:41 /user/training/3-tf-idf/_logs
-rw-r--r--   1 training supergroup    2943390 2010-01-09 21:41 /user/training/3-tf-idf/part-r-00000

I decided to take a look at the same word I mentioned above, “therefore”. Here are the results for it; this time the output was automatically sorted by Hadoop.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 3-tf-idf/part-r-00000 | less
...
abook@leornardo-davinci-all.txt [1/3 , 3/149612 , 0.00000957]
aboriginal@the-outline-of-science-vol1.txt      [1/3 , 1/70650 , 0.00000675]
abortive@all-shakespeare        [2/3 , 4/738781 , 0.00000095]
...
therefore@all-shakespeare       [3/3 , 652/738781 , 0.00088253]
therefore@the-outline-of-science-vol1.txt       [3/3 , 36/70650 , 0.00050955]
therefore@leornardo-davinci-all.txt     [3/3 , 124/149612 , 0.00082881]
...

Looking at chapter 7 of the book, the following conclusions can be drawn from the output presented:

  • The term “therefore” is more relevant in the document “all-shakespeare”, since its occurrence there is more likely than in the other documents;
  • Other terms that do not appear in all documents, such as “abook”, “aboriginal” and “abortive”, have very small relevance for the given corpus of documents.

What’s Next?

I had so much fun with my first contact with Hadoop that I am going to the Cloudera Training here in the Bay Area in about 10 days. Although the training covers the Hadoop 0.18 API, I decided to use the Hadoop 0.20.1 API because I just wanted to try a “cleaner” API.

As for next steps from this exercise, one could build a categorized view of the terms per document in a data-centric way (document -> term -> tf-idf), or whatever your need is. Time to go play with Apache PIG and HBase.
