MIT 6.824 Lab 1

Part I: Word count

问题描述

代码解析

// Run word count sequentially: 5 map tasks, 3 reduce tasks, input file taken from os.Args[2].
mapreduce.RunSingle(5, 3, os.Args[2], Map, Reduce)

// RunSingle executes an entire MapReduce job sequentially in the calling
// goroutine. It splits the input, runs every map task and then every reduce
// task, and finally calls mr.Merge (defined elsewhere; presumably it combines
// the reduce outputs).
//
// nMap is the number of map tasks (input splits), nReduce the number of reduce
// partitions, and file the input file name.
func RunSingle(nMap int, nReduce int, file string,
	Map func(string) *list.List,
	Reduce func(string, *list.List) string) {
	mr := InitMapReduce(nMap, nReduce, file, "")
	mr.Split(mr.file)

	// Map phase: one task per split produced by Split.
	for m := 0; m < nMap; m++ {
		DoMap(m, mr.file, mr.nReduce, Map)
	}

	// Reduce phase: each task reads its partition from every map task's output.
	for r := 0; r < mr.nReduce; r++ {
		DoReduce(r, mr.file, mr.nMap, Reduce)
	}

	mr.Merge()
}
// Split divides the input file fileName into split files
// MapName(fileName, 0) … MapName(fileName, mr.nMap-1), one per map task.
//
// The input is copied line by line, and each line is re-terminated with "\n".
// A new split is started once the bytes already written exceed
// nchunk*(splits so far), where nchunk = size/nMap + 1. Splits therefore always
// end on a line boundary. The +1 keeps nchunk*nMap above the file size, so at
// most nMap splits receive data.
//
// Because lines are never broken, long lines can leave trailing splits with no
// data. Those splits are still created (empty), so every map task 0..nMap-1
// has a file to open.
//
// Split panics if the input cannot be read or a split cannot be written,
// because its signature offers no way to return an error.
func (mr *MapReduce) Split(fileName string) {
	fmt.Printf("Split %s\n", fileName)

	infile, err := os.Open(fileName)
	if err != nil {
		panic(fmt.Sprintf("Split: open %s: %v", fileName, err))
	}
	defer infile.Close()

	fi, err := infile.Stat()
	if err != nil {
		panic(fmt.Sprintf("Split: stat %s: %v", fileName, err))
	}
	nchunk := fi.Size()/int64(mr.nMap) + 1

	// create opens split m for writing and wraps it in a buffered writer.
	create := func(m int) (*os.File, *bufio.Writer) {
		name := MapName(fileName, m)
		f, err := os.Create(name)
		if err != nil {
			panic(fmt.Sprintf("Split: create %s: %v", name, err))
		}
		return f, bufio.NewWriter(f)
	}
	// finish flushes and closes one split. Flush also reports any earlier
	// write error, because bufio.Writer errors are sticky.
	finish := func(f *os.File, w *bufio.Writer) {
		if err := w.Flush(); err != nil {
			f.Close()
			panic(fmt.Sprintf("Split: write %s: %v", f.Name(), err))
		}
		if err := f.Close(); err != nil {
			panic(fmt.Sprintf("Split: close %s: %v", f.Name(), err))
		}
	}

	outfile, writer := create(0)
	m := 1 // index of the next split to create
	var written int64

	scanner := bufio.NewScanner(infile)
	for scanner.Scan() {
		// If the current split is full, finish it and start the next one.
		if written > nchunk*int64(m) {
			finish(outfile, writer)
			outfile, writer = create(m)
			m++
		}
		line := scanner.Text() + "\n"
		writer.WriteString(line)
		written += int64(len(line))
	}
	if err := scanner.Err(); err != nil {
		// For example, a line longer than the scanner's maximum token size;
		// otherwise the rest of the input would be dropped silently.
		outfile.Close()
		panic(fmt.Sprintf("Split: read %s: %v", fileName, err))
	}
	finish(outfile, writer)

	// Create any splits the loop never reached, so DoMap finds a file for every task.
	for ; m < mr.nMap; m++ {
		finish(create(m))
	}
}
// MapName returns the name of the input split for map task MapJob:
// "mrtmp." + fileName + "-" + MapJob.
func MapName(fileName string, MapJob int) string {
	const prefix = "mrtmp."
	job := strconv.Itoa(MapJob)
	return prefix + fileName + "-" + job
}
// ReduceName returns the name of the intermediate file that map task MapJob
// writes for reduce partition ReduceJob: MapName(fileName, MapJob) + "-" +
// ReduceJob.
func ReduceName(fileName string, MapJob int, ReduceJob int) string {
	prefix := MapName(fileName, MapJob)
	part := strconv.Itoa(ReduceJob)
	return prefix + "-" + part
}
// DoMap runs map task JobNumber.
//
// It reads the whole split MapName(fileName, JobNumber), passes its contents to
// Map, and partitions the returned KeyValue pairs into nreduce intermediate
// files. Pair kv is JSON-encoded into
// ReduceName(fileName, JobNumber, ihash(kv.Key) % nreduce). Within each file,
// pairs keep the order in which Map returned them. All nreduce files are
// created, even for partitions that receive no pairs, so every reduce task can
// open its inputs.
//
// DoMap panics if the split cannot be read or an intermediate file cannot be
// written, because its signature offers no way to return an error.
func DoMap(JobNumber int, fileName string,
	nreduce int, Map func(string) *list.List) {
	name := MapName(fileName, JobNumber)
	file, err := os.Open(name)
	if err != nil {
		panic(fmt.Sprintf("DoMap: open %s: %v", name, err))
	}
	fi, err := file.Stat()
	if err != nil {
		file.Close()
		panic(fmt.Sprintf("DoMap: stat %s: %v", name, err))
	}
	size := fi.Size()
	fmt.Printf("DoMap: read split %s %d\n", name, size)

	b := make([]byte, size)
	// A single Read may return fewer bytes than requested, so keep reading until the buffer is full.
	for off := 0; off < len(b); {
		n, rerr := file.Read(b[off:])
		off += n
		if rerr != nil && off < len(b) {
			file.Close()
			panic(fmt.Sprintf("DoMap: read %s: %v", name, rerr))
		}
	}
	file.Close()

	res := Map(string(b))

	if nreduce <= 0 {
		return // no partitions to write (and the modulo below would divide by zero)
	}

	// Open every partition file up front, so the result list is scanned once
	// instead of once per partition. This keeps nreduce files open at a time.
	files := make([]*os.File, nreduce)
	encs := make([]*json.Encoder, nreduce)
	for r := range files {
		out := ReduceName(fileName, JobNumber, r)
		f, err := os.Create(out)
		if err != nil {
			panic(fmt.Sprintf("DoMap: create %s: %v", out, err))
		}
		files[r] = f
		encs[r] = json.NewEncoder(f)
	}

	for e := res.Front(); e != nil; e = e.Next() {
		kv := e.Value.(KeyValue)
		r := ihash(kv.Key) % uint32(nreduce)
		if err := encs[r].Encode(&kv); err != nil {
			panic(fmt.Sprintf("DoMap: encode to %s: %v", files[r].Name(), err))
		}
	}

	for _, f := range files {
		if err := f.Close(); err != nil {
			panic(fmt.Sprintf("DoMap: close %s: %v", f.Name(), err))
		}
	}
}
// DoReduce runs reduce task job.
//
// It reads partition job from every map task's output, i.e.
// ReduceName(fileName, i, job) for i in [0, nmap), each a stream of
// JSON-encoded KeyValue records. It groups the values by key and calls Reduce
// once per key, in sorted key order. The results are written as a stream of
// JSON-encoded KeyValue records to MergeName(fileName, job).
//
// DoReduce panics if an input cannot be opened or decoded, or if the output
// cannot be written, instead of silently dropping data. Its signature offers
// no way to return an error.
func DoReduce(job int, fileName string, nmap int,
	Reduce func(string, *list.List) string) {
	kvs := make(map[string]*list.List)
	for i := 0; i < nmap; i++ {
		name := ReduceName(fileName, i, job)
		fmt.Printf("DoReduce: read %s\n", name)
		file, err := os.Open(name)
		if err != nil {
			panic(fmt.Sprintf("DoReduce: open %s: %v", name, err))
		}
		dec := json.NewDecoder(file)
		// More returns false once only whitespace remains before end of input,
		// so a Decode error inside the loop is a real error, not end of stream.
		for dec.More() {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				file.Close()
				panic(fmt.Sprintf("DoReduce: decode %s: %v", name, err))
			}
			l, ok := kvs[kv.Key]
			if !ok {
				l = list.New()
				kvs[kv.Key] = l
			}
			l.PushBack(kv.Value)
		}
		file.Close()
	}

	keys := make([]string, 0, len(kvs))
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random; sort for deterministic output

	p := MergeName(fileName, job)
	file, err := os.Create(p)
	if err != nil {
		panic(fmt.Sprintf("DoReduce: create %s: %v", p, err))
	}
	enc := json.NewEncoder(file)
	for _, k := range keys {
		res := Reduce(k, kvs[k])
		if err := enc.Encode(KeyValue{Key: k, Value: res}); err != nil {
			file.Close()
			panic(fmt.Sprintf("DoReduce: encode to %s: %v", p, err))
		}
	}
	if err := file.Close(); err != nil {
		panic(fmt.Sprintf("DoReduce: close %s: %v", p, err))
	}
}