Map Reduce
- Prashant Kumar
- Jun 13, 2022
- 6 min read
Updated: Jun 16, 2022
As the name suggests, the Map Reduce programming model consists of two main functions, Map and Reduce. The core idea is to first divide the raw data into categories using a map function and then use a reduce function to turn the categorised data into the final derived dataset.

In an implementation, the Map function writes the raw data as key-value pairs into multiple intermediate files. The Reduce function collates the data from all these intermediate files and, based on some logic, generates the final output files, normally with a single value for each distinct key. This two-step process greatly simplifies the implementation of many real-world problems that can be expressed in this model. Examples include deriving summaries from crawled data, building inverted indices, counting occurrences of the most important words in a document, and building a reverse web-link graph.
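To make the two steps concrete before moving to files and processes, here is a minimal in-memory sketch of one of the problems mentioned above, an inverted index. The names mapIndex, reduceIndex and buildIndex are hypothetical and exist only for this illustration; the grouping of values by key in the middle is the part a real framework does for you.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// KeyValue is one intermediate record emitted by the map step.
type KeyValue struct {
	Key   string
	Value string
}

// mapIndex (hypothetical name) emits one (word, document name) pair
// for every whitespace-separated word in the document.
func mapIndex(doc string, contents string) []KeyValue {
	kva := []KeyValue{}
	for _, w := range strings.Fields(strings.ToLower(contents)) {
		kva = append(kva, KeyValue{Key: w, Value: doc})
	}
	return kva
}

// reduceIndex (hypothetical name) collapses all document names seen for
// one word into a sorted, de-duplicated, comma-separated list.
func reduceIndex(word string, docs []string) string {
	seen := map[string]bool{}
	unique := []string{}
	for _, d := range docs {
		if !seen[d] {
			seen[d] = true
			unique = append(unique, d)
		}
	}
	sort.Strings(unique)
	return strings.Join(unique, ",")
}

// buildIndex runs map over every document, groups the intermediate
// pairs by key, and reduces each group to a single value.
func buildIndex(docs map[string]string) map[string]string {
	grouped := map[string][]string{}
	for name, contents := range docs {
		for _, kv := range mapIndex(name, contents) {
			grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
		}
	}
	index := map[string]string{}
	for word, names := range grouped {
		index[word] = reduceIndex(word, names)
	}
	return index
}

func main() {
	index := buildIndex(map[string]string{
		"a.txt": "map reduce map",
		"b.txt": "reduce tasks",
	})
	fmt.Println(index["map"], index["reduce"], index["tasks"])
}
```

Everything here runs in one process and one goroutine; the rest of the post is about spreading the same map, group and reduce steps across workers.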
Implementation
With the basics covered, the best way to build intuition about Map Reduce is to actually implement it, so let's get started. We will take distinct word count as the example here. For the code, I'm using Go (golang) for reasons of convenience. The basic components can be summarised as follows:
Coordinator - We need a continuously running program that keeps track of all the map tasks, reduce tasks and workers. This program also opens an RPC endpoint to listen for requests from the workers.
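import (
	"log"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"strconv"
)
type Coordinator struct {
}
// coordinator_sock returns a per-user UNIX-domain socket path.
func coordinator_sock() string {
	s := "/var/tmp/my-test-map-reduce-"
	s += strconv.Itoa(os.Getuid())
	return s
}
// server registers the coordinator's RPC methods and serves them over HTTP.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	sock_name := coordinator_sock()
	// Remove a stale socket file left over from a previous run.
	os.Remove(sock_name)
	listener, err := net.Listen("unix", sock_name)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}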
There is a single running instance of the coordinator. When the server starts, the coordinator can initialise one map for map tasks and one map for reduce tasks. These maps are used to track the progress and completion of each individual map and reduce task. Once all the map and reduce tasks are completed, the coordinator can end its process.
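The Coordinator struct above is shown empty, so here is one possible way to fill it in, as a sketch only. The field names Lock, map_tasks and reduce_tasks match the ones ReturnTask uses later in this post; the "is_completed" key, MakeCoordinator and Done are assumptions added for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Coordinator is a hypothetical filled-in version of the empty struct.
// Each task is a small record keyed by its task number.
type Coordinator struct {
	Lock         sync.Mutex
	map_tasks    map[int]map[string]interface{}
	reduce_tasks map[int]map[string]interface{}
}

// MakeCoordinator (assumed helper) creates one pending map task per input file.
func MakeCoordinator(files []string) *Coordinator {
	c := &Coordinator{
		map_tasks:    map[int]map[string]interface{}{},
		reduce_tasks: map[int]map[string]interface{}{},
	}
	for i, f := range files {
		c.map_tasks[i] = map[string]interface{}{
			"filename": f, "task_number": i,
			"is_processed": false, "is_completed": false,
		}
	}
	return c
}

// Done (assumed helper) reports whether every registered map and reduce
// task has been marked completed.
func (c *Coordinator) Done() bool {
	c.Lock.Lock()
	defer c.Lock.Unlock()
	for _, tasks := range []map[int]map[string]interface{}{c.map_tasks, c.reduce_tasks} {
		for _, task := range tasks {
			if task["is_completed"] != true {
				return false
			}
		}
	}
	return true
}

func main() {
	c := MakeCoordinator([]string{"pg-1.txt", "pg-2.txt"})
	fmt.Println(c.Done())
	for _, task := range c.map_tasks {
		task["is_completed"] = true
	}
	fmt.Println(c.Done())
}
```

In this sketch reduce tasks start out empty, so Done only becomes meaningful once the coordinator has also registered the reduce tasks; the main loop of the coordinator could poll Done and exit when it returns true.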
Worker - A worker process runs the respective map and reduce methods for a given set of raw data. Input can come from files or from a data stream. For the sake of simplicity, we use files as the input source here. Multiple worker instances can run in parallel.
The first step of the worker is to ask the coordinator for a task.
type TaskRequestArgs struct {
	Tasktype string
}
type TaskRequestReply struct {
	Filename   string
	TaskNumber int
}
// AskForTask requests a task of the given type ("map_task" or "reduce_task")
// and returns its file name and task number.
func AskForTask(task_type string) (string, int) {
	args := TaskRequestArgs{}
	reply := TaskRequestReply{}
	args.Tasktype = task_type
	// ReturnTask is a method in coordinator which returns tasks
	ok := call("Coordinator.ReturnTask", &args, &reply)
	if ok {
		return reply.Filename, reply.TaskNumber
	}
	fmt.Printf("call failed\n")
	return "", 0
}
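AskForTask relies on a call helper whose body is not part of this post. A minimal sketch of what it could look like, assuming the coordinator serves HTTP-based RPC on the UNIX socket returned by coordinator_sock, is:

```go
package main

import (
	"fmt"
	"net/rpc"
	"os"
	"strconv"
)

// coordinator_sock mirrors the socket path used by the coordinator.
func coordinator_sock() string {
	return "/var/tmp/my-test-map-reduce-" + strconv.Itoa(os.Getuid())
}

// call sends one RPC to the coordinator and waits for the reply.
// It returns false if the coordinator cannot be reached or the call fails.
// This body is an assumption; only the name and usage appear in the post.
func call(rpcname string, args interface{}, reply interface{}) bool {
	client, err := rpc.DialHTTP("unix", coordinator_sock())
	if err != nil {
		fmt.Println("dialing:", err)
		return false
	}
	defer client.Close()
	if err := client.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}

func main() {
	args := struct{ Tasktype string }{"map_task"}
	reply := struct{ Filename string }{}
	ok := call("Coordinator.ReturnTask", &args, &reply)
	fmt.Println("call succeeded:", ok)
}
```

Dialing on every call keeps the helper stateless, at the cost of one connection per request; a worker that talks to the coordinator very often might prefer to keep the client open.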
Method to return task in the coordinator:
func (c *Coordinator) ReturnTask(args *TaskRequestArgs, reply *TaskRequestReply) error {
	// Hold the lock across both the check and the update, so that two
	// workers can never pick up the same task.
	c.Lock.Lock()
	defer c.Lock.Unlock()
	if args.Tasktype == "map_task" {
		for _, task := range c.map_tasks {
			if task["is_processed"] == true {
				continue
			}
			task["is_processed"] = true
			reply.Filename = task["filename"].(string)
			reply.TaskNumber = task["task_number"].(int)
			return nil
		}
	}
	if args.Tasktype == "reduce_task" {
		for _, task := range c.reduce_tasks {
			if task["is_processed"] == true {
				continue
			}
			task["is_processed"] = true
			reply.Filename = task["filename"].(string)
			return nil
		}
	}
	// No unprocessed task of the requested type: the reply stays empty.
	return nil
}
Here, we also mark the returned task as processed, to track the tasks already picked up by one of the workers. Also notice the lock, which is needed to avoid race conditions when multiple workers try to read or update the task state at the same time.
We use the plugin package to load the map and reduce files and their methods in the worker. The map function is defined with the name Map and the reduce function with the name Reduce. Here is an example of using the plugin package to load the Map and Reduce methods:
func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
		os.Exit(1)
	}
	map_function, reduce_function := load_plugin(os.Args[1])
	mr.Worker(map_function, reduce_function)
}
// load_plugin opens the plugin file and looks up its exported Map and Reduce functions.
func load_plugin(file_name string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
	file, err := plugin.Open(file_name)
	if err != nil {
		log.Fatalf("cannot load plugin %v: %v", file_name, err)
	}
	map_function_temp, err := file.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", file_name)
	}
	map_function := map_function_temp.(func(string, string) []mr.KeyValue)
	reduce_function_temp, err := file.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", file_name)
	}
	reduce_function := reduce_function_temp.(func(string, []string) string)
	return map_function, reduce_function
}
Once the coordinator returns a map or a reduce task, the main worker function executes the corresponding map or reduce method.
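func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Keep executing map tasks until none are left.
	for {
		is_complete := execute_map_task(mapf)
		if is_complete {
			break
		}
	}
	// Pause before the reduce phase. The unit is assumed to be seconds;
	// a bare time.Sleep(10) would wait only 10 nanoseconds.
	time.Sleep(10 * time.Second)
	// Keep executing reduce tasks until none are left.
	for {
		is_complete := execute_reduce_task(reducef)
		if is_complete {
			break
		}
	}
}
An example of a Map Reduce plugin file for counting each word in a text file: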
package main
import (
	"strconv"
	"strings"
	"unicode"
	"map_reduce/mr"
)
// Map emits one {word, "1"} pair for every word in contents.
// It returns mr.KeyValue so that it matches the type the worker looks up.
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }
	// split contents into an array of words.
	words := strings.FieldsFunc(contents, ff)
	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{Key: w, Value: "1"}
		kva = append(kva, kv)
	}
	return kva
}
func Reduce(key string, values []string) string {
	// return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}
Map Function
In this specific case of finding the count of each word in a file, execute_map_task first calls the Map function and gets a list of key-value pairs in return. In each pair, the key is the word and the value is 1, representing one occurrence of that word. execute_map_task then creates intermediate files named in the format mr-{map_task_number}-{reduce_task_number}. map_task_number can be the index of the map task being picked up; there is one map task for each input file provided at the beginning of the execution. reduce_task_number can be the hash value of the key returned from the Map function.
import (
"hash/fnv"
)
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
The content of each intermediate file is the key-value pairs returned from the Map function. Also, since we hash on keys, all occurrences of the same word within one map task end up in a single intermediate file. At the end of the map phase, we would have up to N * M intermediate files, where N is the number of distinct keys in a map task and M is the number of map tasks.
Reduce Function
execute_reduce_task asks the coordinator for reduce tasks. There is one reduce task for each intermediate file generated by the execute_map_task calls. execute_reduce_task reads the content of the intermediate file using the following code:
file, err := os.Open(filename)
if err != nil {
	log.Fatalf("cannot open %v", filename)
}
defer file.Close()
dec := json.NewDecoder(file)
kva := []KeyValue{}
// Decode one KeyValue at a time until the end of the file.
for {
	var kv KeyValue
	if err := dec.Decode(&kv); err != nil {
		break
	}
	kva = append(kva, kv)
}
execute_reduce_task then calls the Reduce method with the file content. In this specific case of word count, Reduce returns the number of occurrences of a word in the given list. execute_reduce_task also creates one output file named in the format mr-out-{reduce_key_number} and writes the result to it. reduce_key_number is derived from the ihash method above. At the end of execute_reduce_task, we would have one output file for each key. For the word count problem, each output file contains one line, with the word as the key and its count as the value.
With this, you should go ahead and try out your own complete implementation of a Map Reduce program.
Fault Tolerance
The system should also be able to handle functional failures and system failures. In the above implementation, the coordinator needs to keep a lookout for tasks in progress. If a task is still marked in progress after a certain time, the coordinator marks it back to the to_be_picked_up state, assuming the worker has failed to execute it for some reason. Currently, the coordinator is our single point of failure. To handle this, the coordinator can keep writing all its actions to a log file. If the coordinator crashes, a new coordinator instance can read the persisted log file and resume from the state the previous coordinator was in when it crashed.
Multi System Implementation
The above implementation relies on files and workers being on the same system. A similar implementation can be scaled up with files and workers spread across multiple systems. With files and workers spread across multiple computers, the complexity of tracking tasks and intermediate files increases. Workers need to connect to the coordinator over the network via RPC. The coordinator also needs to keep track of the systems the workers are running on and handle communication failures due to network errors. In case of a system or hardware failure, it needs to request a new system to be spawned and reassign the tasks that were running on the failed system.
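As a rough illustration of the first step towards multiple machines, the UNIX socket can be swapped for a TCP listener. The sketch below assumes a hypothetical Ping RPC and helper names serve_tcp and ping; it binds to the loopback address only so that it can run on one machine.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"net/rpc"
)

type PingArgs struct{ WorkerID string }
type PingReply struct{ Message string }

type Coordinator struct{}

// Ping is a hypothetical RPC a worker could use to check connectivity.
func (c *Coordinator) Ping(args *PingArgs, reply *PingReply) error {
	reply.Message = "hello " + args.WorkerID
	return nil
}

// serve_tcp listens on a TCP address instead of a UNIX socket and returns
// the address actually bound, which matters when the port is 0.
func serve_tcp(c *Coordinator, addr string) (string, error) {
	if err := rpc.Register(c); err != nil {
		return "", err
	}
	rpc.HandleHTTP()
	l, err := net.Listen("tcp", addr)
	if err != nil {
		return "", err
	}
	go http.Serve(l, nil)
	return l.Addr().String(), nil
}

// ping dials the coordinator over TCP and performs one RPC.
func ping(addr string, worker_id string) (string, error) {
	client, err := rpc.DialHTTP("tcp", addr)
	if err != nil {
		return "", err
	}
	defer client.Close()
	reply := PingReply{}
	err = client.Call("Coordinator.Ping", &PingArgs{WorkerID: worker_id}, &reply)
	return reply.Message, err
}

func main() {
	addr, err := serve_tcp(&Coordinator{}, "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	msg, err := ping(addr, "worker-1")
	fmt.Println(msg, err)
}
```

On a real cluster the coordinator would listen on an address reachable by the workers, and the intermediate files would also need to live somewhere every worker can read, which this sketch does not address.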
References
Google Map Reduce Paper - https://pdos.csail.mit.edu/6.824/papers/mapreduce.pdf
Basic code implementation - https://github.com/pkumar7/map_reduce.git
Introduction Lecture on Distributed Systems:






