此系列为Cousera上Standford的Mining Massive Datasets课程学习笔记。 这是该系列的第一篇笔记:MapReduce
分布式文件系统 Distributed File Systems
Cluster Architecture
传统的数据挖掘都在一台主机上搞定,而如今数据量之大早已不是一台主机可以处理的了,拿Google举例:
- 100亿个网页
- 每个网页平均大小 = 20KB
- 总共 10 billion * 20 KB = 200TB
- 磁盘读取速度 = 50 MB/sec
- 总共读取时间 = 四百万秒 = 46天
- 读取数据时间甚至比处理数据时间还要长
所以现在的集群架构(Cluster Architecture)是这样的:
构成树形结构,上面的节点都是交换机,底层叶子节点是Linux主机集群,每个集群包含14-64个主机节点。
这种集群架构虽然可以通过并发处理解决IO问题,但是它也面临新的问题: * Linux节点会挂掉 * 如何做到即使某个主机挂了也能永久保存其中的数据? * 如何处理在计算中节点挂掉的情况?
- 网络瓶颈
- 目前网络带宽 = 1 Gbps
- 移动10TB的数据大约需要一天
- 编写分布式的程序比较困难!
- 需要一个简单的模型来消除大部分困难
MapReduce
Map-Reduce 解决了集群计算的一些困难与挑战。 * 以冗余的方式把数据存储在多个节点上来保持数据的永久性和可用性。 * 让计算靠近数据来最小化数据的移动。 * 简单的编程模型
冗余存储基础设施 Redundant Storage Infrastructure
- 分布式文件系统
- Google GFS
- Hadoop HDFS
- 典型使用场景
- 数据量大
- 少更新
- 读取频繁
文件分布系统就像以下: 每一“块”的服务器也有计算功能,不光是存储数据!
计算模型 Conputational model
Word Count
一个例子 Word Count: * 有一个非常大的文本文件 * 需要统计每个单词出现的次数
- Case 1 : 整个文件无法读取进内存,但是所有的
<word, count>
键值对可以存储在内存里。
这种情况我们可以简单写一个Hash Table,以word作为key,count作为value来实现。
- Case 2 : 连
<word, count>
都无法完全保存在内存里。
这种情况可以用一条Linux命令来解决: 1
words(doc.txt) | sort | uniq -c
words
是一个程序,用来把文件输出成单词,一行一个。
这种方法正是MapReduce模型的计算方法,而且还天然地可以并行计算。
整体概揽
具体分析
Map
输入一些原始的键值对,输出中间状态的键值对,一对一或一对多,在上述例子里就是words
这个程序,输入文件名,输出<word, 1>
的键值对。
Reduce
把中间状态的键值对按照key
分组整理,输出的键值对都已经分好组,在上述例子中就是sort
这个命令。
最后是把每组键值对的value
合并成一个。
整个处理过程中,程序员只需要写两个函数:Map(k, v)
和Reduce(K, <V>*)
,对于解决不同的问题两个函数的实现各有不同,但是框架都是这一个。
并行计算
- 大文件拆分成多个部分
- 每部分文件分别在一个主机上进行Map计算。
- 通过Hash把所有主机Map之后的导入到一个或多个主机里,使具有同一个
key
的键值对都出现在一个主机里,并在每个主机里分别进行排序。 - 在各个主机里分别Reduce
伪码示例: 1
2
3
4
5
6
7
8
9
10
11map(key, value):
# key: document name; value: text of the document
for each word w in value:
emit(w, 1)
reduce(key, values):
# key: a word; value: an iterator over counts
result = 0
for each count v in values:
result += v
emit(key, result)
Examples
调度和数据流动 Scheduling and Data Flow
描述Map-Reduce的工作图:
并行处理模型,上一节已经描述过了:
Environment
Map-Reduce过程需要考虑的问题: * 对输入数据划分 * 程序的执行在一大堆机器里进行调度 * 执行按key
分组这一步骤 * 处理节点(机器)崩溃 * 管理必要地机器之间的通信
Data Flow
输入和最终输出是储存在分布式文件系统上的(DFS),调度者应尽量调度Map任务"靠近"输入数据实际所存储的地方,让计算靠近数据。
中间结果存储在Map工作者和Reduce工作者本地的文件系统里。
一个Map-Reduce任务的输出通常是另一个Map-Reduce任务的的输入。
Coordination
每个Map-Reduce任务都有一个Master节点,Master节点需要负责总体任务的协调与调度。
Master节点需要考虑的问题: * 每个节点的任务状态:空闲的(idle), 正在工作的(in-progress), 已完成的(completed) * 有空闲任务的话就找可用的节点去处理 * 当一个Map任务完成时,工作者向Master节点发送R个中间文件的位置和大小,R是Reduce工作者的个数,每个Reduce工作者获得一个中间文件。 * Master节点把数据推送到Reducers节点。 * Master节点还要时不时地ping一下工作者看机器是否还活着。
失败处理办法
- 处理Map任务的节点挂了
由于Map任务输出的中间结果是存储在本地的,所以机器挂了输出的数据也就没了,只能重新做了。 把挂了节点的Map任务重设成空闲,不管挂之前是处理中还是已完成,反正数据都没了,然后等着有其他可用节点来完成这个节点的工作。
- 处理Reduce任务的节点挂了
由于Reduce任务输出的最终结果是保存在分布式文件系统上的,我们前面已经讨论过了,一个节点坏了没关系,还有其他节点保存着备份,所以如果该节点是完成Reduce任务之后挂的那就不用管了,数据还在; 如果是没完成就挂了,那就需要重做了。 只把没做完就挂了的任务重设为空闲,等着其他节点有空来作。
- Master节点挂了怎么办
失去了领导者这个任务只能终止,然后等待从头重新开始咯。 不过机器的平均使用寿命在1000天左右,单台机器挂掉的概率是很小的,在Map任务和Reduce任务节点的集群里,很可能会出现有一两台挂掉的情况,对于Master这个单一节点来讲,挂掉的几率非常小,可以不考虑。
Map和Reduce的工作数量
设有M个Map任务,R个Reduce任务,我们的目的就是确定M和R。
拇指规则(经验): * 让M远大于集群里节点个数 * 每个DFS区块处理一个map任务,可以减少单个节点的任务量 * 提高动态负载均衡和从挂掉节点恢复的速度 * 通常R比M小
优化 Refinements
Combiners
通常一个Map任务会处理出许多个具有相同key
的键值对,直接把这样的数据传给Reducer会很巨大,网络带宽占用多,有时候可以在传输前预合并一下,极大的减少了数据量,发送速度也显著提高。
合并(combine)函数通常与Reduce函数是一样的。
回到之前Word Count的例子 Map任务输出结果中可能会有大量的<word, 1>
具有相同的key
,比如the
可能出现了1000次,那么里面就有1000个('the', 1)
,这时候可以通过预合并把具有相同key
的键值对合并,就变为了('the', 1000)
,然后再推送到Reduce任务中,极大减少了传输数据量。 如图所示:
但预合并并不是什么时候都管用的,它要求Reduce函数满足交换律和结合律。
举个例子,比如求和(sum)满足交换律和结合律,即a + b = b + a
和(a + b) + c = a + (b + c)
,所以它可以在本地预处理。
如果Reduce函数是求平均值(Average)呢? 我们都过把大数据分成了很多小块,在每个小块里进行map,然后把输出结果具有相同key
的键值对加起来再取平均,然后发送给Reduce任务,Reduce把每个小块的平均值加起来再取平均,得到最终结果。
得到的是正确结果吗? 显然不是!
求平均值这个运算不满足结合律,分成小块取平均然后加起来再取平均与直接把所有数据加起来取平均是不相等的!
所以这个例子不能用与Reduce一样的合并函数,但也不是没有办法,可以只求出它们的和,不取平均,然后发送到Reduce里,Reduce函数把所有的区块的和加起来再取平均,得到的就是正确结果了。
再换一个,如果要求中位数(Median)呢? 那就没辙了,只能直接传原始map输出数据了。
分块函数
实现 Implementations
- Hadoop
- 开源、用java实现
- 使用HDFS作为稳定的存储
- 下载: http://lucene.apache.org/hadoop/
- Hive, Pig
- 在hadoop MapReduce层上面的抽象
- Google MapReduce
- 使用Google File System作为存储
- 只有google内部可以使用
资源和延伸阅读
课件pdf下载
延伸阅读: MapReduce: Simplified Data Processing on Large Clusters
资源:
- Hadoop Wiki
- 介绍
- Getting Start
- MapReduce 概揽
- http://wiki.apache.org/lucene-hadoop/HadoopMapRedClasses
- Eclipse环境