mongoDB Shards, Cluster and MapReduce: experiments for Sensor Networks

This documents the use of mongoDB as the persistence layer for the data collected from NetBEAMS. It is divided into sections covering the setup and the CRUD (Create, Retrieve, Update, Delete) operations, as well as advanced topics such as data replication and the use of MapReduce. This document is a copy of the experiments performed for my Masters Thesis Report entitled “A Key-Value-Based Persistence Layer for Sensor Networks”. The original wiki documentation can be found at MongoDBShardsClusterAndMapReduce.

The setup of the mongoDB shards must be performed on each cluster node. First, the relevant processes are started; then the cluster is configured with each of the shards, as well as with the indexes of the collections to be used. Before continuing with this section, refer to mongoDB’s documentation on sharding.

In order to start collecting data, the mongoDB server must be set up in either a single-node or a distributed configuration. The distributed cluster version requires the processes shown in the following listing to be running:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ ps aux | grep mongo
marcello  3391  0.0  0.2  67336  3328 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-1/ --port 20001
marcello  3397  0.0  0.2  59140  3280 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-2/ --port 20002
marcello  3402  0.0  0.2  59140  3276 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/shard-3/ --port 20003
marcello  3406  0.0  0.3 157452  3980 pts/1    Sl   12:38   0:01 mongod --dbpath data/shards/config --port 10000
marcello  3431  0.4  0.2  62004  3332 pts/1    Sl   12:38   0:35 mongos -vvv --configdb localhost:10000
marcello  3432  0.0  0.0   5196   704 pts/1    S    12:38   0:00 tee logs/mongos-cluster-head.log
In summary, these processes are defined as follows:
  • Shards Node: each shard process “mongod” is responsible for managing its own “chunks” of data in a given “dbpath” directory, on a given port number. These processes are used by the cluster head “mongos”;
  • Cluster Metadata Server Node: the main metadata server of the cluster can be located on a local or foreign host. The listing above shows the metadata server “config” located on the same server, managed by a “mongod” process. It carries information about the databases, the list of shards, and the list of “chunks” of each database, including their location “Ip_address:port”;
  • Cluster Head Server: the orchestration of the cluster is performed by the “mongos” process. It connects to the metadata server to select which shard to use, to read statistics about counters, etc. This is the main process that accepts the client requests.

Make sure to redirect the output of the processes to log files. As shown in the listing above, the process “tee” captures the output of the process “mongos”. The mongoDB processes also accept additional parameters for that purpose.

Assuming that the proper processes are running, especially the metadata server and the main cluster head, the client process can be started to issue the commands that enable shards on a given database. Since the mongoDB client interface uses JavaScript as its main programming language for manipulating data, a script can be used to automate the process of setting up the server. Before continuing, make sure you have covered mongoDB’s documentation on how to set up database shards.

First, connect to the server using the client process “mongo”, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo
MongoDB shell version: 1.2.0
url: test
connecting to: netbeams
Sun Dec 20 14:22:49 connection accepted from 127.0.0.1:39899 #5
type "help" for help

Once connected to the server through the client, get references to two important databases: “admin” and “config”. The “admin” database is responsible for running commands against the cluster server, while “config” is the reference to the metadata server. The following listing shows the use of the method “db.getSisterDB()” to retrieve those references:

> admin = db.getSisterDB("admin")
admin
> config = db.getSisterDB("config")
config
> 

Once the references are available, using the names as shortcuts makes access more convenient. Let’s add each of the shards that are running on the local and on the foreign server (192.168.1.2), on different communication ports. It is important to note that the issued commands are executed on the metadata server “config”.

> admin.runCommand( { addshard: "192.168.1.2:20001" } )
Sun Dec 20 16:04:02 Request::process ns: admin.$cmd msg id:-2097268492 attempt: 0
Sun Dec 20 16:04:02 single query: admin.$cmd  { addshard: "192.168.1.2:20001" }  ntoreturn: -1

> admin.runCommand( { addshard: "192.168.1.2:20002" } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268491 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { addshard: "192.168.1.2:20002" }  ntoreturn: -1

> admin.runCommand( { addshard: "localhost:20001", allowLocal: true } )
> 

In order to be added to the list, a shard server must be running. If the shard is down at this point, it will not be added to the list of available shards. On the other hand, if it is added and later goes down, “mongos” keeps sending heartbeats to verify whether the shard has come back. In any case, use the command “listshards” to list the existing shards that the cluster head can use.

> admin.runCommand( { listshards:1 } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268490 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { addshard: "localhost:20001", allowLocal: true }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268489 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { listshards: 1.0 }  ntoreturn: -1
{
        "shards" : [
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6ea"),
                        "host" : "192.168.1.2:20001"
                },
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6eb"),
                        "host" : "192.168.1.2:20002"
                },
                {
                        "_id" : ObjectId("4b2e8b3f5e90e01ce34de6ec"),
                        "host" : "localhost:20001"
                }
        ],
        "ok" : 1
}
> 
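
Since the shell speaks JavaScript, the “addshard” calls above could be scripted. The following is a minimal sketch and not part of the original experiments: the helper name “addShards” and the host list are assumptions, and it relies only on the “admin.runCommand()” call and the “addshard” / “allowLocal” options shown in the listings above. It takes the “admin” reference as a parameter, so that it can be tried outside the shell with a stand-in object.

```javascript
// Hypothetical helper: registers each host as a shard through the "admin" reference.
// "admin" is expected to offer runCommand(), as the shell reference above does.
function addShards(admin, hosts) {
    var results = [];
    for (var i = 0; i < hosts.length; i++) {
        var command = { addshard: hosts[i] };
        // Shards on the local machine carry the allowLocal flag, as in the listing above.
        if (hosts[i].indexOf("localhost") === 0) {
            command.allowLocal = true;
        }
        results.push({ host: hosts[i], reply: admin.runCommand(command) });
    }
    return results;
}

// In the mongo shell (assumed usage):
// addShards(admin, ["192.168.1.2:20001", "192.168.1.2:20002", "localhost:20001"]);
```

Keeping each reply next to its host makes it easy to spot a shard that was down when the script ran.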

Enabling sharding means giving the metadata server “config” the name of the database to be sharded, as well as the definition of the shard keys. The command “enablesharding” receives the name of the database. The following listing shows the database “netbeams” being enabled. Next, the shard key must be defined with the command “shardcollection”; here, the key “observation.pH” is defined as the shard key:

> admin.runCommand({enablesharding:"netbeams"})
{"ok" : 1}
> admin.runCommand( { shardcollection: "netbeams.SondeDataContainer", key: { "observation.pH" : 1} } )
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268488 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { enablesharding: "netbeams" }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: admin.$cmd msg id:-2097268487 attempt: 0
Sun Dec 20 16:04:03 single query: admin.$cmd  { shardcollection: "netbeams.SondeDataContainer", key: { observation.pH: 1.0 } }  ntoreturn: -1
{"collectionsharded" : "netbeams.SondeDataContainer" , "ok" : 1}
> 

The chunks represent the different sections of the data. Using the reference to the metadata server, call “config.chunks.find()” to list the chunk documents.

> config.chunks.find()
{ "lastmod" : { "t" : 1261341503000, "i" : 1 }, "ns" : "netbeams.SondeDataContainer", "min" : { "observation" : { "pH" : { $minKey : 1 } } },
"minDotted" : { "observation.pH" : { $minKey : 1 } }, "max" : { "observation" : { "pH" : { $maxKey : 1 } } }, "maxDotted" : { "observation.pH" : { $maxKey : 1 } },
"shard" : "192.168.1.2:20002", "_id" : ObjectId("4b2e8b3fb342bcd910b62ec9") }
> 
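
To make the role of “min” and “max” concrete, the following sketch looks up the chunk that would hold a given shard key value. It is an illustration only and not mongoDB’s routing code: the function “findChunk” and the two-chunk layout are made up, “$minKey” / “$maxKey” are approximated with -Infinity and Infinity, and each chunk is assumed to cover the values from “min” (inclusive) up to “max” (exclusive).

```javascript
// Illustrative chunk table; the split point 7.0 and the shard assignment are made up.
var chunks = [
    { min: -Infinity, max: 7.0, shard: "192.168.1.2:20001" },
    { min: 7.0, max: Infinity, shard: "192.168.1.2:20002" }
];

// Returns the chunk whose range [min, max) contains the shard key value,
// or null when no chunk matches.
function findChunk(chunks, value) {
    for (var i = 0; i < chunks.length; i++) {
        if (value >= chunks[i].min && value < chunks[i].max) {
            return chunks[i];
        }
    }
    return null;
}

var chunk = findChunk(chunks, 6.25); // lands in the first chunk
```

With the single chunk of the listing above, which spans “$minKey” to “$maxKey”, every pH value maps to the shard “192.168.1.2:20002”.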

The next step is to create the indexes on the expected keys. This procedure can also be performed after the documents are inserted. In general, defining indexes slows down “Create” operations but speeds up “Retrieve” ones. In order to proceed, make sure you have covered the documentation on mongoDB’s indexes.

  • mongoDB Indexes: this is the documentation regarding indexes of keys on mongoDB.

Note, in the following listing, that the log shows the index definitions being written to “netbeams.system.indexes”. A reference to the database “netbeams” is acquired by using the method “db.getSisterDB()”, as was done for the databases “config” and “admin”. The indexes are created with the method “db.collection.ensureIndex()”.

> netbeams = db.getSisterDB("netbeams")
netbeams
> netbeams.SondeDataContainer.ensureIndex( { "message_id":1 } )
Sun Dec 20 16:04:03 Request::process ns: netbeams.system.indexes msg id:-2097268486 attempt: 0
Sun Dec 20 16:04:03  .system.indexes write for: netbeams.system.indexes
Sun Dec 20 16:04:03 Request::process ns: netbeams.$cmd msg id:-2097268485 attempt: 0
Sun Dec 20 16:04:03 single query: netbeams.$cmd  { getlasterror: 1.0 }  ntoreturn: -1
Sun Dec 20 16:04:03 Request::process ns: test.$cmd msg id:-2097268484 attempt: 0
Sun Dec 20 16:04:03 single query: test.$cmd  { getlasterror: 1.0 }  ntoreturn: -1

netbeams.SondeDataContainer.ensureIndex( { "sensor.ip_address":1 } )
netbeams.SondeDataContainer.ensureIndex( { "sensor.location.latitude":1 } )
netbeams.SondeDataContainer.ensureIndex( { "sensor.location.longitude":1 } )
netbeams.SondeDataContainer.ensureIndex( { "time.valid":1 } )
netbeams.SondeDataContainer.ensureIndex( { "time.transaction":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.WaterTemperature":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.SpecificConductivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Conductivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Resistivity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Salinity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Pressure":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Depth":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.pH":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.pHmV":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Turbidity":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.ODOSaturation":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.ODO":1 } )
netbeams.SondeDataContainer.ensureIndex( { "observation.Battery":1 } )
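
The repeated calls above differ only in the key name, so they could be generated from a list. This is a minimal sketch under assumptions: the helper name “ensureIndexes” is made up, and it relies only on the “ensureIndex()” method used in the listing. The collection is passed as a parameter so that the helper can be tried with a stand-in object.

```javascript
// Hypothetical helper: creates one ascending index per dotted key.
// "collection" is expected to offer ensureIndex(), as netbeams.SondeDataContainer does.
function ensureIndexes(collection, keys) {
    for (var i = 0; i < keys.length; i++) {
        var spec = {};
        spec[keys[i]] = 1; // same { key: 1 } form as in the listing above
        collection.ensureIndex(spec);
    }
    return keys.length;
}

// In the mongo shell (assumed usage):
// ensureIndexes(netbeams.SondeDataContainer,
//               ["message_id", "sensor.ip_address", "observation.pH"]);
```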

You can verify the setup by accessing each of the collections of the config server. Using a client connected to the server from a different shell, you can directly access and modify (NOT RECOMMENDED) the settings of the metadata server, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo config
MongoDB shell version: 1.2.0
url: config
connecting to: config
type "help" for help
> Sun Dec 20 16:31:57 connection accepted from 127.0.0.1:48589 #7
show collections
Sun Dec 20 16:32:01 Request::process ns: config.system.namespaces msg id:-128400130 attempt: 0
Sun Dec 20 16:32:01 single query: config.system.namespaces  { query: {}, orderby: { name: 1.0 } }  ntoreturn: 0
chunks
databases
shards
system.indexes
version

The method “find()” can then be used to list the contents of each of the collections. An example is listing the configured databases, showing the properties of each of them (partitioned or not, server host, etc.), as shown in the following listing.

> db.databases.find()
Sun Dec 20 16:47:48 Request::process ns: config.databases msg id:-128400129 attempt: 0
Sun Dec 20 16:47:48 single query: config.databases  {}  ntoreturn: 0
{ "name" : "admin", "partitioned" : false, "primary" : "localhost:10000", "_id" : ObjectId("4b2e8b3fb342bcd910b62ec7") }
{ "name" : "netbeams", "partitioned" : true, "primary" : "192.168.1.2:20002",
                  "sharded" : { "netbeams.SondeDataContainer" : { "key" : { "observation" : { "pH" : 1 } }, "unique" : false } },
                  "_id" : ObjectId("4b2e8b3fb342bcd910b62ec8") }
{ "name" : "test", "partitioned" : false, "primary" : "192.168.1.2:20002", "_id" : ObjectId("4b2e8b3fb342bcd910b62eca") }

Before proceeding, make sure you have covered the basics of mongoDB use.

Using the mongoDB client process “mongo”, connect to a given “mongos” or “mongod” server. Connecting to the “mongos” process executes the commands in the context of the entire cluster through the use of the metadata server “config”, while connecting to a “mongod” process accesses a given shard server, which can be useful for debugging. Specify the server location and which database to use on the command line. The following listing shows the command to access a given shard on a given port, using the database “netbeams”.

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo 192.168.1.2:20001/netbeams
MongoDB shell version: 1.2.0
url: 192.168.1.2:20001/netbeams
connecting to: 192.168.1.2:20001/netbeams
type "help" for help

In order to check the statistics of a collection, use the function “collection.stats()”. This function reports the counters stored in the metadata server.

> db.SondeDataContainer.stats()
Sun Dec 20 14:54:24 Request::process ns: netbeams.$cmd msg id:-1701410104 attempt: 0
Sun Dec 20 14:54:24 single query: netbeams.$cmd  { collstats: "SondeDataContainer" }  ntoreturn: -1
Sun Dec 20 14:54:24 passing through unknown command: collstats { collstats: "SondeDataContainer" }
{
        "ns" : "netbeams.SondeDataContainer",
        "count" : 2364851,
        "size" : 1155567036,
        "storageSize" : 1416246240,
        "nindexes" : 40,
        "ok" : 1
}

The function “collection.findOne()” returns a single document from one of the shards; which document comes back is not something to rely on. It is a way to inspect one example of the collected data.

> db.SondeDataContainer.findOne()
Sun Dec 20 14:59:08 Request::process ns: netbeams.SondeDataContainer msg id:-1701410103 attempt: 0
Sun Dec 20 14:59:08 shard query: netbeams.SondeDataContainer  {}
Sun Dec 20 14:59:08  have to set shard version for conn: 0x2909de0 ns:netbeams.SondeDataContainer my last seq: 0  current: 4
Sun Dec 20 14:59:08     setShardVersion  192.168.1.2:20002  netbeams.SondeDataContainer  { setShardVersion: "netbeams.SondeDataContainer",
configdb: "localhost:10000", version: Timestamp 1261341503000|1, serverID: ObjId(4b2e8b3eb342bcd910b62ec6) } 0x2909de0
Sun Dec 20 14:59:08       setShardVersion success!
{
        "_id" : ObjectId("e26f40072f68234b6af3d600"),
        "message_id" : "b405e634-fd4b-450c-9466-82dc0555ea06",
        "sensor" : {
                "ip_address" : "192.168.0.178",
                "location" : {
                        "latitude" : 37.89155,
                        "longitude" : -122.4464
                }
        },
        "time" : {
                "valid" : "Sun Dec 06 2009 10:18:22 GMT-0800 (PST)",
                "transaction" : "Sat Dec 12 2009 01:52:42 GMT-0800 (PST)"
        },
        "observation" : {
                "WaterTemperature" : 23.45,
                "SpecificConductivity" : 35.4,
                "Conductivity" : 139.6,
                "Resistivity" : 899.07,
                "Salinity" : 0.02,
                "Pressure" : 0.693,
                "Depth" : 2.224,
                "pH" : 6.25,
                "pHmV" : -76,
                "Turbidity" : 0.2,
                "ODOSaturation" : 31.3,
                "ODO" : 54.83,
                "Battery" : 1.1
        }
}

MapReduce

In order to proceed with this section, make sure you have the necessary background in the “MapReduce” programming model. The recommended documentation and tutorials are as follows:

  • Introduction to MapReduce: this training video class describes the MapReduce concepts using Hadoop and the Hadoop Distributed File System, which can be directly related to mongoDB’s implementation. A must-watch before proceeding;
  • mongoDB’s MapReduce HowTo: this is the main documentation of the MapReduce implementation and its use on mongoDB. It covers the basics and how the functions “map” and “reduce” can be implemented for a given collection of documents.

The first basic example of the use of MapReduce in a distributed system is counting. In my opinion, it is a good example of how to spread the counting process out over different machines. Using the regular client process “mongo”, access the database “netbeams”, as shown in the following listing:

marcello@netbeams-mongo-dev02:~/development/workspaces/netbeams/persistence$ mongo netbeams
MongoDB shell version: 1.2.0
url: netbeams
connecting to: netbeams
Sun Dec 20 14:22:49 connection accepted from 127.0.0.1:39899 #5
type "help" for help

At this point, you’re connected to the server running on the main host. Refer to the setup process described at the beginning of this documentation for more details. Our goal is to report the number of documents collected from each sensor, identified by its IP address. Our strategy is to define a map function that emits the value 1 as the counter, and a reduce function that sums those counters once mongoDB’s MapReduce engine hands over the intermediate results to be reduced.

  • The Map function: the following defines the map function, which emits the IP address of the sensor as the key and the count as the value. Note that mongoDB’s implementation differs from the Hadoop implementation: it does not pass the key as a parameter to the map function, because it relies on “this”, which refers to the document currently being processed.
> m1 = function () {
    emit(this.sensor.ip_address, {count:1});
}
  • The Reduce function: the following defines the reduce function, which receives each key (IP address) together with the values emitted for it. The function iterates over the values and adds the value of “count”, which in this case equals “1” for each element, to the variable “total”. In the mongoDB client shell, “…” marks the continuation lines of a multi-line entry. The result is returned using the key “count”.
> r1 = function (key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++) {
        total += values[i].count;
    }
    return {count:total};
}
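
To see how the two functions fit together before running them on the server, the flow can be imitated in plain JavaScript. This is only a sketch of the idea and not mongoDB’s engine: the “emit” stand-in, the “emitted” and “counts” variables, and the three sample documents are made up for illustration.

```javascript
// Same map and reduce functions as above.
var m1 = function () {
    emit(this.sensor.ip_address, { count: 1 });
};
var r1 = function (key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++) {
        total += values[i].count;
    }
    return { count: total };
};

// Stand-in for the engine's emit(): groups the emitted values by key.
var emitted = {};
function emit(key, value) {
    if (!emitted.hasOwnProperty(key)) {
        emitted[key] = [];
    }
    emitted[key].push(value);
}

// Made-up sample documents, reduced to the field the map function reads.
var docs = [
    { sensor: { ip_address: "192.168.0.10" } },
    { sensor: { ip_address: "192.168.0.11" } },
    { sensor: { ip_address: "192.168.0.10" } }
];

// Map phase: run the map function with each document bound to "this".
for (var d = 0; d < docs.length; d++) {
    m1.call(docs[d]);
}

// Reduce phase: one reduce call per key.
var counts = {};
for (var ip in emitted) {
    counts[ip] = r1(ip, emitted[ip]);
}
// counts is { "192.168.0.10": { count: 2 }, "192.168.0.11": { count: 1 } }
```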

Having defined the functions “map” and “reduce”, you can call the collection function “db.collection.mapReduce()”, passing the function references as parameters. The following listing shows the execution of the command in the mongoDB shell, with the log echoing the definitions of the “map” and “reduce” functions before the execution:

> res = db.SondeDataContainer.mapReduce(m1, r1);
Sun Dec 20 14:26:02 Request::process ns: netbeams.$cmd msg id:-1701410106 attempt: 0
Sun Dec 20 14:26:02 single query: netbeams.$cmd  { mapreduce: "SondeDataContainer", map: function () {
    emit(this.sensor.ip_address, {count:1});
}, reduce: function (key, values) {
    var total = 0;
    for (var i = 0; i < va... }  ntoreturn: -1

After the function is executed on each of the shards, the cluster head process “mongos” collects the values and consolidates the results. The output is temporarily stored in a collection referenced by “db[res.result]”, saving the values on a separate chunk. The output is shown as follows:

Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
Sun Dec 20 14:33:15 creating new connection for pool to:192.168.1.2:20002
Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
{
        "result" : "tmp.mr.mapreduce_1261348395_10",
        "shardCounts" : {
                "192.168.1.2:20002" : {
                        "input" : 2364851,
                        "emit" : 2364851,
                        "output" : 254
                }
        },
        "counts" : {
                "emit" : 2364851,
                "input" : 2364851,
                "output" : 254
        },
        "ok" : 1,
        "timeMillis" : 433282,
        "timing" : {
                "shards" : 433193,
                "final" : 89
        },
        "ok" : 1,
}

As shown in this output, the MapReduce result reports the counts of input, emit, and final output. The output count corresponds to the 253 IP address definitions in use on the network “192.168.1.254” (0 is the subnet address and 255 is the broadcast address). The input and emit values correspond to the total number of observations inserted during the Create operation. The Retrieve section shows the total as 2.36 million documents. Again, the output of the function “db.collection.stats()” shows the total number of documents:

> db.SondeDataContainer.stats()
Sun Dec 20 14:54:24 Request::process ns: netbeams.$cmd msg id:-1701410104 attempt: 0
Sun Dec 20 14:54:24 single query: netbeams.$cmd  { collstats: "SondeDataContainer" }  ntoreturn: -1
Sun Dec 20 14:54:24 passing through unknown command: collstats { collstats: "SondeDataContainer" }
{
        "ns" : "netbeams.SondeDataContainer",
        "count" : 2364851,
        "size" : 1155567036,
        "storageSize" : 1416246240,
        "nindexes" : 40,
        "ok" : 1
}
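
A few quick checks can be derived from the numbers reported so far. The sketch below copies the values from the mapReduce output and the stats output above; the variable names are made up, and the average document size assumes that “size” is reported in bytes.

```javascript
// Values copied from the mapReduce output above.
var res = {
    counts: { input: 2364851, emit: 2364851, output: 254 },
    timeMillis: 433282,
    timing: { shards: 433193, final: 89 }
};
// Values copied from the stats() output above.
var stats = { count: 2364851, size: 1155567036 };

// The two timing entries add up to the total elapsed time.
var timingTotal = res.timing.shards + res.timing.final; // 433282

// Documents processed per second over the whole run.
var docsPerSecond = res.counts.input / (res.timeMillis / 1000);

// Average number of documents behind each output key.
var docsPerKey = res.counts.input / res.counts.output;

// Average document size, assuming "size" is in bytes.
var avgDocBytes = stats.size / stats.count;
```

These work out to roughly 5,458 documents per second, about 9,310 documents per key, and close to 489 bytes per document. The per-key average is in line with the per-address counts of the result listing further below.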

The number of “emits” is the total number of documents visited by the “map” function. The “output” value is the number of reduced results. To see the results, access the collection through the reference “db[res.result]” and use the function “find()” to list them, as shown in the following listing, which shows just the first 20 items:

> db[res.result].find()                        
Sun Dec 20 14:34:43 Request::process ns: netbeams.tmp.mr.mapreduce_1261348395_10 msg id:-1701410105 attempt: 0
Sun Dec 20 14:34:43 single query: netbeams.tmp.mr.mapreduce_1261348395_10  {}  ntoreturn: 0
Sun Dec 20 14:34:43 creating new connection for pool to:192.168.1.2:20002
{ "_id" : "192.168.0.10", "value" : { "count" : 9408 } }
{ "_id" : "192.168.0.100", "value" : { "count" : 9371 } }
{ "_id" : "192.168.0.101", "value" : { "count" : 9408 } }
{ "_id" : "192.168.0.102", "value" : { "count" : 9500 } }
{ "_id" : "192.168.0.103", "value" : { "count" : 9363 } }
{ "_id" : "192.168.0.104", "value" : { "count" : 9355 } }
{ "_id" : "192.168.0.105", "value" : { "count" : 9281 } }
{ "_id" : "192.168.0.106", "value" : { "count" : 9320 } }
{ "_id" : "192.168.0.107", "value" : { "count" : 9341 } }
{ "_id" : "192.168.0.108", "value" : { "count" : 9464 } }
{ "_id" : "192.168.0.109", "value" : { "count" : 9285 } }
{ "_id" : "192.168.0.11", "value" : { "count" : 9201 } }
{ "_id" : "192.168.0.110", "value" : { "count" : 9397 } }
{ "_id" : "192.168.0.111", "value" : { "count" : 9258 } }
{ "_id" : "192.168.0.112", "value" : { "count" : 9242 } }
{ "_id" : "192.168.0.113", "value" : { "count" : 9231 } }
{ "_id" : "192.168.0.114", "value" : { "count" : 9446 } }
{ "_id" : "192.168.0.115", "value" : { "count" : 9550 } }
{ "_id" : "192.168.0.116", "value" : { "count" : 9409 } }
{ "_id" : "192.168.0.117", "value" : { "count" : 9256 } }
has more

Note that the final result shows the key “_id” being the IP address, as defined in the “map” function, and the result as “value.count”, since “value” is the default output field of the MapReduce engine and “count” was used in the “reduce” function.

Other use cases can be performed. The execution of this MapReduce was not fast because only one single shard was used. MapReduce is designed to scale with the number of servers available: if the load is distributed over more shards, the result is returned faster.
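
One consequence of distributing the work is that “reduce” may run more than once for the same key: each shard reduces its own values, and the partial results are reduced again when they are consolidated. That only works if “reduce” returns the same shape it receives, which is why the counting function returns {count: total} rather than a bare number. The sketch below imitates this with two made-up partial value lists; it is plain JavaScript and does not involve the engine.

```javascript
// Same reduce function as in the counting example.
var r1 = function (key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++) {
        total += values[i].count;
    }
    return { count: total };
};

// Made-up emitted values for one key, as two shards might see them.
var shardA = [{ count: 1 }, { count: 1 }, { count: 1 }];
var shardB = [{ count: 1 }, { count: 1 }];

// Each shard reduces its own values first...
var partialA = r1("192.168.0.10", shardA); // { count: 3 }
var partialB = r1("192.168.0.10", shardB); // { count: 2 }

// ...and the partial results are reduced again into the final value.
var finalValue = r1("192.168.0.10", [partialA, partialB]); // { count: 5 }

// Reducing everything in one pass gives the same answer.
var singlePass = r1("192.168.0.10", shardA.concat(shardB)); // { count: 5 }
```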

The shard logs reveal the details of the map and reduce operations. The following listing is from the log of the “mongod” server process, showing the moments at which the temporary collections for intermediate results are created. First, the request is received and both the map and reduce functions are set up to be executed.

Sun Dec 20 14:26:02 query netbeams.$cmd ntoreturn:1 reslen:179 nscanned:0 { mapreduce: "SondeDataContainer", map: function () {
    emit(this.sensor.ip_address, {count:1});
}, reduce: function (key, values) {
    var total = 0;
    for (var i = 0; i < va..., out: "tmp.mrs.SondeDataContainer_1261347962_5" }  nreturned:1 433257ms
Sun Dec 20 14:26:02 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9
Sun Dec 20 14:26:02 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9_inc

The “map phase” is executed first, and it must complete before the “reduce phase” takes place. In the scenario used to count the number of documents per IP address, the two phases happen at different moments, as shown in the following listing. In addition, the listing shows the intermediate results of the “map phase” being indexed and saved into the collection “netbeams.tmp.mr.mapreduce_1261347962_9_inc”:

                43700/2364851   1%
                96000/2364851   4%
                148300/2364851  6%
                200300/2364851  8%
                250900/2364851  10%
                300600/2364851  12%
                351600/2364851  14%
                403800/2364851  17%
                455800/2364851  19%
                508000/2364851  21%
                560500/2364851  23%
                601100/2364851  25%
                647500/2364851  27%
                699900/2364851  29%
                752300/2364851  31%
                804300/2364851  34%
                856100/2364851  36%
                907900/2364851  38%
                959000/2364851  40%
                1009800/2364851 42%
                1060800/2364851 44%
                1112800/2364851 47%
                1164100/2364851 49%
                1209400/2364851 51%
                1253700/2364851 53%
                1305400/2364851 55%
                1350900/2364851 57%
                1401700/2364851 59%
                1453100/2364851 61%
                1503100/2364851 63%
                1551500/2364851 65%
                1602600/2364851 67%
                1637100/2364851 69%
                1687600/2364851 71%
                1736800/2364851 73%
                1787600/2364851 75%
                1839900/2364851 77%
                1891100/2364851 79%
                1941400/2364851 82%
                1989900/2364851 84%
                2041800/2364851 86%
                2094300/2364851 88%
                2145500/2364851 90%
                2193500/2364851 92%
                2245100/2364851 94%
                2296200/2364851 97%
                2341700/2364851 99%
Sun Dec 20 14:28:24 building new index on { 0: 1 } for netbeams.tmp.mr.mapreduce_1261347962_9_inc...
Sun Dec 20 14:28:24 Buildindex netbeams.tmp.mr.mapreduce_1261347962_9_inc idxNo:0
       { ns: "netbeams.tmp.mr.mapreduce_1261347962_9_inc", key: { 0: 1 }, name: "0_1" }
Sun Dec 20 14:28:40      external sort used : 0 files  in 16 secs
Sun Dec 20 14:28:46 done for 1796343 records 22.486secs
Sun Dec 20 14:28:24 insert netbeams.system.indexes 22486ms
Sun Dec 20 14:28:47 building new index on { _id: ObjId(000000000000000000000000) } for netbeams.tmp.mr.mapreduce_1261347962_9...
Sun Dec 20 14:28:47 Buildindex netbeams.tmp.mr.mapreduce_1261347962_9 idxNo:0
      { name: "_id_", ns: "netbeams.tmp.mr.mapreduce_1261347962_9", key: { _id: ObjId(000000000000000000000000) } }
Sun Dec 20 14:28:47 done for 0 records 0.02secs

The execution of the “reduce phase” starts and processes the intermediate results of the “map phase”, saving the final results in the new temporary collection “netbeams.tmp.mr.mapreduce_1261348395_10”.

                100/1796343     0%
                200/1796343     0%
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mr.mapreduce_1261347962_9_inc
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mrs.SondeDataContainer_1261347962_5
Sun Dec 20 14:33:15 end connection 192.168.1.10:38231
Sun Dec 20 14:33:15 connection accepted from 192.168.1.10:44062 #15
Sun Dec 20 14:33:15 connection accepted from 192.168.1.2:60641 #16
Sun Dec 20 14:33:15 building new index on { _id: ObjId(000000000000000000000000) } for netbeams.tmp.mr.mapreduce_1261348395_10...
Sun Dec 20 14:33:15 Buildindex netbeams.tmp.mr.mapreduce_1261348395_10 idxNo:0
         { name: "_id_", ns: "netbeams.tmp.mr.mapreduce_1261348395_10", key: { _id: ObjId(000000000000000000000000) } }
Sun Dec 20 14:33:15 done for 0 records 0secs
Sun Dec 20 14:33:15  mapreducefinishcommand netbeams.tmp.mr.mapreduce_1261348395_10 253
Sun Dec 20 14:33:15 CMD: drop netbeams.tmp.mrs.SondeDataContainer_1261347962_5
Sun Dec 20 14:33:15 ~ScopedDBConnection: _conn != null
Sun Dec 20 14:33:15 end connection 192.168.1.2:60641
Sun Dec 20 14:33:15 end connection 192.168.1.10:44062
Sun Dec 20 14:34:43 connection accepted from 192.168.1.10:44063 #17

NOTE: If the results are important, make sure to save the temporary results into a new, permanent collection, since the results returned by a map-reduce function are purged upon a new access to the server through the mongoDB client.
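
One way to keep the results could be to copy the temporary collection into one with a permanent name. The following is a minimal sketch under assumptions: the helper name “keepResults” and the target collection name “ipCounts” are made up, and “db” is expected to behave like the shell’s database reference, offering “db[name].find()” as used above and a “save()” method on collections.

```javascript
// Hypothetical helper: copies every document of a temporary result collection
// into a collection with a permanent name, and returns how many were copied.
function keepResults(db, tempName, targetName) {
    var copied = 0;
    db[tempName].find().forEach(function (doc) {
        db[targetName].save(doc);
        copied++;
    });
    return copied;
}

// In the mongo shell (assumed usage; "ipCounts" is a made-up collection name):
// keepResults(db, res.result, "ipCounts");
```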