hadoop集群部署 - 图文
更新时间:2024-04-02 22:14:01 阅读量: 综合文库 文档下载
这里我们搭建一个由三台机器组成的集群:
172.16.77.15 aboutyun/123456 master 172.16.77.16 aboutyun/123456 slave1 172.16.77.17 aboutyun/123456 slave1 1.1 上面各列分别为IP、user/passwd、hostname
1.2 Hostname可以在/etc/hostname中修改,hostname,hosts的修改详细可以看ubuntu修改hostname
对于三台机器都需要修改:
下面是master的修改:通过命令 01.vi /etc/hosts
02.复制代码然后对你里面的内容修改: 下面修改hostname 01.vi /etc/hostname
复制代码修改为master即可
上面hosts基本都一样,只不过hostname有所差别。 2、打通master到slave节点的SSH无密码登陆
这里面打通无密码登录,很多新手遇到了问题,这里安装的时候,具体的操作,可以查阅其他资料:
Hadoop伪分布安装过程:Hadoop单机环境搭建指南(ubuntu)
CentOS6.4之图解SSH无验证双向登陆配置这是个人总结的哦命令,相信对你有所帮助 个人常用知识总结然后这里在展示一下,authorized_keys是什么样子的:
上面的原理,就是我把工钥放到里面,然后本台机器就可以ssh无密码登录了。如果想彼此无密码登录,那么就需要把彼此的工钥(*.pub)放到authorized_keys里面 然后我们进行下面步骤: 3.1 安装ssh
一般系统是默认安装了ssh命令的。如果没有,或者版本比较老,则可以重新安 装: sudo apt-get install ssh 3.2设置local无密码登陆 具体步骤如下: 第一步:产生密钥
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa 第二部:导入authorized_keys
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
第二部导入的目的是为了无密码等,这样输入如下命令: 01.ssh localhost
复制代码就可以无密码登录了。
下面展示一下hosts的配置,及无密码登录的效果 locahost的配
无密码登录效果:
3.3设置远程无密码登陆 进入master的.ssh目录
scp authorized_keys aboutyun@slave1:~/.ssh/authorized_keys_from_master 进入slave1的.ssh目录
cat authorized_keys_from_master >> authorized_keys 至此,可以在master上面ssh slave1进行无密码登陆了。 【注意】:以上操作在每台机器上面都要进行。 这里在强调一下原理:
就是把工钥放到里面,然后本台机器就可以ssh无密码登录了。如果想彼此无密码登录,那么就需要把彼此的工钥(*.pub)放到authorized_keys里面 ------------------------------------------------------------------------ 根据上面的资料相信你能得到互通,这里展现一下效果: 上面是slave1无密码登录master 这里是master无密码登录slave1
----------------------------------------------------------------------- 4、安装jdk
安装JDK还是比较简单的,这里 4.1、下载jdk
http://yunpan.cn/QiujtEVgRTJ4S 访问密码 b488 4.2、安装jdk(这里以.tar.gz版本,64位系统为例)
jdk的安装可以参考Hadoop伪分布安装过程:Hadoop单机环境搭建指南(ubuntu) 这里直接解压到了/usr/jdk1.7下面: 上面首先第一步:
至此,jkd安装完毕,下面配置环境变量 一、PATH配置
这里提供一个简单的方法: 通过下面命令
1.export PATH=$PATH:/usr/java/jdk1.7.0_51/bin 通过cat命令,可以查看
2.为了保证生效执行下面命令 01.source /etc/environment 二、CLASSTH配置
上面只是配置了PATH,还需在配置CLASSTH
export CLASSPATH=.:/usr/java/jdk1.7.0_51/jre/lib 执行配置完毕
如果不起作用,采用通过下面配置: java.sh配置
因为重启之后,很有会被还原,下面还需要配置java.sh 这里可以通过 cd /etc/profile.d vi java.sh
把下面两行放到java.sh
export PATH=$PATH:/usr/java/jdk1.7.0_51/bin export CLASSPATH=.:/usr/java/jdk1.7.0_51/jre/lib 保存。这样就配置完毕了。 执行下面命令: 01. source java.sh 复制代码
现在在执行 java -version就ok了
【注意】每台机器执行相同操作,最后将java安装在相同路径下 三、关闭每台机器的防火墙 ufw disable (重启生效)
第三部分 Hadoop 2.2安装过程 一、需要注意的问题
hadoop2.2的配置还是比较简单的,但是可能会遇到各种各样的问题。最常讲的就是看不到进程。 看
1.你的配置文件有问题。
对于配置文件,主机名,空格之类的这些都不要带上。仔细检查 2.Linux的权限不正确。
最常出问题的是core-site.xml,与hdfs-site.xml。 core-site.xml
01.
02.
03.
04.
复制代码说一下上面参数的含义,这里是hadoop的临时文件目录,file的含义是使用本地目录。也就是使用的是Linux的目录,一定确保下面目录
01./home/aboutyun/tmp
复制代码的权限所属为你创建的用户。并且这里面我也要会变通,aboutyun,为我创建的用户名,如果你创建了zhangsan或则lisi,那么这个目录就会变 01./home/zhangsan/tmp
复制代码这里不熟悉,是因为对Linux的不熟悉的原因。这里在来张图: 注意:1和2对比。如果你所创建的tmp属于root,那么你会看不到进程。 hdfs-site.xml
同样也是:要注意下面,你是需要改成自己的用户名的
上面讲完,我们开始配置
hadoop集群中每个机器上面的配置基本相同,所以我们先在master上面进行配置部署,然后再复制到其他节点。所以这里的安装过程相当于在每台机器上面都要执行。
【注意】:master和slaves安装的hadoop路径要完全一样,用户和组也要完全一致 1、 解压文件
将第一部分中下载的
01.tar zxvf hadoop-2.2.0_x64.tar.gz 复制代码
01.mv hadoop-2.2.0 hadoop 复制代码
解压到/usr路径下 并且重命名,效果如下
2、 hadoop配置过程
配置之前,需要在master本地文件系统创建以下文件夹: ~/dfs/name ~/dfs/data ~/tmp
这里文件权限:创建完毕,你会看到红线部分,注意所属用户及用户组。如果不再新建的用户组下面,可以使用下面命令来修改:让你真正了解chmod和chown命令的用法 这里要涉及到的配置文件有7个:
~/hadoop-2.2.0/etc/hadoop/hadoop-env.sh ~/hadoop-2.2.0/etc/hadoop/yarn-env.sh ~/hadoop-2.2.0/etc/hadoop/slaves
~/hadoop-2.2.0/etc/hadoop/core-site.xml ~/hadoop-2.2.0/etc/hadoop/hdfs-site.xml ~/hadoop-2.2.0/etc/hadoop/mapred-site.xml ~/hadoop-2.2.0/etc/hadoop/yarn-site.xml
以上文件默认不存在的,可以复制相应的template文件获得。下面举例: 配置文件1:hadoop-env.sh
修改JAVA_HOME值(export JAVA_HOME=/usr/jdk1.7) 配置文件2:yarn-env.sh
修改JAVA_HOME值(export JAVA_HOME=/usr/jdk1.7) 配置文件3:slaves (这个文件里面保存所有slave节点) 写入以下内容:
配置文件4:core-site.xml 01.
03.
04.
07.
10.
11.
12.
13.
16.
20.
复制代码
配置文件5:hdfs-site.xml 01.
03.
07.
08.
11.
12.
15.
19.
复制代码
配置文件6:mapred-site.xml 01.
06.
10.
复制代码
配置文件7:yarn-site.xml 01.
03.
06.
08.
11.
15.
16.
19.
23.
27.
28.
3、复制到其他节点
上面配置完毕,我们基本上完成了90%了剩下就是复制。我们可以把整个hadoop复制过去:使用如下命令
01.sudo scp -r /usr/hadoop aboutyun@slave1:~/
复制代码这里记得先复制到home/aboutyun下面,然后在转移到/usr下面。
后面我们会经常遇到问题,经常修改配置文件,所以修改完一个配置文件后,其他节点都需要修改,这里附上脚本操作方便: 一、节点之间传递数据: 第一步:vi scp.sh
第二步:把下面内容放到里面(记得修改下面红字部分,改成自己的) #!/bin/bash #slave1
scp /usr/hadoop/etc/hadoop/core-site.xml aboutyun@slave1:~/
scp /usr/hadoop/etc/hadoop/hdfs-site.xml aboutyun@slave1:~/ #slave2
scp /usr/hadoop/etc/hadoop/core-site.xml aboutyun@slave2:~/ scp /usr/hadoop/etc/hadoop/hdfs-site.xml aboutyun@slave2:~/ 第三步:保存scp.sh
第四步:bash scp.sh执行 二、移动文件夹: 可以自己写了。 4.配置环境变量 第一步:
01.vi /etc/environment
复制代码第二步:添加如下内容:记得如果你的路径改变了,你也许需要做相应的改变。 4、启动验证 4.1 启动hadoop 格式化namenode:
01.hdfs namenode –format 复制代码或则使用下面命令: 01.hadoop namenode format 复制代码 启动hdfs: 01.start-dfs.sh 复制代码
此时在master上面运行的进程有: namenode
secondarynamenode
slave节点上面运行的进程有:datanode
启动yarn: 01.start-yarn.sh
复制代码我们看到如下效果: master有如下进程: slave1有如下进程
此时hadoop集群已全部配置完成!!!
【注意】:而且所有的配置文件
然后我们输入:(这里有的同学没有配置hosts,所以输出master访问不到,如果访问不到输入ip地址即可) 01.http://master:8088/ 复制代码
如何修改hosts:
win7 进入下面路径:
01.C:\\Windows\\System32\\drivers\\etc 复制代码找打hosts
然后打开,进行如下配置即可看到 看到下图: 到此全部完毕。
使用hadoop集群,更详细内容,可以查看
hadoop2.X使用手册1:通过web端口查看主节点、slave1节点及集群运行状态 hadoop2.X使用手册2:如何运行自带wordcount protected void setup(Context context) throws IOException, InterruptedException { // NOTHING } protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } protected void cleanup(Context context) throws IOException, InterruptedException { // NOTHING } public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); } }
由上面的代码,我们可以了解到,当调用到map时,通常会先执行一个setup函数, 最后会执行一个cleanup函数。而默认情况下,这两个函数的内容都是nothing。
因此,当map方法不符合应用要求时,可以试着通过增加setup和cleanup的内容来满足应用的需求。
?Mapreduce shuffle和排序
Mapreduce为了确保每个reducer的输入都按键排序。系统执行排序的过程-----将map的输出作为输入传给reducer 称为shuffle。学习shuffle是如何工作的有助于我们理解mapreduce工作机制。shuffle属于hadoop不断被优化和改进的代码库的一部分。从许多方面看,shuffle是mapreduce的“心脏”,是奇迹出现的地方。
下面这张图介绍了mapreduce里shuffle的工作原理:
从图可以看出shuffle发生在map端和reduce端之间,将map端的输出与reduce端的输入对应。
map 端
map函数开始产生输出时,并不是简单地将它输出到磁盘。这个过程更复杂,利用缓冲的方式写到内存,并出于效率的考虑进行预排序。shuffle原理图就看出来。每个map任务都有一个环形内存缓冲区,用于存储任务的输出。默认情况是100MB,可以通过io.sort.mb属性调整。一旦缓冲内容达到阀值(io.sort.spill.percent,默认0.80,或者80%),一个后台线程开始把内容写到磁盘中。在写磁盘过程中,map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,map会阻塞直到写磁盘过程完成。在写磁盘之前,线程首先根据数据
最终要传送到reducer把数据划分成相应的分区,在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。 reducer通过HTTP方式得到输出文件的分区。用于文件分区的工作线程的数量由任务的tracker.http.threads属性控制,此设置针对每个tasktracker,而不是针对每个map任务槽。默认值是40,在运行大型作业的大型集群上,此值可以根据需要调整。 reducer端
map端输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker需要为分区文件运行reduce任务。更进一步,reduce任务需要集群上若干个map任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。reduce任务有少量复制线程,所以能并行取得map输出。默认值是5个线程,可以通过设置mapred.reduce.parallel.copies属性改变。在这个过程中我们由于要提到一个问题,reducer如何知道要从那个tasktracker取得map输出呢?map任务成功完成之后,它们通知其父tasktracker状态已更新,然后tasktracker通知jobtracker。这些通知都是通过心跳机制传输的。因此,对于指定作业,jobtracker知道map输出和tasktracker之间的映射关系。reduce中的一个线程定期询问jobtracker以便获得map输出的位置,直到它获得所有输出位置。
由于reducer可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们。相反,tasktracker会等待,直到jobtracker告知它可以删除map输出,这是作业完成后执行的。
如果map输出相当小,则会被复制到reduce tasktracker的内存(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制),否则,map输出被复制到磁盘。一旦内存缓冲区达到阀值大小(由mapred.job.shuffle.merge.percent决定)或达到map输出阀值(mapred.inmem.merge.threshold控制),则合并后溢出写到磁盘中。
随着磁盘上副本的增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。注意,为了合并,压缩的map输出都必须在内存中被解压缩。
复制完所有map输出被复制期间,reduce任务进入排序阶段(sort phase 更恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map输出,维持其顺序排序。这是循环进行的。比如,如果有50个map输出,而合并因子是10 (10默认值设置,由io.sort.factor属性设置,与map的合并类似),合并将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。 在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘往返行程,并没有将5个文件合并成一个已排序的文件作为最后一趟。最后的合并既可来自内存和磁盘片段。
在reduce阶段,对已排序输出中的每个键都要调用reduce函数。此阶段的输出直接写到输出文件系统中。
Hbase
引言
跟Hadoop的无缝集成使得使用MapReduce对HBase的数据进行分布式计算
非常方便,本文将以前面的blog示例,介绍HBase下MapReduce开发要点。很好理解本文前提是你对Hadoop MapReduce有一定的了解。 HBase MapReduce核心类介绍
首先一起来回顾下MapReduce的基本编程模型,
可以看到最基本的是通过Mapper和Reducer来处理KV对,Mapper的输出经Shuffle及Sort后变为Reducer的输入。除了Mapper和Reducer外,另外两个重要的概念是InputFormat和OutputFormat,定义了Map-Reduce的输入和输出相关的东西。HBase通过对这些类的扩展(继承)来方便MapReduce任务来读写HTable中的数据。
实例分析
我们还是以最初的blog例子来进行示例分析,业务需求是这样:找到具有相同兴趣的人,我们简单定义为如果author之间article的tag相同,则认为两者有相同兴趣,将分析结果保存到HBase。除了上面介绍的blog表外,我们新增一张表tag_friend,RowKey为tag,Value为authors,大概就下面这样。
我们省略了一些跟分析无关的Column数据,上面的数据按前面描述的业务需求经过MapReduce分析,应该得到下面的结果 实际的运算过程分析如下 代码实现
有了上面的分析,代码实现就比较简单了。只需以下几步
定义Mapper类继承TableMapper,map的输入输出KV跟上面的分析一致。 01.public static class Mapper extends TableMapper
04.public void map(ImmutableBytesWritable row, Result values,Context context) throws IOException {
05.ImmutableBytesWritable value = null;
06.String[] tags = null;
07.for (KeyValue kv : values.list()) {
08.if (\
09.&& \10.value = new ImmutableBytesWritable(kv.getValue()); 11.}
12.if (\13.&& \14.tags = Bytes.toString(kv.getValue()).split(\15.} 16.}
17.for (int i = 0; i < tags.length; i++) {
18.ImmutableBytesWritable key = new ImmutableBytesWritable( 19.Bytes.toBytes(tags[i].toLowerCase())); 20.try {
21.context.write(key,value);
22.} catch (InterruptedException e) { 23.throw new IOException(e); 24.}25.}26.}27.}
复制代码定义Reducer类继承TableReducer,reduce的输入输出KV跟上面分析的一致。
01.public static class Reducer extends TableReducer
03.public void reduce(ImmutableBytesWritable key,Iterable values, 04.Context context) throws IOException, InterruptedException { 05.String friends=\
06.for (ImmutableBytesWritable val : values) {
07.friends += (friends.length()>0?\08.}
09.Put put = new Put(key.get());
10.put.add(Bytes.toBytes(\11.Bytes.toBytes(friends));
12.context.write(key, put);13.}14.}
复制代码在提交作业时设置inputFormat为TableInputFormat,设置outputFormat为TableOutputFormat,可以借助TableMapReduceUtil类来简化编码。
01.public static void main(String[] args) throws Exception { 02.Configuration conf = new Configuration(); 03.conf = HBaseConfiguration.create(conf);
04.Job job = new Job(conf, \05.job.setJarByClass(FindFriend.class);
06.Scan scan = new Scan();
07.scan.addColumn(Bytes.toBytes(\08.scan.addColumn(Bytes.toBytes(\09.TableMapReduceUtil.initTableMapperJob(\scan,FindFriend.Mapper.class,
10.ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
11.TableMapReduceUtil.initTableReducerJob(\class, job);
12.System.exit(job.waitForCompletion(true) ? 0 : 1);13.} 复制代码小结
本文通过实例分析演示了使用MapReduce分析HBase的数据,需要注意的这只是一种常规的方式(分析表中的数据存到另外的表中),实际上不局限于此,不过其他方式跟此类似。如果你进行到这里,你肯定想要马上运行它看看结果,在下篇文章中将介绍如何在模拟集群环境下本机运行MapReduce任务进行测试。
hadoop对于压缩文件的支持
hadoop对于压缩格式的是透明识别,我们的MapReduce任务的执行是透明的,hadoop能够自动为我们 将压缩的文件解压,而不用我们去关心。 如果我们压缩的文件有相应压缩格式的扩展名(比如lzo,gz,bzip2等),hadoop就会根据扩展名去选择解码器解压。
hadoop对每个压缩格式的支持,详细见下表:
压缩格式 工具 算法 文件扩展名 多文件 可分割性 DEFLATE 无 DEFLATE .deflate 不 不 gzip gzip DEFLATE .gz 不 不
ZIP zip DEFLATE .zip 是 是,在文件范围内 bzip2 bzip2 bzip2 .bz2 不 是 LZO lzop LZO .lzo 不 是 如果压缩的文件没有扩展名,则需 要在执行mapreduce任务的时候指定输入格式. hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-streaming-0.20.2-CD H3B4.jar -file /usr/home/hadoop/hello/mapper.py -mapper /usr/home/hadoop/hello/mapper.py -file /usr/home/hadoop/hello/reducer.py -reducer /usr/home/hadoop/hello/reducer.py -input lzotest -output result4 -jobconf mapred.reduce.tasks=1 *-inputformat org.apache.hadoop.mapred.LzoTextInputFormat*
hadoop下各种压缩算法的压缩比,压缩时间,解压时间见下表:
压缩算法 原始文件大小 压缩后的文件大小 压缩速度 解压缩速度 gzip 8.3GB 1.8GB 17.5MB/s 58MB/s bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s
LZO 8.3GB 2.9GB 49.3MB/S 74.6MB/s hadoop各种压缩算法的优缺点简述
在考虑如何压缩那些将由MapReduce处理的数据时,考虑压缩格式是否支持分割是很重要的。考虑存储在HDFS中的未压缩的文件,其大小为1GB,HDFS的块大小为64MB,所以该文件将被存储为16块,将此文件用作输入的MapReduce作业会创建1个输人分片(split ,也称为“分块”。对于block,我们统一称为“块”。)每个分片都被作为一个独立map任务的输入单独进行处理。
现在假设。该文件是一个gzip格式的压缩文件,压缩后的大小为1GB。和前面一样,HDFS将此文件存储为16块。然而,针对每一块创建一个分块是没有用的,因为不可能从gzip数据流中的任意点开始读取,map任务也不可能独立于其他分块只读取一个分块中的数据。gzip格式使用DEFLATE来存储压缩过的数据,DEFLATE将数据作为一系列压缩过的块进行存储。问题是,每块的开始没有指定用户在数据流中任意点定位到下一个块的起始位置,而是其自身与数据流同步。因此,gzip不支持分割(块)机制。
在这种情况下,MapReduce不分割gzip格式的文件,因为它知道输入是gzip压缩格式的(通过文件扩展名得知),而gzip压缩机制不支持分割机制。这样是以牺牲本地化为代价:一个map任务将处理16个HDFS块。大都不是map的本地数据。与此同时,因为map任务少,所以作业分割的粒度不够细,从而导致运行时间变长。
在我们假设的例子中,如果是一个LZO格式的文件,我们会碰到同样的问题,因为基本压缩格式不为reader提供方法使其与流同步。但是,bzip2格式的压缩文件确实提供了块与块之间的同步标记(一个48位的PI近似值),因此它支持分割机制。
对于文件的收集,这些问题会稍有不同。ZIP是存档格式,因此它可以将多个文件合并为一个ZIP文件。每个文件单独压缩,所有文档的存储位置存储在ZIP文件的尾部。这个属性表明ZIP文件支持文件边界处分割,每个分片中包括ZIP压缩文件中的一个或多个文件
在MapReduce我们应该使用哪种压缩格式
根据应用的具体情况来决定应该使用哪种压缩格式。就个人而言,更趋向于使用最快的速度压缩,还是使用最优的空间压缩?一般来说,应该尝试不同的策略,并用具有代表性的数据集进行测试,从而找到最佳方法。对于那些大型的、没有边界的文件,如日志文件,有以下选项。 存储未压缩的文件。
使用支持分割机制的压缩格式,如bzip2。
在应用中将文件分割成几个大的数据块,然后使用任何一种支持的压缩格式单独压缩每个数据块(可不用考虑压缩格式是否支持分割)。在这里,需要选择数据块的大小使压缩后的数据块在大小上相当于HDFS的块。
使用支持压缩和分割的Sequence File(序列文件)。
对于大型文件,不要对整个文件使用不支持分割的压缩格式,因为这样会损失本地性优势,从而使降低MapReduce应用的性能 hadoop支持Splittable压缩lzo
在hadoop中使用lzo的压缩算法可以减小数据的大小和数据的磁盘读写时间,在HDFS中存储压缩数据,可以使集群能保存更多的数据,延长集群的使用寿命。不仅如此,由于mapreduce作业通常瓶颈都在IO上,存储压缩数据就意味这更少的IO操作,job运行更加的高效。
但是在hadoop上使用压缩也有两个比较麻烦的地方:第一,有些压缩格式不能被分块,并行的处理,比如gzip。第二,另外的一些压缩格式虽然支持分块处理,但是解压的过程非常的缓慢,使job的瓶颈转移到了cpu上,例如bzip2。
如果能够拥有一种压缩算法,即能够被分块,并行的处理,速度也非常的快,那就非常的理想。这种方式就是lzo。
lzo的压缩文件是由许多的小的blocks组成(约256K),使的hadoop的job可以根据block的划分来split job。不仅如此,lzo在设计时就考虑到了效率问题,它的解压速度是gzip的两倍,这就让它能够节省很多的磁盘读写,它的压缩比的不如gzip,大约压缩出来的文件比gzip压缩的大一半,但是这样仍然比没有经过压缩的文件要节省20%-50%的存储空间,这样就可以在效率上大大的提高job执行的速度。
hadoop下lzo配置文档参考http://www.tech126.com/hadoop-lzo/ 如何在MapReduce中使用压缩 1.输入的文件的压缩
如果输入的文件是压缩过的,那么在被MapReduce读取时,它们会被自动解压,根据文件扩展名来决定应该使用哪一个压缩解码器。 2.MapReduce作业的输出的压缩
如果要压缩MapReduce作业的输出,请在作业配置文件中将mapred.output.compress属性设置为true。将mapred.output.compression.codec属性设置为自己打算使用的压缩编码/解码器的类名。
如果为输出使用了一系列文件,可以设置mapred.output.compression.type属性来控制压缩类型,默认为RECORD,它压缩单独的记录。将它改为BLOCK,则可以压缩一组记录。由于它有更好的压缩比,所以推荐使用。 3.map作业输出结果的压缩
即使MapReduce应用使用非压缩的数据来读取和写入,我们也可以受益于压缩map阶段的中间输出。因为map作业的输出会被写入磁盘并通过网络传输到reducer节点,所以如果使用LZO之类的快速压缩,能得到更好的性能,因为传输的数据量大大减少了。以下代码显示了启用rnap输出压缩和设置压缩格式的配置属性。
[Java] 纯文本查看 复制代码 01
conf.setCompressMapOutput(true); 02
conf.setMapOutputCompressorClass(GzipCodec.class); 本地压缩库
考虑到性能,最好使用一个本地库(native library)来压缩和解压。例如,在一个测试中,使用本地gzip压缩库减少了解压时间50%,压缩时间大约减少了10%(与内置的Java实现相比较)。表4-4展示了Java和本地提供的每个压缩格式的实现。井不是所有的格式都有本地实现(例如bzip2压缩),而另一些则仅有本地实现(例如LZO)。
压缩格式 Java实现 本地实现 DEFLATE 是 是 gzip 是 是 bzip2 是 否 LZO 否 是
Hadoop带有预置的32位和64位Linux的本地压缩库,位于库/本地目录。对于其他平台,需要自己编译库,具体请参见Hadoop的维基百科http://wiki.apache.org/hadoop/NativeHadoop。
本地库通过Java系统属性java.library.path来使用。Hadoop的脚本在bin目录中已经设置好这个属性,但如果不使用该脚本,则需要在应用中设置属性。
默认情况下,Hadoop会在它运行的平台上查找本地库,如果发现就自动加载。这意味着不必更改任何配置设置就可以使用本地库。在某些情况下,可能希望禁用本地库,比如在调试压缩相关问题的时候。为此,将属性hadoop.native.lib设置为false,即可确保内置的Java等同内置实现被使用(如果它们可用的话)。
Hive
熟练写出增删改查(面试必备)
创建表:
hive> CREATE TABLE pokes (foo INT, bar STRING);
Creates a table called pokes with two columns, the first being an integer and the other a string
创建一个新表,结构与其他一样
hive> create table new_table like records;
创建分区表:
hive> create table logs(ts bigint,line string) partitioned by (dt String,country String);
加载分区表数据:
hive> load data local inpath '/home/hadoop/input/hive/partitions/file1' into table logs partition (dt='2001-01-01',country='GB');
展示表中有多少分区: hive> show partitions logs;
展示所有表:
hive> SHOW TABLES; lists all the tables hive> SHOW TABLES '.*s';
lists all the table that end with 's'. The pattern matching follows Java regular expressions. Check out this link for documentation
显示表的结构信息
hive> DESCRIBE invites;
shows the list of columns
更新表的名称:
hive> ALTER TABLE source RENAME TO target;
添加新一列
hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment');
删除表:
hive> DROP TABLE records;
删除表中数据,但要保持表的结构定义 hive> dfs -rmr /user/hive/warehouse/records;
从本地文件加载数据:
hive> LOAD DATA LOCAL INPATH '/home/hadoop/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
显示所有函数: hive> show functions;
查看函数用法:
hive> describe function substr;
查看数组、map、结构
hive> select col1[0],col2['b'],col3.c from complex;
内连接:
hive> SELECT sales.*, things.* FROM sales JOIN things ON (sales.id = things.id);
查看hive为某个查询使用多少个MapReduce作业
hive> Explain SELECT sales.*, things.* FROM sales JOIN things ON (sales.id = things.id);
外连接:
hive> SELECT sales.*, things.* FROM sales LEFT OUTER JOIN things ON (sales.id = things.id);
hive> SELECT sales.*, things.* FROM sales RIGHT OUTER JOIN things ON (sales.id = things.id);
hive> SELECT sales.*, things.* FROM sales FULL OUTER JOIN things ON (sales.id = things.id);
in查询:Hive不支持,但可以使用LEFT SEMI JOIN
hive> SELECT * FROM things LEFT SEMI JOIN sales ON (sales.id = things.id);
Map连接:Hive可以把较小的表放入每个Mapper的内存来执行连接操作
hive> SELECT /*+ MAPJOIN(things) */ sales.*, things.* FROM sales JOIN things ON (sales.id = things.id);
INSERT OVERWRITE TABLE ..SELECT:新表预先存在
hive> FROM records2
> INSERT OVERWRITE TABLE stations_by_year SELECT year, COUNT(DISTINCT station) GROUP BY year
> INSERT OVERWRITE TABLE records_by_year SELECT year, COUNT(1) GROUP BY year
> INSERT OVERWRITE TABLE good_records_by_year SELECT year, COUNT(1) WHERE temperature != 9999 AND (quality = 0 OR quality = 1 OR quality = 4 OR quality = 5 OR quality = 9) GROUP BY year;
CREATE TABLE ... AS SELECT:新表表预先不存在
hive>CREATE TABLE target AS SELECT col1,col2 FROM source;
创建视图:
hive> CREATE VIEW valid_records AS SELECT * FROM records2 WHERE temperature !=9999;
查看视图详细信息:
hive> DESCRIBE EXTENDED valid_records;
-------------------------------------------------------------------------------------------------------------------------------------
传统数据库: 添加:
insert into 表名 values(); 修改:
update 表名 set a=b where b=c; 删除:
delete from 表名where a=b; 查询:
select * from 表名 where a=b;
Hive可以运行保存在文件里面的一条或多条的语句,只要用-f参数,一般情况下,保存这些Hive查询语句的文件通常用.q或者.hql后缀名,但是这不是必须的,你也可以保存你想要的后缀名。假设test文件里面有一下的Hive查询语句: select * from p limit 10; select count(*) from p;
那么我们可以用下面的命令来查询: [wyp@wyp hive-0.11.0-bin]$ bin/hive -f test ........这里省略了一些输出........... OK 196 242 3 881250949 20131102 jx 186 302 3 891717742 20131102 jx 22 377 1 878887116 20131102 jx 244 51 2 880606923 20131102 jx 166 346 1 886397596 20131102 jx 298 474 4 884182806 20131102 jx 115 265 2 881171488 20131102 jx 253 465 5 891628467 20131102 jx 305 451 3 886324817 20131102 jx 6 86 3 883603013 20131102 jx Time taken: 4.386 seconds, Fetched: 10 row(s) ........这里省略了一些输出........... OK 4000000 Time taken: 16.284 seconds, Fetched: 1 row(s) 如果你配置好了Hive shell的路径,你可以用SOURCE命令来运行那个查询文件: [wyp@wyp hive-0.11.0-bin]$ hive hive> source /home/wyp/Documents/test; ........这里省略了一些输出........... OK 196 242 3 881250949 20131102 jx 186 302 3 891717742 20131102 jx 22 377 1 878887116 20131102 jx 244 51 2 880606923 20131102 jx 166 346 1 886397596 20131102 jx 298 474 4 884182806 20131102 jx 115 265 2 881171488 20131102 jx 253 465 5 891628467 20131102 jx 305 451 3 886324817 20131102 jx 6 86 3 883603013 20131102 jx Time taken: 4.386 seconds, Fetched: 10 row(s) ........这里省略了一些输出........... OK 4000000 Time taken: 16.284 seconds, Fetched: 1 row(s)
Hive的几种常见的数据导入方式 这里介绍四种:
(1)、从本地文件系统中导入数据到Hive表; (2)、从HDFS上导入数据到Hive表;
(3)、从别的表中查询出相应的数据并导入到Hive表中;
(4)、在创建表的时候通过从别的表中查询出相应的记录并插入到所创建的表中。 一、从本地文件系统中导入数据到Hive表 先在Hive里面创建好表,如下: hive> create table wyp > (id int, name string, > age int, tel string) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY '\\t'
> STORED AS TEXTFILE; OK Time taken: 2.832 seconds 这个表很简单,只有四个字段,具体含义我就不解释了。本地文件系统里面有个/home/wyp/wyp.txt文件,内容如下: [wyp@master ~]$ cat wyp.txt 1 wyp 25 13188888888888 2 test 30 13888888888888 3 zs 34 899314121 wyp.txt文件中的数据列之间是使用\\t分割的,可以通过下面的语句将这个文件里面的数据导入到wyp表里面,操作如下:
hive> load data local inpath 'wyp.txt' into table wyp; Copying data from file:/home/wyp/wyp.txt Copying file: file:/home/wyp/wyp.txt Loading data to table default.wyp Table default.wyp stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 67] OK Time taken: 5.967 seconds 这样就将wyp.txt里面的内容导入到wyp表里面去了,可以到wyp表的数据目录下查看,如下命令: hive> dfs -ls /user/hive/warehouse/wyp ; Found 1 items -rw-r--r--3 wyp supergroup 67 2014-02-19 18:23 /hive/warehouse/wyp/wyp.txt
需要注意的是:
和我们熟悉的关系型数据库不一样,Hive现在还不支持在insert语句里面直接给出一组记录的文字形式,也就是说,Hive并不支持INSERT INTO ?. VALUES形式的语句 二、HDFS上导入数据到Hive表
从本地文件系统中将数据导入到Hive表的过程中,其实是先将数据临时复制到HDFS的一个目录下(典型的情况是复制到上传用户的HDFS home目录下,比如/home/wyp/),然后再将数据从那个临时目录下移动(注意,这里说的是移动,不是复制!)到对应的Hive表的数据目录里面。既然如此,那么Hive肯定支持将数据直接从HDFS上的一个目录移动到相应Hive表的数据目录下,假设有下面这个文件/home/wyp/add.txt,具体的操作如下: [wyp@master /home/q/hadoop-2.2.0]$ bin/hadoop fs -cat /home/wyp/add.txt 5 wyp1 23 131212121212 6 wyp2 24 134535353535 7 wyp3 25 132453535353 8 wyp4 26 154243434355 上面是需要插入数据的内容,这个文件是存放在HDFS上/home/wyp目录(和一中提到的不同,一中提到的文件是存放在本地文件系统上)里面,我们可以通过下面的命令将这个
文件里面的内容导入到Hive表中,具体操作如下:
hive> load data inpath '/home/wyp/add.txt' into table wyp; Loading data to table default.wyp Table default.wyp stats: [num_partitions: 0, num_files: 2, num_rows: 0, total_size: 215] OK Time taken: 0.47 seconds hive> select * from wyp; OK 5 wyp1 23 131212121212 6 wyp2 24 134535353535 7 wyp3 25 132453535353 8 wyp4 26 154243434355 1 wyp 25 13188888888888 2 test 30 13888888888888 3 zs 34 899314121 Time taken: 0.096 seconds, Fetched: 7 row(s) 从上面的执行结果我们可以看到,数据的确导入到wyp表中了!请注意load data inpath ‘/home/wyp/add.txt’ into table wyp;里面是没有local这个单词的,这个是和一中的区别。 三、从别的表中查询出相应的数据并导入到Hive表中 假设Hive中有test表,其建表语句如下所示: hive> create table test( > id int, name string > ,tel string) > partitioned by > (age int) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY '\\t' > STORED AS TEXTFILE; OK Time taken: 0.261 seconds 大体和wyp表的建表语句类似,只不过test表里面用age作为了分区字段。对于分区,这里在做解释一下:
分区:在Hive中,表的每一个分区对应表下的相应目录,所有分区的数据都是存储在对应的目录中。比如wyp表有dt和city两个分区,则对应dt=20131218,city=BJ对应表的目录为/user/hive/warehouse/dt=20131218/city=BJ,所有属于这个分区的数据都存放在这个目录中。
下面语句就是将wyp表中的查询结果并插入到test表中: hive> insert into table test > partition (age='25') > select id, name, tel > from wyp; ##################################################################### 这里输出了一堆Mapreduce任务信息,这里省略 ##################################################################### Total MapReduce CPU Time Spent: 1 seconds 310 msec OK Time taken: 19.125 seconds hive> select * from test; OK 5 wyp1 131212121212 25 6 wyp2 134535353535 25 7 wyp3 132453535353 25 8 wyp4 154243434355 25 1 wyp 13188888888888 25 2 test 13888888888888 25 3 zs 899314121 25 Time taken: 0.126 seconds, Fetched: 7 row(s) 这里做一下说明:
我们知道我们传统数据块的形式insert into table values(字段1,字段2),这种形式hive是不支持的。 通过上面的输出,我们可以看到从wyp表中查询出来的东西已经成功插入到test表中去了!如果目标表(test)中不存在分区字段,可以去掉partition (age=’25′)语句。当然,我们也可以在select语句里面通过使用分区值来动态指明分区: hive> set hive.exec.dynamic.partition.mode=nonstrict; hive> insert into table test > partition (age) > select id, name, > tel, age > from wyp; ##################################################################### 这里输出了一堆Mapreduce任务信息,这里省略 ##################################################################### Total MapReduce CPU Time Spent: 1 seconds 510 msec OK Time taken: 17.712 seconds hive> select * from test; OK 5 wyp1 131212121212 23 6 wyp2 134535353535 24 7 wyp3 132453535353 25 1 wyp 13188888888888 25 8 wyp4 154243434355 26 2 test 13888888888888 30 3 zs 899314121 34 Time taken: 0.399 seconds, Fetched: 7 row(s) 这种方法叫做动态分区插入,但是Hive中默认是关闭的,所以在使用前需要先把hive.exec.dynamic.partition.mode设置为nonstrict。当然,Hive也支持insert overwrite方式来插入数据,从字面我们就可以看出,overwrite是覆盖的意思,是的,执行完这条语句的时候,相应数据目录下的数据将会被覆盖!而insert into则不会,注意两者之间的区别。例子如下:
hive> insert overwrite table test > PARTITION (age) > select id, name, tel, age > from wyp; 更可喜的是,Hive还支持多表插入,什么意思呢?在Hive中,我们可以把insert语句倒过来,把from放在最前面,它的执行效果和放在后面是一样的,如下: hive> show create table test3; OK CREATE TABLE test3( id int, name string) Time taken: 0.277 seconds, Fetched: 18 row(s) hive> from wyp > insert into table test > partition(age) > select id, name, tel, age > insert into table test3 > select id, name > where age>25; hive> select * from test3; OK 8 wyp4 2 test 3 zs Time taken: 4.308 seconds, Fetched: 3 row(s) 可以在同一个查询中使用多个insert子句,这样的好处是我们只需要扫描一遍源表就可以生成多个不相交的输出。这个很酷吧!
四、在创建表的时候通过从别的表中查询出相应的记录并插入到所创建的表中
在实际情况中,表的输出结果可能太多,不适于显示在控制台上,这时候,将Hive的查询输出结果直接存在一个新的表中是非常方便的,我们称这种情况为CTAS(create table .. as select)如下: hive> create table test4 > as > select id, name, tel > from wyp; hive> select * from test4; OK 5 wyp1 131212121212 6 wyp2 134535353535 7 wyp3 132453535353 8 wyp4 154243434355 1 wyp 13188888888888 2 test 13888888888888 3 zs 899314121 Time taken: 0.089 seconds, Fetched: 7 row(s) 数据就插入到test4表中去了,CTAS操作是原子的,因此如果select查询由于某种原因而失败,新表是不会创建的!
下面说说怎么创建索引: 1、先创建表: hive> create table user( id int, name string) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY '\\t' > STORED AS TEXTFILE; 2、导入数据: hive> load data local inpath '/export1/tmp/wyp/row.txt' > overwrite into table user; 3、创建索引之前测试 hive> select * from user where id =500000; Total MapReduce jobs = 1 Launching Job 1 out of 1 Number of reduce tasks is set to 0 since there's no reduce operator Cannot run job locally: Input Size (= 356888890) is larger than hive.exec.mode.local.auto.inputbytes.max (= 134217728) Starting Job = job_1384246387966_0247, Tracking URL = http://l-datalogm1.data.cn1:9981/proxy/application_1384246387966_0247/ Kill Command=/home/q/hadoop/bin/hadoop job -kill job_1384246387966_0247 Hadoop job information for Stage-1: number of mappers:2; number of reducers:0 2013-11-13 15:09:53,336 Stage-1 map = 0%, reduce = 0% 2013-11-13 15:09:59,500 Stage-1 map=50%,reduce=0%, Cumulative CPU 2.0 sec 2013-11-13 15:10:00,531 Stage-1 map=100%,reduce=0%, Cumulative CPU 5.63 sec 2013-11-13 15:10:01,560 Stage-1 map=100%,reduce=0%, Cumulative CPU 5.63 sec MapReduce Total cumulative CPU time: 5 seconds 630 msec Ended Job = job_1384246387966_0247 MapReduce Jobs Launched:
Job 0: Map: 2 Cumulative CPU: 5.63 sec HDFS Read: 361084006 HDFS Write: 357 SUCCESS Total MapReduce CPU Time Spent: 5 seconds 630 msec OK 500000 wyp. Time taken: 14.107 seconds, Fetched: 1 row(s) 4、对user创建索引 hive> create index user_index on table user(id) > as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' > with deferred rebuild > IN TABLE user_index_table; hive> alter index user_index on user rebuild; hive> select * from user_index_table limit 5; 0 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [0] 1 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [352] 2 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [704] 3 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [1056] 4 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [1408] Time taken: 0.244 seconds, Fetched: 5 row(s) 这样就对user表创建好了一个索引。 5、对创建索引后的user再进行测试 hive> select * from user where id =500000; Total MapReduce jobs = 1 Launching Job 1 out of 1 Number of reduce tasks is set to 0 since there's no reduce operator Cannot run job locally: Input Size (= 356888890) is larger than hive.exec.mode.local.auto.inputbytes.max (= 134217728) Starting Job = job_1384246387966_0247, Tracking URL = http://l-datalogm1.data.cn1:9981/proxy/application_1384246387966_0247/ Kill Command=/home/q/hadoop/bin/hadoop job -kill job_1384246387966_0247 Hadoop job information for Stage-1: number of mappers:2; number of reducers:0 2013-11-13 15:23:12,336 Stage-1 map = 0%, reduce = 0% 2013-11-13 15:23:53,240 Stage-1 map=50%,reduce=0%, Cumulative CPU 2.0 sec 2013-11-13 15:24:00,253 Stage-1 map=100%,reduce=0%, Cumulative CPU 5.27 sec 2013-11-13 15:24:01,650 Stage-1 map=100%,reduce=0%, Cumulative CPU 5.27 sec MapReduce Total cumulative CPU time: 5 seconds 630 msec Ended Job = job_1384246387966_0247 MapReduce Jobs Launched: Job 0: Map: 2 Cumulative CPU: 5.63 sec HDFS Read: 361084006 HDFS Write: 357 SUCCESS Total MapReduce CPU Time Spent: 5 seconds 630 msec OK 500000 wyp. Time taken: 13.042 seconds, Fetched: 1 row(s) 时间用了13.042s这和没有创建索引的效果差不多。
Hive配置文件中配置项的含义详解(收藏版) [复制链接]
这里面列出了hive几乎所有的配置项,下面问题只是说出了几种配置项目的作用。更多内容,可以查看内容
问题导读:
1.hive输出格式的配置项是哪个? 2.hive被各种语言调用如何配置?
3.hive提交作业是在hive中还是hadoop中?
4.一个查询的最后一个map/reduce任务输出是否被压缩的标志,通过哪个配置项?
5.当用户自定义了UDF或者SerDe,这些插件的jar都要放到这个目录下,通过那个配置项?
6.每个reducer的大小,默认是1G,输入文件如果是10G,那么就会起10个reducer;通过那个配置项可以配置?
7.group by操作是否允许数据倾斜,通过那个配置项配置? 8.本地模式时,map/reduce的内存使用量该如何配置?
9.在做表join时缓存在内存中的行数,默认25000;通过那个配置项可以修改? 10.是否开启数据倾斜的join优化,通过那个配置项可以优化?
11.并行运算开启时,允许多少作业同时计算,默认是8;该如何修改这个配置项? hive的配置:
hive.ddl.output.format:hive的ddl语句的输出格式,默认是text,纯文本,还有json格式,这个是0.90以后才出的新配置; hive.exec.script.wrapper:hive调用脚本时的包装器,默认是null,如果设置为python的话,那么在做脚本调用操作时语句会变为python
正在阅读:
hadoop集群部署 - 图文04-02
Unit 7 The Adventures of Tom Sawyer05-06
船舶结构力学(交大)习题集答案Word版05-06
肥皂泡语文教学设计汇总03-15
北京市不动产登记档案管理暂行办法01-23
小学生日记大全150字02-21
G1空白 十二五《教师专业发展研究》 重点课题 申请评审书04-23
寻根我的祖国我的家作文800字06-18
《生活中的经济学》通识课答案03-04
- 多层物业服务方案
- (审判实务)习惯法与少数民族地区民间纠纷解决问题(孙 潋)
- 人教版新课标六年级下册语文全册教案
- 词语打卡
- photoshop实习报告
- 钢结构设计原理综合测试2
- 2014年期末练习题
- 高中数学中的逆向思维解题方法探讨
- 名师原创 全国通用2014-2015学年高二寒假作业 政治(一)Word版
- 北航《建筑结构检测鉴定与加固》在线作业三
- XX县卫生监督所工程建设项目可行性研究报告
- 小学四年级观察作文经典评语
- 浅谈110KV变电站电气一次设计-程泉焱(1)
- 安全员考试题库
- 国家电网公司变电运维管理规定(试行)
- 义务教育课程标准稿征求意见提纲
- 教学秘书面试技巧
- 钢结构工程施工组织设计
- 水利工程概论论文
- 09届九年级数学第四次模拟试卷
- 集群
- 部署
- 图文
- hadoop
- 马克思主义原理讲义
- 打造单店销量提高的视觉营销
- 国家电网公司网、省调电网调度自动化专业知识题库
- 新思维综合英语1Unit5
- 设计说明书
- 赛尔公司爆炸物品管理规定(DOC)
- 悬挑脚手架方案- 16#工字钢版
- 上海市崇明县2017届高三第一次高考模拟考试化学试卷(word版)
- 考前复习2016年1月大专语文期末试题及答案
- 2014年中科大研究生录取名单
- 毕业生图像校对流程、操作步骤
- 北邮电磁场与电磁波实验 心得体会总结
- 训诂学教案
- 高档牛皮系列软革生产项目投资申请报告word
- 许昌财源孵化基地二期厂房维修工程 - 图文
- 塔吊安拆施工方案
- 家长会发言稿
- 江苏省课程游戏化六个支架的解读
- 《有机化学基础》基础知识汇总(基本概念 化学方程式分类)
- 2014学年第二学期六年级数学阶段测试试卷3