本篇文章是对论文MapReduce-OSDI04的原创翻译,转载请严格遵守CC BY-NC-SA协议。
作者
Jeffrey Dean and Sanjay Ghemawat
jeff@google.com, sanjay@google.com
Google, Inc
摘要
MapReduce是一个用来处理和生成大型数据集的编程模型和相关实现。用户需要指定map函数和reduce函数。map函数处理键值对并生成一组由键值对组成的中间值,reduce函数将所有键相同的中间值合并。就像本文中展示的那样,现实世界中的很多任务都可以通过这个模型表示。
以这种函数式风格编写的程序可以自动地作为并行程序在大型商用机集群上执行,运行时(run-time)系统负责对输入数据分区、在一系列机器间调度程序执行、处理机器故障、管理必要的机器间的通信。这让没有任何并行程序和分布式系统开发经验的编程人员能够轻松利用一个大型分布式系统的资源。
我们的MapReduce实现是高度可伸缩的,其运行在一个由商用机器组成的大型分布式集群上。通常,一个MapReduce计算会处理上千台机器上数TB的数据。每天都有数百个MapReduce程序提交的高达上千个MapReduce任务在Google集群上执行。开发人员认为这个系统非常易用。
1. 引言
在过去的五年中,本文作者和其他在Google的开发者实现了数以百计的计算程序,以计算处理不同来源的大规模原始数据(如爬取到的文档、web请求日志等)。这些程序可能用来计算倒排索引(inverted index)、web文档在图论中的各种表示、每个主机爬取到的页面数量之和、给定的某天中查询最频繁的集合等等。虽然大部分的计算程序逻辑非常简单,但是由于其输入数据的规模通常很大,所以这些程序必须在成百上千台机器上分布式执行以在可可接受的时间内完成。解决并行计算、数据分布、故障处理等问题需要大量复杂的代码,让原本简单的问题不再简单。
为了应对这种复杂性,我们设计了一个新的程序抽象。其允许我们通过简单的描述表达我们要执行的计算,同时将并行化、容错、数据分布、负载均衡等细节隐藏在库中。我们的抽象收到了Lisp和许多其他函数式语言中的map和reduce原语的启发。我们意识到,我们大部分的计算都设计map操作和reduce操作。首先对输入数据中每条逻辑记录应用map操作以计算出一系列的中间键值对,然后对所有键相同的值应用reduce操作以合理地整合这些派生数据。用户可以自定义map和reduce操作,这让大型计算的并行化更为简单,且可以使用“重跑(re-execution)”的方法作为主要容错机制。
本工作的主要贡献为一个简单且功能强大的能实现自动并行化、高伸缩性分布式计算的的接口,和该接口在大型商用PC集群上的高性能的实现。
第二章描述了基本编程模型,并给出了几个例子。第三章描述了为我们基于集群的计算环境定制的MapReduce接口实现。第四章描述了该编程模型中我们认为有帮助的细节。第五章我们的实现在各种任务重的性能测试。第六章探究了MapReduce在Google中的使用,其中包括了我们以MapReduce为基础重写我们产品索引系统的经历。第七章探讨了相关工作与未来的工作。
2. 编程模型
计算任务以一系列输入键值对作为输入,并产出一系列输出键值对作为输出。MapReduce库的用户将计算表示为两个函数:map和reduce。
用户编写的map函数将输入键值对处理为一系列中间键值对。MapReduce库将键相同的所有中间键值对的值与其对应的键传递给reduce函数。
用户编写的reduce函数接收中间键值对的键和该键对应的一系列值。它将这些值合并,并生产一个可能更小的一系列值。每个reduce函数调用通常产出0个或1个输出值。中间键值对中的值通过一个迭代器(iterator)供用户编写的reduce函数使用。这让我们能够处理因过大而无法放入内存中的值列表。
2.1 示例
考虑如下一个问题:统计一个大量文档集合中每个单词出现的次数。用户会编写如下的伪代码。
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map计算出每个单词与其(译注:在每个文档中的)出现的次数(在本例中为“1”)。reduce函数会求出每个单词出现次数的和。
另外,用户编写代码来一个mapreduce specification(规格/规范)对象,填写输入输出文件名和可选的调节参数。随后,用户调用MapReduce函数,将mapreduce specification对象作为参数传入。用户代码会被与MapReduce库(C++实现)链接到一起。附录A包含本示例的完整程序。
2.2 类型
尽管前面的伪代码中使用了字符串作为输入输出类型,但理论上用户提供的map和reduce函数可以使用相关联的类型:
map (k1,v1) -> list(k2,v2)
reduce (k2,list(v2)) -> list(v2)
即输入的键和值与输出的键和值的类型域不同,而中间键与值和输出键域值的类型与相同。
在我们的C++实现中,我们通过字符串将接受或传入用户定义的函数的参数,将字符串与适当类型的转换留给用户代码去实现。
2.3 更多示例
本节中,我们给出了一些简单的示例。这些示例是可以简单地通过MapReduce计算表示的有趣的程序。
-
分布式“grep”:如果一行文本匹配给定的模板,那么map函数会输出该行。reduce作为一个恒等函数,它仅将提供的中间数据复制到输出。
-
URL访问频率计数:map函数处理web网页请求日志,并按照输出。reduce函数对相同的值求和,并输出键值对。
-
反转web链接拓扑图:map函数对名为的页面中每个名为的URL链接输出一个键值对。reduce函数按照所有相同的合并为一个列表,并与其相应的URL关联,输出键值对。
-
每个主机的词向量统计:词向量是对是对一个或一系列文档中最重要的词的总结,其形式为键值对列表。map函数为每篇输入文档输出一个键值对(其中由文档到的URL解析而来)。reduce函数会受到对于给定的主机上每篇文章的所有的词向量。其将这些词向量加在一起,丢弃掉低频词,并最终输出键值对。
-
倒排索引:map函数对每篇文档进行提取,输出一个的序列。reduce函数接受给定词的所有键值对,并按照排序。输出一个键值对。所有输出的键值对的集合组成了一个简单的倒排索引。如果需要持续跟踪词的位置,仅需简单的增量计算。
-
分布式排序:map提取每条记录中的键,输出一个的键值对。reduce函数不对中间变量作修改直接输出所有的键值对。排序计算依赖章节4.1中介绍的分区机制和章节4.2介绍的排序属性。
3. 实现
MapReduce接口可能有很多不同的实现。如何作出正确的选择取决于环境。例如,一种实现可能适合小型的共享内存的机器,一种实现可能适合大型NUMA多处理器主机,或者一种实现可能适合更大型的通过网络连接的机器集群。
本节中,我们将介绍一个中面向Google中常用的计算环境的实现。Google的常用计算环境为彼此通过交换机以太网[4]连接的大型商用PC集群。在我们的环境中:
-
机器通常使用双核x86处理器,2-4GB内存,运行Linux系统。
-
使用商用网络硬件:每台机器带宽通常为100Mbps或1Gbps,但平均分到的带宽要小得多。(译注:可能受交换机间带宽限制,每台机器平均分到的带宽远小于其单机带宽。)
-
一个集群由成百上千的机器组成,因此机器故障是常态。
-
存储由直接连接到独立的机器上IDE(译注:本文IDE指集成设备电路Intergated Drive Electronics)磁盘提供。我们为了管理这些磁盘上的数据,开发了一个内部的分布式文件系统[8]。该文件系统使用副本的方式在不可靠的硬件上提供了可用性和可靠性。
-
用户将工作(job)提交到一个调度系统中。每个工作由一系列的任务(task)组成,这些任务被*scheduler(调度器)*映射到集群中一系列可用的机器上。
3.1 执行概览
输入数据会自动被分割为个分片(split),这样,map函数调用可以在多个机器上分布式执行,每个输入的分片可以在不同机器上并行处理。中间键值对的键空间会通过被分区函数(例如,)分割为个分区,这样,reduce函数也可以分布式执行。其中分区的数量()和分区函数由用户指定。
6.1 大规模索引
目前,我们使用MapReduce做的最重要的工作之一是完全重写了一个索引系统,该系统被用作生成用于Google web搜索服务的数据结构。该索引系统将大量被我们爬虫系统检索到的文档(作为GFS文件存储)作为输入。这些文档的原始内容的数据大小超过20TB。索引进程会运行一系列5~10个MapReduce操作。使用MapReduce(而不是旧版索引系统中ad-hoc分布式传递方案)提供了很多好处:
-
索引代码更简单、短、便于理解,因为处理容错、分布式和并行的代码被隐藏在了MapReduce库中。例如,计算中的有一个阶段的代码量从3800行C++代码所见到了700行使用MapReduce的代码。
-
MapReduce库的性能足够好,这让我们可以将概念上不相关的计算分离开,而不是将它们混合在一起,这样可以避免传递过多额外的数据。这使改变索引程序变得非常简单。例如,在我们旧的索引系统中,一处修改会花费几个月的时间,而新的系统仅需要几天就能实现。
-
索引系统变得更容易操作。大部分因机器故障、缓慢的机器、网络不稳定等引起的问题都被MapReduce库自动处理了,不需要引入额外的操作。此外,向索引集群添加新机器以获得更好的性能变得更加简单。
7. 相关工作
许多系统提供了受限制的编程模型,并通过这些限制来进行自动化并行计算。例如,使用并行前缀和计算(parallel prefix computation)[6, 9, 13],可以使用个处理器上在的时间内计算有个元素的数组中所有前缀和。MapReduce可被看做是对一些这类模型基于我们在现实世界中对大型计算的经验做出的简化和升华。更重要的是,我们提供了适用于大规模的数千个处理器的带有容错机制的实现。相反,大部分并行处理系统仅被小规模使用,且将处理机器故障的细节留给了开发者。
BSP模型(Bulk Synchronous Programming)[17]和一些MPI(Message Passing Interface,消息传递接口)[11]原语提供了让开发者编写并行程序更简单的高层抽象。这些系统和MapReduce的关键区别在于MapReduce提供了一个受限的编程模型,以自动地并行化用户程序,并提供了透明的容错机制。
我们的局部性优化的灵感来自于如活动磁盘(active disk)[12, 15]技术,即计算程序被推送到靠近本地磁盘的处理设备中,这减少了I/O子系统或者网络的总数据发送量。我们在直连少量磁盘的商用处理器上运行程序,而不是直接在磁盘控制处理器上运行,但最终目的都是一样的。
我们的任务副本机制类似Charlotte System[3]中使用的Eager调度机制。简单的Eager调度的一个缺点是,当一个任务反复故障时,整个计算都无法完成。我们通过跳过损坏记录的方式来解决导致该问题的一些情况。
MapReduce的实现依赖了一个内部的集群管理系统,该系统负责在大量共享的机器上分配并运行用户任务。该系统比较神似如Condor[16]的其他系统,但这并不是本文的重点。
MapReduce中的排序机制在操作上类似NOW-Sort[1]。源机器(map worker)将待排序的数据分区,并将其发送到个reduce worker之一。每个reduce worker将其数据在本地排序(如果可以,会在内存中执行)。当然,NOW-Sort不支持用户自定义map和reduce函数,这让我们的库适用范围更广。
River[2]提供了一个通过分布式队列发送数据来处理程序间交互的编程模型。就像MapReduce,River系统试图在存在由异构硬件或系统干扰导致的性能不均匀的情况下提供良好的平均性能。River通过小心地调度磁盘和网络传输以使计算时间平衡的方式实现这一点。而MapReduce框架通过对编程模型进行限制,将问题划分为大量更细致的任务。这些任务在可用的worker间动态调度,以让更快的worker处理更多任务。这种受限的编程模型还允许在工作末期调度冗余执行的任务,这样可以大大缩减离群机器(如慢速或者卡死的worker)中的计算时间。
BAD-FS[5]采用了和MapReduce区别非常大的编程模型。与MapReduce不同,BAD-FS的目标是在广域网中执行工作。然而,有两个基本点很相似。(1)二者都使用了冗余执行的方式恢复因故障丢失的数据。(2)二者都使用了有位置感知(locality-aware)调度方式来减少拥堵的网络连接中数据发送的总量。
TACC[7]是一个为简化高可用网络服务设计的系统。像MapReduce一样,TACC依赖重新执行的方式作为容错机制。
8. 结论
MapReduce编程模型被成功应用于Google中的很多目标。我们将这种成功归结于几个原因。第一,因为该模型隐藏了并行化、容错、本地优化和复杂均衡的细节,所以甚至没有相关经验的程序员都可以轻松使用。第二,很多不同的问题都可以被表示为MapReduce计算。例如,MapReduce在Google的生产系统的web搜索服务、排序、数据挖掘、机器学习和很多其他系统中被作为数据生成工具使用。第三,我们开发了一个适用于由上千台机器组成的大型集群的MapReduce实现。该实现可以高效利用这些机器的资源,因此其非常适用于Google中的大型计算问题。
我们从这项工作中学习到了很多事。第一,对编程模型进行限制可以让并行化、分布式计算、容错等更加简单。第二,网络带宽是非常稀缺的资源。我们系统中的大量优化都是为了减少网络发送的数据量:局部性优化允许我们从本地磁盘读取数据,在本地磁盘中写单个中间数据的副本同样节约了网络带宽。第三,冗余执行可以用来减少缓慢的机器带俩的影响,并可以用来处理机器故障和数据丢失。
致谢
Josh Levenberg在修订和扩展用户级MapReduce API方面提供了很大帮助,他根据自己对MapReduce的使用经验和其他人对功能增强的建议,提供了很多新特性。MapReduce从GFS[8]读取输入并写入输出。感谢Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung和Josh Redstone在开发GFS中做出做出的工作。同样感谢Percy Liang和Olcan Sercinoglu在MapReduce使用的集群管理系统中做出的工作。Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike和Debby Wallach为本文的早期草稿提供了有帮助的评论。OSDI的匿名审稿者和我们的领导者Eric Brewer对本文的改进提供了帮助。最后,我们希望感谢来自Google工程师的MapReduce使用者,他们给出了很多有帮助的反馈、建议和bug报告。
参考文献
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10–22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
[4] Luiz A. Barroso, Jeffrey Dean, and Urs Holzle. ¨ Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22–28, April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78–91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29–43, Lake George, New York, 2003.
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401–408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/. [11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831–838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335–348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68–74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103–111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
附录A 词频统计
本节包含了一个对通过命令行指定的一系列输入文件中每个单词出现次数技术的程序。
#include "mapreduce/mapreduce.h"
// User’s map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into "spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class("Adder");
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: ’result’ structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}