hadoop2指南

Hadoop Fundamentals

Chapter 1. Meet Hadoop

Hadoop通常指大型的生态系统,不仅仅是HDFS和MapReduce,属于分布式计算和大规模数据处理
例如,HBase,key-value存储方式,使用HDFS作为底层存储
yarn,Hadoop中新处理模型的推动者,它是一个集群资源管理系统,它允许分布式程序(不仅仅是MapReduce)在Hadoop集群处理数据
Hive,大数据集合低延迟的SQL交互查询
Spark,许多算法,例如机器学习算法,本质上是在数据集上的迭代计算,而MapReduce并不支持迭代数据持久化到内存,需要每次在磁盘载入,而Spark不需要
Solr,可在Hadoop集群上运行,索引文档存储在HDFS并提供查询服务
MapReduce有助于理解一些通用概念,例如输入格式概念和数据集的分片等
MapReduce等Hadoop处理模型都是随着数据量线性扩展的,而功能函数(例如Map和Reduce)可以在分区并行工作,因此假设据数量翻倍,而集群节点数量也翻倍,那么计算时间是不变的

Chapter 2. MapReduce

A Weather Dataset

我们以计算历年记录的最高温度为例,传统方法会逐行读取并比较获得最高温度。为了提高运行速度,可以使用并行处理。
1.我们可以使用不同的进程运行不同年份的记录,但是不同年份的数据量不同,计算时间取决于最大的文件。因此最好的办法是把输入的数据分割成大小相等的块再分别处理。
2.因为数据被分割成大小相等的块,每个数据块都会计算出不同年份的最大温度,最终我们还要找到每年的最大温度。
3.仍然会受限于单台计算机的处理能力,可以使用多台计算机,那么主要问题在与协调性和可靠性,哪台计算机执行全局的任务,如何处理失败的进程。
以上方法虽然可行但是繁杂,所以我们可以借助类似Hadoop的框架。

Analyzing the Data with Hadoop

MapReduce将操作分为两个阶段,map阶段和reduce阶段,每个阶段都是key-value对作为输入和输出,同时需要编写map和reduce方法。
map阶段的输入是NCDC原始的行数据,key是每行相对应文本开始位置的偏移值,不过可以忽略。map方法只是数据准备,只需要在每行数据中提取出年份和温度。
可视化map的工作
输入数据:
0067011990999991950051507004…9999999N9+00001+99999999999…
0043011990999991950051512004…9999999N9+00221+99999999999…
0043011990999991950051518004…9999999N9-00111+99999999999…
0043012650999991949032412004…0500001N9+01111+99999999999…
0043012650999991949032418004…0500001N9+00781+99999999999…
呈现给map方法的key-value对:
(0, 0067011990999991950051507004…9999999N9+00001+99999999999…)
(106, 0043011990999991950051512004…9999999N9+00221+99999999999…)
(212, 0043011990999991950051518004…9999999N9-00111+99999999999…)
(318, 0043012650999991949032412004…0500001N9+01111+99999999999…)
(424, 0043012650999991949032418004…0500001N9+00781+99999999999…)
map提取年份和温度,并输出:
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
MapReduce框架通过key聚合排序map的输出,再传输给reduce,所以reduce的输入为:
(1949, [111, 78])
(1950, [0, 22, −11])
reduce遍历list并找出最大温度:
(1949, 111)
(1950, 22)

Scaling Out

将数据储存在分布式文件系统,使用Hadoop的资源管理系统(YARN),可使得每台机器持有一部分数据进行MapReduce计算。

Data Flow

YARN分配任务到集群节点,如果一个任务失败,会被自动分配到其它节点。Hadoop将输入数据分片并创建map任务将分片作为输入数据。
分片越小,负载均衡越好,因为一个运行速度快的机器可以处理更多的分片。
但是如果分片太小,分片的管理和map任务的创建会耗费太多的时间。分片大小最好等于HDFS的block,默认是128M。数据存在于执行map任务的节点,执行效率更高,因为节省了宝贵的集群间的带宽资源。
一个block大小的分片是可以保证存储在同一个节点的,这就是为什么最优分片大小是block size。假设分片大小是两个block size,执行map任务的节点可能没有恰好存储两个block,这就需要在节点间传输数据。
reduce任务数量可以自定义,map的输出需要通过网络传输合并传递给运行reduce的节点,reduce的输出通常存储在HDFS。
当有多个reduce时,map分割输出,相同的key必须在同一个分割,用户可以自定义分割,但是默认分割方法挺好的,

Combiner Functions

许多MapReduce任务受限于集群的可用带宽,所以研究如何最小化map和reduce间传递的数据变得很重要。combiner方法允许用户指定并作用于map的输出,最后形成reduce的输入。因为combiner方法是优化方法,hadoop不保证可以运行几次combiner。
第一个map输出:
(1950, 0)
(1950, 20)
(1950, 10)
第二个输出:
(1950, 25)
(1950, 15)
reduce输入:
(1950, [0, 20, 10, 25, 15])
reduce输出:
(1950, 25)
我们可以使用combiner方法,类似于reduce,找到每个map输出的最大值:
(1950, [20, 25])
可表示为:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
但是,如果计算平均值,不能使用mean作为combiner,因为:
mean(0, 20, 10, 25, 15) = 14
但是:
mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
综上,combiner可以减少传输数据的数量,值得考虑是否可以用combiner。

Chapter 3. The Hadoop Distributed Filesystem

The Design of HDFS

HDFS is a filesystem designed for storing very large files with streaming data access
patterns, running on clusters of commodity hardware.

Very large files
文件大小数百M,G,T,P

Streaming data access
HDFS建立在最有效的数据处理模式下,一次写入,多次读取的模式。因此读取整个数据集的时间比读取第一条数据的延迟时间更重要。

Commodity hardware
Hadoop不需要昂贵,高可靠性的硬件。

Low-latency data access
HDFS对几十毫秒内的低延迟访问表现不好,因为它主要优化方向是高吞吐量,势必造成高延迟。HBase是当前达到低延迟的选择。

Lots of small files
namenode将文件系统元数据放在内存,而每个文件,目录,block占用大约150比特。假设有100万的文件,每个文件是一个block,需要300M内存。

Multiple writers, arbitrary file modifications
目前仅支持单用户在文件末尾append

HDFS Concepts

Blocks

每个block默认128M,文件被拆分成block大小,不足block大小的文件虽然是一个block,但是仅占用实际大小的空间。
block带给分布式系统好处:

  1. 文件大小可以大于单个结点的磁盘
  2. 简化储存子系统
  3. 适合复制,增加了系统容错能力,默认复制到3个结点
Namenodes and Datanodes

HDFS集群是master−worke模式,一个namenode(master)和一些datanode(workers)。namenode管理文件系统命名空间。它包含文件系统树和所有文件及目录的元数据,这些数据持久化在本地磁盘,两个文件分别是namespace image和edit log。namenode不持久化block的位置,因为每次系统启动都会通过datanode重建位置信息。
客户端访问namenode和datanode,对用户来说是透明的。
datanode存储检索block并周期性的向namenode报告blocks的存储情况。
没有namenode,HDFS将无法工作。namenode故障将使得文件系统的文件丢失,为了避免这种情况,Hadoop提供了两种机制:
第一种是备份文件系统元数据,备份是同步的和原子的,通常的配置是写入本地磁盘和原称NFS mount。
第二种是运行secondary namenode,周期性的合并namespace image及edit log。secondary namenode常运行在单独的机器,因为它需要大量的CPU和内存执行merge。如果主namenode挂了,总会出现数据丢失,在这种情况通常从NFS复制一份namenode的元数据到ssecondary namenode并将其作为namenode使用。

Block Caching

通常datanode从disk读block,但是对高频访问的block可以明确缓存到datanode的内存,可以增加读的效率。应用通过添加缓存指令到缓存池来指导namenode缓存文件。

HDFS Federation

每个文件和块的引用是保存在namenode的内存中的,我们可以添加namenode去扩展,每个namenode管理一部分的namespace,相互对立,不影响其它namenode的可用性

HDFS High Availability

secondary namenode元数据并使用辅助namenode防止数据丢失,不能获得高可用的系统。
如果namenode挂掉,新的namenode需要载入namespace image到内存,重现edit log,接收所有datanode上报的block映射关系,才能正常运行。在大型集群,需要耗费30分钟甚至更长的时间。
Hadoop 2添加高可用性(HA)支持,使用一对namenode(active-standby),active namenode挂掉,standby可以很快提供服务。
架构需要做一些改动:

  • namenode必须使用高可用性共享存储来共享编辑日志
  • datanode必须向所以namenode发送block的映射信息
  • 客户端需要配置处理故障转移,对用户透明

The Command-Line Interface

首先安装一个伪分布式的Hadoop,配置文件中的fs.defaultFS设置为hdfs://localhost/,使用HDFS作为Hadoop的文件系统。
dfs.replication设置为1(默认3),不对blocks进行复制。
获得帮助信息

hadoop fs -help 

从本地文件系统复制文件到HDFS

% hadoop fs -copyFromLocal input/docs/quangle.txt \
hdfs://localhost/user/tom/quangle.txt

我们可以省略schema和host,使用定义在core-site.xml中的值

% hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt

创建文件夹以及列出所有的文件

% hadoop fs -mkdir books
% hadoop fs -ls .

Hadoop Filesystems

Hadoop有许多种Filesystems

Interfaces

接口有Java的Native API
有HTTP REST API,通过HTTP访问HDFS有两种方式:
1、直接访问namenode,然后重定向到datanode
2、使用代理服务器,代理负责访问namenode和重定向到datanode,这中方式可以允许更严格的防火墙和带宽限制策略
HTTP接口速度比Native慢

Parallel Copying with distcp

并行复制数据,可以有效代替 hadoop fs -cp
复制一个文件

% hadoop distcp file1 file2

复制目录

% hadoop distcp dir1 dir2

如果dir2不存在,则新建dir2并且把dir1的数据拷贝到dir2。如果dir2存在,则dir1会复制到dir2/dir1。
使用 -overwrite 选项可以避免改变目录结构。
使用 -update 选项只同步更新。

% hadoop distcp -update dir1 dir2

distcp使用MapReduce任务实现,并行的maps执行复制,没有reduce。多达20个maps执行复制,可以通过 -m 参数指定数量。
distcp通常用于两个集群间传递数据

% hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenode2/foo

–delete 删除namenode1不存在的文件,-p 保存文件参数,例如,block大小、权限等。
如果两个集群HDFS版本不同,可以使用webhdfs协议

% hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo
Keeping an HDFS Cluster Balanced

为保证集群中每个结点的blocks数量平衡,建议使用默认的20个map执行distcp,因为如果只有一个map,那么所有复制的block都会在运行map的node建立一个副本。

Chapter 4. YARN

YARN是在Hadoop2引入的,改善MapReduce的实现,并且支持其他分布式计算框架。