一、剖析 MapReduce 作业运行
你可以在 Job 对象上面调用 submit() 方法或者 waitForCompletion() 方法来运行一个 MapReduce 作业。这些方法影藏了背后大量的处理过程。下面我们来揭开 Hadoop 背后运行一个作业的步骤你可以在 Job 对象上面调用 submit() 方法或者 waitForCompletion() 方法来运行一个 MapReduce 作业。这些方法影藏了背后大量的处理过程。下面我们来揭开 Hadoop 背后运行一个作业的步骤
五个核心的实体
站在更高的层面,有五个独立的实体需要提前理解
- 客户端,提交 MapReduce 作业
- YARN 资源管理器(YARN resource manager),负责协调集群上计算机资源的分配
- YARN 节点管理器(YARN node manager),负责启动和监视集群中机器上的计算容器(container)
- MRAppMaster(MapReduce application master),负责协调 MapReduce 作业的任务(tasks)的运行。MRAppMaster 和 MapReduce 任务运行在容器中,该容器由资源管理器进行调度(schedule)[此处理解为划分、分配更为合适] 且由节点管理器进行管理
- 分布式文件系统(通常是 HDFS),用来在其他实体间共享作业文件
1、作业提交
在 Job 对象上面调用 submit() 方法,在内部创建一个 JobSubmitter 实例,然后调用该实例的 submitJobInternal() 方法。如果使用 waitForCompletion() 方法来进行提交作业,该方法每隔 1 秒轮询作业的进度,如果进度有所变化,将该进度报告给控制台(console)。当作业成功完成,作业计数器被显示出来。否则,导致作业失败的错误被记录到控制台在 Job 对象上面调用 submit() 方法,在内部创建一个 JobSubmitter 实例,然后调用该实例的 submitJobInternal() 方法。如果使用 waitForCompletion() 方法来进行提交作业,该方法每隔 1 秒轮询作业的进度,如果进度有所变化,将该进度报告给控制台(console)。当作业成功完成,作业计数器被显示出来。否则,导致作业失败的错误被记录到控制台。
作业的提交过程由 JobSubmitter 实现,执行如下事情:
- 向资源管理器请求一个 application ID,该 ID 被用作 MapReduce 作业的 ID(step 2)
- 检查作业指定的输出(output)目录。例如,如果该输出目录没有被指定或者已经存在,作业不会被提交且一个错误被抛出给 MapReduce 程序
- 为作业计算输入分片(input splits)。如果分片不能被计算(可能因为输入路径(input paths)不存在),该作业不会被提交且一个错误被抛出给 MapReduce 程序
- 拷贝作业运行必备的资源,包括作业 JAR 文件,配置文件以及计算的输入分片,到一个以作业 ID 命名的共享文件系统目录中(step 3)。作业 JAR 文件以一个高副本因子(a high replication factor)进行拷贝(由 mapreduce.client.submit.file.replication 属性控制,默认值为 10),所以在作业任务运行时,在集群中有很多的作业 JAR 副本供节点管理器来访问
- 通过在资源管理器上调用 submitApplication 来提交作业(step 4)
2、作业初始化(Job Initialization)
当资源管理器接受到 submitApplication() 方法的调用,它把请求递交给 YARN 调度器(scheduler)。调度器分配了一个容器(container),资源管理器在该容器中启动 application master 进程,该进程被节点管理器管理(steps 5a and 5b)
MapReduce 作业的 application master 是一个 Java 应用,它的主类是 MRAppMaster。它通过创建一定数量的簿记对象(bookkeeping object)跟踪作业进度来初始化作业(step 6),该簿记对象接受任务报告的进度和完成情况。
接下来,application master 从共享文件系统中获取客户端计算的输入分片(step 7)。然后它为每个分片创建一个 map 任务,同样创建由 mapreduce.job.reduces 属性控制的多个reduce 任务对象(或者在 Job 对象上通过 setNumReduceTasks() 方法设置)。任务在此时给定 ID。
Applcation master 必须决定如何运行组成 MapReduce 作业的任务。如果作业比较小,application master 可能选择在和它自身运行的 JVM 上运行这些任务。这种情况发生的前提是,application master 判断分配和运行任务在一个新的容器上的开销超过并行运行这些任务所带来的回报,据此和顺序地在同一个节点上运行这些任务进行比较。这样的作业被称为 uberized,或者作为一个 uber 任务运行。
一个小的作业具有哪些资格?默认的情况下,它拥有少于 10 个 mapper,只有一个 reducer,且单个输入的 size 小于 HDFS block 的。(注意,这些值可以通过 mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, mapreduce.job.ubertask.maxbytes 进行设置)。Uber 任务必须显示地将 mapreduce.job.ubertask.enable 设置为 true。
最后,在任何任务运行之前, application master 调用 OutputCommiter 的 setupJob() 方法。系统默认是使用 FileOutputCommiter,它为作业创建最终的输出目录和任务输出创建临时工作空间(temporary working space)。
3、作业分配(Task Assignment)
如果作业没有资格作为 uber 任务来运行,那么 application master 为作业中的 map 任务和 reduce 任务向资源管理器请求容器(step 8)。首先要为 map 任务发送请求,该请求优先级高于 reduce 任务的请求,因为所有的 map 任务必须在 reduce 的排序阶段(sort phase)能够启动之前完成。Reduce 任务的请求至少有 5% 的 map 任务已经完成才会发出。
Reduce 任务可以运行在集群中的任何地方,但是 map 任务的请求有数据本地约束(data locality constraint),调度器尽力遵守该约束(try to honor)。在最佳的情况下,任务的输入是数据本地的(data local)— 也就是任务运行在分片驻留的节点上。或者,任务可能是机架本地的(rack local),也就是和分片在同一个机架上,而不是同一个节点上。有一些任务既不是数据本地的也不是机架本地的,该任务从不同机架上面获取数据而不是任务本身运行的节点上。对于特定的作业,你可以通过查看作业计数器(job’s counters)来确定任务的位置级别(locality level)。
请求也为任务指定内存需求和 CPU 数量。默认,每个 map 和 recude 任务被分配 1024 MB的内存和一个虚拟的核(virtual core)。这些值可以通过如下属性(mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores, mapreduce.reduce.cpu.vcores)在每个作业基础上进行配置(遵守 Memory settings in YARN and MapReduce 中描述的最小最大值)。
4、任务执行(Task Execution)
一旦资源调度器在一个特定的节点上为一个任务分配一个容器所需的资源,application master 通过连接节点管理器来启动这个容器(step 9a and 9b)。任务通过一个主类为 YarnChild 的 Java 应用程序来执行。在它运行任务之前,它会本地化这个任务所需的资源,包括作业配置,JAR 文件以及一些在分布式缓存中的文件(step 10)。最后,它运行 map 或者 reduce 任务(step 11)。
这个 YarnChild 在一个专用的 JVM 中运行,所以任何用户自定义的 map 和 reduce 函数的 bugs(或者甚至在 YarnChild)都不会影响到节点管理器 — 比如造成节点管理的宕机或者挂起。
每个任务能够执行计划(setup)和提交(commit)动作,它们运行在和任务本身相同的 JVM 当中,由作业的 OutputCommiter 来确定。对于基于文件的作业,提交动作把任务的输出从临时位置移动到最终位置。提交协议确保当推测执行可用时,在复制的任务中只有一个被提交,其他的都被取消掉。
Streaming
Streaming 运行特殊的 map 和 reduce 任务,达到能够运行用户提供的可执行程序并与之通信。
Streaming 任务使用标准输入和输出流与进程(可能由不同的语言编写)进行通信。在执行任务期间,Java 进程传递输入键值对到外部进程,该外部进程运行用户定义的 map 或 reduce 函数,然后传回输出键值对给 Java 进程。从节点管理器的角度来看,好像是其子进程运行 map 或 reduce 代码。
5、进度和状态更新
MapReduce 作业是长时间运行的批处理作业(long-running batch jobs),运行时间从几十秒到几小时。由于可能运行时间很长,所以用户得到该作业的处理进度反馈是很重要的。
作业和任务都含有一个状态,包括运行状态、maps 和 reduces 的处理进度,作业计数器的值,以及一个状态消息或描述(可能在用户代码中设置)。这些状态会在作业的过程中改变。那么它是如何与客户端进行通信的?
当一个任务运行,它会保持进度的跟踪(就是任务完成的比例)。对于 map 任务,就是被处理的输入的比例。对于 reduce 任务,稍微复杂一点,但是系统任然能够估算已处理的 reduce 输入的比例。通过把整个过程分为三个部分,对应于 shuffle 的三个阶段。例如,如果一个任务运行 reducer 完成了一半的输入,该任务的进度就是 5/6,因为它已经完成了 copy 和 sort 阶段(1/3 each)以及 reduce 阶段完成了一半(1/6)。
MapReduce 的进度组成
进度不总是可测的,但是它告诉 Hadoop 一个任务在做的一些事情。例如,任务的写输出记录是有进度的,即使不能用总进度的百分比(因为它自己也可能不知道到底有多少输出要写,也可能不知道需要写的总量)来表示。
进度报告非常重要,Hadoop 不会使一个报告进度的任务失败(not fail a task that’s making progress)。如下的操作构成了进度:
- 读取输入记录(在 mapper 或者 reducer 中)
- 写输出记录(在 mapper 或者 reducer 中)
- 设置状态描述(由 Reporter 的或 TaskAttempContext 的 setStatus() 方法设置)
- 计数器的增长(使用 Reporter 的 incrCounter() 方法 或者 Counter 的 increment() 方法)
- 调用 Reporter 的或者 TaskAttemptContext 的 progress() 方法
任务有一些计数器,它们在任务运行时记录各种事件,这些计数器要么是框架内置的要么是用户自定义的,例如:map 输出记录的写入。
当 map 或 reduce 任务运行时,子进程使用 umbilical 接口和父 application master 进行通信。任务每个三秒钟通过 umbilical 接口报告其进度和状态(包括计数器)给 application master,它从一个聚合的视角来看作业。
在作业执行的过程中,客户端每秒通过轮询 application master 获取最新的状态(间隔通过 mapreduce.client.progressmonitor.polinterval 设置)。客户端也可使用 Job 的 getStatus() 方法获取一个包含作业所有状态信息的 JobStatus 实例,过程如下:
6、作业完成(Job Completion)
当 application master 接受到最后一个任务完成的通知,它改变该作业的状态为 “successful”。当 Job 对象轮询状态,它知道作业已经成功完成,所以它打印一条消息告诉用户以及从 waitForCompletion() 方法返回。此时,作业的统计信息和计数器被打印到控制台。
Application master 也可以发送一条 HTTP 作业通知,如果配置了的话。当客户端想要接受回调时,可以通过 mapreduce.job.end-notification.url 属性进行配置。
最后,当作业完成,application master 和作业容器清理他们的工作状态(所以中间输入会被删除),然后 OutputCommiter 的 commitJob() 方法被调用。作业的信息被 job history server 归档,以便后续用户查看。
二、失败情况处理(Failures)
在现实情况中,用户代码错误不断,进程崩溃,机器故障,如此种种。使用Hadoop最主要的好处之一是它能处理此类故障并让你成功完成作业。我们需要考虑一下实体的失败:任务、application master、节点管理器和资源管理器。
1、任务失败(Task Failure)
首先考虑的情况是任务失败。任务失败最多的场景是用户 map 或 reduce 任务的代码抛出一个运行时异常。如果错误发生,在退出之前,任务 JVM 向父 application master 报告错误。这个错误最终会记录到用户日志中。Application master 将任务尝试标记为失败,然后释放容器资源给另一个任务。
对于 Streaming 任务,如果 Streaming 进程以一个非 0 的代码退出,那么被标记为失败。这个行为受 stream.non.zer.exit.is.failure 属性控制,默认为 true。
另一个失败的模式是任务 JVM 的突然退出 — 可能在用户代码中使用了一些引起 JVM 自身 bug 的特殊场景。这种情况下,节点管理器注意到进程已经退出并通知 application master 可以标记任务尝试为失败。
对挂起的任务处理是不同的。 Application master 注意到在一段时间内它没有收到任务的进度跟新,进而将该任务标记为失败。这个任务的 JVM 进程随后被自动杀掉。默认情况下超过 10 分钟就被认为是失败的,可以通过 mapreduce.task.timeout 属性进行配置。
这是超时值为 0 时,就等于让超时失效,所以长时间运行的任务永远也不会被标记为失败。这种情况下,挂起的任务永远不会释放它的容器,且超时也有可能导致集群变慢。所以要避免这种情况,确保任务周期的报告其进度。
当 application master 知道一个任务尝试失败了,它会重新调度任务的执行。Application master 会尝试避免重新调度这个任务在先前失败的节点管理器上。更进一步,如果一个任务失败 4 次,它将不会被重试。这个值是可以配置的(mapreduce.map.maxattemps, mapreduce.reduce.maxattemps)。默认情况下,任何任务失败四次,该作业也就失败了。
对于一些应用,并不希望少数任务的失败而丢弃该作业,可能在丢弃部分失败的作业的结果也是可用的。这种情况下,最大比例任务允许失败而不至于触发作业失败能够被设置(mapreduce.map.failures.maxpercent, mapreduce.reduce.failures.maxpercent)。
一个任务尝试被杀掉,不同于它的失败。一个尝试任务可能被杀掉因为它是一个推测副本,或者因为节点管理器运行失败,然后 application master 标记运行在该节点上的所有任务尝试被杀掉。被杀掉的任务尝试不会被记录到运行任务的尝试数目中(由 mapreduce.map.maxattempts, mapreduce.reduce.maxattempts),因为这个不是任务的过错。
2、Application Master Failure
就像 MapReduce 任务在失败时有几次尝试一样,applicatons 在 YARN 中发生失败时也会进行重试。最大的尝试次数有 mapreduce.am.max-attempts 属性控制。默认的值是 2,如果一个 MapReduce master 失败两次,它不会被再次进行尝试,作业标识为失败。
YARN 为任何运行在集群中的 application master 限制了一个最大尝试数(maximum number of attempts),每个独立的应用不能超过这个限制。这个限制有 yarn.resourcemanager.am.max-attempts 设定,默认值为 2,如果你想要增加 MapReduce application master 的尝试次数,你必须在 YARN 集群上增加限制数。
恢复工作的方式如下。一个 application master 周期性的发送心跳给资源管理器,万一 application master 失败,资源管理器将会侦测到失败,然后在一个新的容器(由节点管理器管理)中启动一个新的 master 实例。至于 MapReduce application master,它会使用作业历史(job history)来恢复先前运行成功的任务,所以这些任务不必重新运行。恢复是默认可用的,可以通过 yarn.app.mapreduce.am.job.recovery.enable 设置为 false 来禁止。
MapReduce 客户端轮询 application master 来获得进度报告,但是如果 application master 失败,客户端需要定位新的 application master。在作业初始化阶段,客户端向资源管理器请求 application master 的地址,然后缓存这个地址,所以客户端不会过载(overload)地每次向资源管理器轮询 application master。如果这个 application master 失败,客户端将会经历一次状态更新的超时,此时客户端会重新向资源管理器请求新的 application master 的地址。这个过程对于用户来说是透明的。
3、节点管理器失败(Node Manager Failure)
如果节点管理器由于宕机或者运行非常缓慢失败,它将会停止发送心跳给资源管理器(或者很少发送)。如果资源管理器超过 10 分钟(可同通过 yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms 属性设置)没有收到节点管理器的心跳,就会认为该节点管理器已经失败了。然后将该节点从资源管理器的节点池中移除,并且调度该节点上的容器。
任何任务或是 application master 运行在一个失败的节点管理器上面,将会通过前面描述的机制来进行恢复。另外,如果作业还没有完成,application master 将会安排哪些在失败节点上已经运行完成的任务重新运行,因为它们的中间输出驻留在失败的节点管理器的本地文件系统上面,这些结果可能不能被 reduce 任务访问。
节点管理器在一个应用中失败的次数很高,即使该节点管理器自身没有失败,那么该节点管理器将会被拉黑(be blacklisted)。如果在节点管理器上面超过三次任务的失败,application master 将该节点管理器拉黑,它将会重新调度任务在不同的节点上。用户可以通过 mapreduce.job.maxtaskfailures.per.tracker 属性来设置该阈值。
注意,在编写本博文时,资源管理器并不会对应用执行拉黑操作,所以新作业的任务可能会被调度到被早期作业的 application master 拉黑的坏节点上。
4、资管管理器失败(Resource Manager Failure)
资源管理器的失败是非常严重的,因为没有它,作业和任务容器都不能运行。在默认的配置中,资源管理器是单节点失败,所以万一机器失败,所有的作业都会失败且不能被恢复。
解决资源管理器高可用的方案就是以主备(active-standby)的方式来运行。如果主资源管理器失败,备资源管理器需要在没有明显中断客户端的情况下进行接管。
所有运行应用的信息存储在高可用的状态存储中(ZooKeeper 或 HDFS),以至于备资源管理器能够恢复失败主资源管理器的核心状态。节点管理器信息不被存储在状态存储中,当它发送心跳给新的资源管理器,能够通过新的资源管理器相对快速的重建。(注意到任务不是资源管理器状态的一部分,因为他们受 application master 管理)。
当一个新的资源管理器启动,它从读取状态存储中读取应用信息,然后重新启动集群中所有的 application master。这个动作不会作为一个失败应用的尝试被记录(不会通过 yarn.resourcemanager.am.max-attempts),因为应用不是因为代码的错误而失败,而是被系统强制杀掉。在实际操作中,application master 重启不是 MapReduce 应用的问题,因为它们可以通过已完成的任务来进行恢复工作。
资源管理器从主过渡到备由一个失效转移控制器(failover controller)处理的。默认的失效转移控制器是自动执行的,由 ZooKeeper 选举机制来确保在同一时刻有一个活跃的资源管理器。
和 HDFS 的高可用不同,失效转移控制器不是一个单机进程,它默认通过简单的配置内嵌到资源管理器中。所以可以进行手动配置,但是不推荐。
客户端和节点管理器必须配置处理资源管理器的失败转移,因为现在有两个资源管理器需要去通信。他们通过轮询(round-robin)的方式来连接每个资源管理器,知道发现活跃的。如果活跃的失败,他们会重试直到备用的变成活跃的。
三、Shuffle 和排序(Shuffle and Sort)
MapReduce 确保每个 reducer 的输入都是按键排过序的。由系统来执行排序,将 map 的输出转换成 reducer 的输入,这个过程称之为 shuffle。在这部分,我们探究一下 shuffle 如何工作,这个基本的理解对于你优化 MapReduce 程序是很有帮助的。shuffle 是一个持续不断改进和提升的代码库(codebase),所以下面的描述隐藏了很多细节。从多方面看,shuffle 是 MapReduce 的心脏和奇迹发生的地方。
1、Map 端(The Map Side)
当 map 函数开始产生输出,它不是简单的写入到磁盘。这个过程更加复杂,利用缓冲写入到内存,为了效率执行一些预排序(presorting)。如下图所示。
每个 map 任务有一个环形内存缓冲用于输出的写入。这个缓冲默认为 100 MB(可以通过 mapreduce.task.io.sort.mb 属性进行调整)。当缓冲中的内容达到一个确定的阈值大小(mapreduce.map.sort.spill.percent,默认值为 0.80 或者 80%),后台有一个线程将内容溢出(spill)到磁盘。当溢出操作发生时,Map 输出继续写入到缓冲中,如果此时缓冲被填满(fill up),map 将会阻塞直到溢出操作完成。溢出按照轮询的方式写入到通过 mapreduce.cluster.local.dir 属性指定的目录(在一个作业特定的子目录下)。
在内容被写入到磁盘之前,首先线程根据数据最终被发送的 reducers 来将数据进行分区(partition)。在每个分区中,后台线程在内存中按键进行排序,如果有一个 combiner 函数,它会在排过序的输出上运行。运行 combiner 函数得到更为紧凑的 map 输出,所以更少的数据写入到本地磁盘和传输到 reducer。
每次内存缓冲达到溢出阈值,一个新的溢出文件被创建,因此 map 任务在写完最后一条记录之后,可能会有几个溢出文件。在任务完成之前,这些溢出文件被合并成单个被分区过的且排序过的文件。配置属性 mapreduce.task.io.sort.factor 控制最大一次能够合并多少流(stream),默认是 10。
如果至少有三个溢出文件(由 mapreduce.map.combine.minspills 属性设置),combiner 在输出文件写入之前再次运行。combiners 可能会被重复在输入上运行而不会影响最终的结果。如果只有 1 或 2 个溢出文件,潜在的调用 combiner 的开销来减少 map 的输出大小是不值得的,所以不会为 map 的输出在调用 combiner。[此处如果超过三个,将会被合并为一个文件,少于三个就不会进行合并]。
通常压缩 map 输出写入到磁盘是一个好注意,因为带来更快的写入以及节省磁盘空间,减少传输到 reducer 的数据量。默认,输出是不压缩的,可用很容易通过设置 mapreduce.map.output.compress 为 true 来启用压缩。压缩的类库通过 mapreduce.map.output.compress.codec 来制定。
Reducer 很容易通过 HTTP 得到输出文件的部分(partitions)。通过设置 mapreduce.shuffle.max.threads 属性为每个节点管理器(而不是每个 map 任务)来控制用于服务文件部分的工作线程数。默认值 0 是设置最大线程数为机器上处理器数目的两倍。
2、Reduce 端(The Reduce Side)
map 输出文件坐落在运行 map 任务的机器本地磁盘上(注意虽然 map 输出总是写到本地磁盘,但是 reduce 输出可能不会),这些文件被运行 reduce 任务的机器所需要。再者,reduce 任务需要集群上若干个 map 任务的 map 输出作为其特殊的分区(partition)文件。map 任务可能在不同时间点完成,所以每个 map 任务完成,reduce 任务就开始拷贝它们的输出。这就是 reduce 任务的拷贝阶段(copy phase)。reduce 任务拥有少量的拷贝线程以至于能够并行抓取 map 输出。默认是 5 个线程,这个值可以通过 mapreduce.reduce.shuffle.parallelcopies 属性设置。
注意
Reducers 怎么知道从哪台机器上面抓取 map 输出呢?
当 map 任务成功完成,它们通过心跳机制通知 application master。因此,对于一个特定的作业,application master 知道 map 输出和主机之间的映射。在 reducer 中有一个线程周期性想 master 所询问有的 map 输出主机,直到所有的主机都得到。
主机不会在首个 reducer 拿到 map 输出就将其从磁盘上删除掉,因为 reducer 后续可能失败。在作业已经完成之后,主机会等待 application master 的通知来从磁盘上删除这些输出文件。
Map 输出如果足够小(缓冲的大小由),会被拷贝到 reduce 任务的 JVM 内存中(这个缓冲的大小由 mapreduce.reduce.shuffle.input.buffer.percent 控制,指定了对堆内存使用比率)。否则,这些输出将会拷贝到磁盘。当内存中的缓冲达到一个阈值大小(由mapreduce.reduce.shuffle.merge.percent)或者达到 map 输出阈值大小(mapreduce.reduce.merge.inmen.threshold),则合并溢出到磁盘中。如果一个 combiner 被指定,在合并的过程中它可能被运行以至来减少写入磁盘的数据量。
随着磁盘上的副本持续累积,一个后台线程用来将它们合并成一个更大的,排过序的文件。这会为后面的合并节省一些时间。注意,为了合并,任何在 map 压缩的输出必须在内存中进行解压缩。
当所有的 map 输出都被拷贝,reduce 任务进入到排序阶段(应该被称为合并阶段,因为排序已经在 map 端执行了),合并 map 输出,维持其排序顺序。这个是循环执行的。例如,如果有 50 个 map 输出且合并因子(merge factor)是 10(有 mapreduce.task.io.sort.factor 属性控制),因此有 5 个循环。每个循环合并 10 个文件到 1 个文件中,所以最后有 5 个中间文件。
并不是有最后一个循环来合并这 5 个文件到 1 个单独排序过的文件,在最后阶段(reduce phase)合并通过直接向 reduce 函数提供数据,从而节省了一次磁盘的往返。最后的合并可能来自内存和磁盘的混合(a mixture of in-memory and on-disk segments)。
在 reduce 阶段,在已排序的输出中,为每个键调用 reduce 函数。该阶段的输出直接写入到输出文件系统中,典型的是 HDFS。在是 HDFS 的情况下,因为节点管理器也运行在数据节点上,所以第一个块副本被写入到本地磁盘上。
四、推测执行(Speculative Execution)
MapReduce 模型是将作业分解成多个任务的并行运行使得整体作业时间小于任务的顺序执行。这个使得作业的执行时间对于慢运行的任务很敏感。
当一个作业有成千上万的任务构成,有一些挣扎的任务(straggling task)是很有可能的。
一个任务缓慢可能有多个原因,包括硬件的损坏或者软件的错误配置,但是这些原因可能很难侦测到,因为任务仍然成功地完成了,虽然只是比期望的时间稍长。Hadoop 不会试图诊断和解决慢运行的任务;相反,它会尝试去侦测当一个任务运行比期望的慢,就会启动一个相等的任务作为备份。这就是推测执行的任务。
理解推测执行不会在同一时间启动两个副本任务非常重要,因为他们之间会产生竞争。这会对集群资源造成浪费。相反,调度器跟踪所有作业的任务的进度,只会为哪些小比率的明显慢于平均水平的任务启动推测副本。当一个任务成功完成,任何副本任务会被中止,因为已经不再需要了。所以,一个原始的任务在推测任务之前完成,推测任务会被中止;相反,如果侦测任务先完成,原始任务会被中止。
推测执行是一个优化措施,而不是确保作业运行的更可靠。如果有缺陷引起任务的挂起或是变慢,依靠推测执行来避免这个问题是不明智的,也是不可靠的,因为同样的缺陷也会音响推测任务。你需要解决这个缺陷来让任务不在挂起或是变慢。
在默认情况下,推测执行是启用的。可以基于集群或是每个作业,单独为 map 任务 和 reduce 任务启用或禁用该功能。
为什么你想要关闭推测执行?推测执行的目的是减少作业的执行时间,但是这会带来集群效率的消耗。在一个繁忙的集群上,推测执行可能会减少整体的吞吐量,因为多余任务的执行企图降低单个作业的执行时间。为此,一些集群管理员宁愿关闭它,让用户显式的为个别作业开启推测执行。(老版本的 Hadoop 的推测执行会被系统过度使用)。
对于 reduce 任务,关闭推测执行是有益的,因为任何副本 reduce 任务都和原始的任务一样必须获取相同的 map 输出,这就显著地增加集群上的网络开销。
另外一个关闭推测执行的原因是非幂等性任务。然而,大多数情况下,将任务写成幂等的并使用OutputCommitter来提升任务成功时输出到最后位置的速度,这是可行的。
五、输出提交器(Output Committers)
Hadoop 的 MapReduce 使用一个提交协议来清晰地确保作业和任务要么全部要么失败。这个行为由 OutputCommitter 来为作业实现,在新的 API 中,由 OutoutFormat 的 getOutputCommitter() 方法决定。默认是 FileOutputCommitter,它适合基于文件的 MapReduce。你可以在一个已存在的 OutoutCommitter 上进行定制或者需要为作业或任务做一些特殊的设置或清理来写一个新的实现。
OutputCommitter的API如下所示:
public abstract class OutputCommitter {
public abstract void setupJob(JobContext jobContext) throws IOException;
public void commitJob(JobContext jobContext) throws IOException { }
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { }
public abstract void setupTask(TaskAttemptContext taskContext) throws IOException;
public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException;
public abstract void commitTask(TaskAttemptContext taskContext) throws IOException;
public abstract void abortTask(TaskAttemptContext taskContext) throws IOException;
}
setupJob() 方法在作业运行之前调用,它是典型的用于执行初始化。对于 FileOutputCommitter,这个方法创建最终的输出目录(mapreduce.output.fileoutputformat.outputdir)和为任务输出创建的一个临时工作空间,_temporary,作为最终输出目录下的一个子目录
如果作业成功,commitJob() 方法被调用,默认基于文件的实现会删除临时工作空间,然后创建一个隐藏的标记文件在输出目录中,_SUCCESS,用来知识文件系统客户端这个作业已经成功完成。如果作业没有成功,abortJob() 被调用并传递一个状态对象,该对象指示作业是失败还是被中止。默认实现,会删除作业的临时工作空间
在任务级别,操作也是类似的。setUpTask() 方法在任务运行之前被调用,默认的实现什么也不做,因为临时目录的命名已经在任务输出时被创建了。
参考:Hadoop权威指南第4版。