Hadoop standalone deployment, cluster deployment, and remote configuration/management from Eclipse on a local Windows 7 machine


Preparation:

Hadoop download for the Windows (local Eclipse) side:

http://mirror.bit.edu.cn/apache/hadoop/common/hadoop-1.2.1/hadoop-1.2.1-bin.tar.gz

Eclipse Hadoop plugin to download: hadoop-eclipse-plugin-1.2.1.jar

Hadoop download for the Linux servers:

http://mirrors.cnnic.cn/apache/hadoop/common/hadoop-1.2.1/hadoop-1.2.1.tar.gz

Create a user named hadoop on the Linux server:

[root@localhost ~]# useradd hadoop

Set its password:

[root@localhost ~]# passwd hadoop

Grant the hadoop user sudo privileges (so it can use commands such as vim/vi on system files):

[root@localhost ~]# vim /etc/sudoers

root    ALL=(ALL)       ALL
hadoop  ALL=(ALL)       ALL

When saving here you may need to force the write with :wq!.

Unless otherwise noted, all steps below are performed as the hadoop user.

1. Hadoop standalone deployment

1. Download hadoop-1.2.1.tar.gz.

2. Run tar zxvf hadoop-1.2.1.tar.gz to extract Hadoop to a directory of your choice (I extracted it under /usr/local/).

3. Edit conf/hadoop-env.sh under the hadoop-1.2.1 directory and set JAVA_HOME to your JDK directory (mine is JAVA_HOME=/usr/local/jdk1.7.0_60).

4. At this point the standalone deployment is essentially complete.

5. Using standalone mode

By default, Hadoop is configured to run in non-distributed mode as a single Java process. This is very helpful for debugging.

The example below copies the unpacked conf directory to use as input, then finds and displays every entry that matches a given regular expression. Output is written to the specified output directory.

[hadoop@localhost hadoop-1.2.1]$ mkdir input

[hadoop@localhost hadoop-1.2.1]$ cp conf/*.xml input

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop jar hadoop-examples-1.2.1.jar grep input output 'dfs[a-z.]+'
[hadoop@localhost hadoop-1.2.1]$ cat output/*
1       dfsadmin

Note: don't worry if the syntax is not clear yet; it is explained further below. The displayed result is the single line "1 dfsadmin".
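If you re-run the example, delete the local output directory first; Hadoop refuses to start a job whose output directory already exists:

[hadoop@localhost hadoop-1.2.1]$ rm -rf output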

2. Hadoop pseudo-distributed deployment

1. Download hadoop-1.2.1.tar.gz.

2. Run tar zxvf hadoop-1.2.1.tar.gz to extract Hadoop to a directory of your choice (I extracted it under /usr/local/).

3. Edit conf/hadoop-env.sh under the hadoop-1.2.1 directory and set JAVA_HOME to your JDK directory (mine is JAVA_HOME=/usr/local/jdk1.7.0_60).

4. Edit the configuration files

Note: earlier releases used a single hadoop-site.xml. Starting with Hadoop 0.20, that file was split into three configuration files: core-site.xml, hdfs-site.xml, and mapred-site.xml. The underlying reason is that the code base had grown so large that it was split into three major branches developed independently, so the configuration files were split as well.

[hadoop@localhost hadoop-1.2.1]$ vim conf/core-site.xml:

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

[hadoop@localhost hadoop-1.2.1]$ vim conf/hdfs-site.xml:

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

[hadoop@localhost hadoop-1.2.1]$ vim conf/mapred-site.xml:

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>

5. Passwordless ssh setup

Now check whether you can ssh to localhost without entering a password:

[hadoop@localhost hadoop-1.2.1]$ ssh localhost

If you cannot ssh to localhost without a password, run the following commands:

[hadoop@localhost hadoop-1.2.1]$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa

[hadoop@localhost hadoop-1.2.1]$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
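If ssh localhost still prompts for a password after this, the usual cause is over-permissive file modes on ~/.ssh; tightening them as follows is standard OpenSSH practice (not specific to this walkthrough):

[hadoop@localhost hadoop-1.2.1]$ chmod 700 ~/.ssh
[hadoop@localhost hadoop-1.2.1]$ chmod 600 ~/.ssh/authorized_keys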

6. Run

First, use the hadoop command to format the Hadoop Distributed File System (HDFS): ask the namenode to format the DFS file system. This step is done as part of installation, but it is useful to know how to generate a clean file system when you need one.

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop namenode -format

Next, start the Hadoop daemons:

[hadoop@localhost hadoop-1.2.1]$ bin/start-all.sh

Notes: 1) The Hadoop daemon logs are written to the ${HADOOP_LOG_DIR} directory (default: ${HADOOP_HOME}/logs).

2) What if your ssh daemon does not listen on the default port 22? Fortunately this is configurable: change it in conf/hadoop-env.sh, for example (the port below is a placeholder; substitute the real one):

export HADOOP_SSH_OPTS="-p <your-ssh-port>"

Browse the web interfaces of the NameNode and the JobTracker; by default they are at:

NameNode - http://localhost:50070/
JobTracker - http://localhost:50030/

Copy the input files into the distributed file system:

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -put conf input

Run the example program shipped with the distribution:

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop jar hadoop-examples-1.2.1.jar grep input output 'dfs[a-z.]+'

View the output files:

Either copy the output files from the distributed file system to the local file system and examine them there:

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -get output output
[hadoop@localhost hadoop-1.2.1]$ cat output/*

or

view the output files directly on the distributed file system:

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -cat output/*

When you have finished everything, stop the daemons:

[hadoop@localhost hadoop-1.2.1]$ bin/stop-all.sh

In this pseudo-distributed configuration Hadoop starts 5 daemons: namenode, secondarynamenode, datanode, jobtracker, and tasktracker. As each daemon starts, a few lines of information are printed (indicating where its log is stored). Every daemon runs in the background. Figure 1 illustrates the architecture of the pseudo-distributed configuration after startup completes.

Figure 1. Pseudo-distributed Hadoop configuration
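A quick way to confirm that all five daemons are actually running is the JDK's jps tool (not part of the original walkthrough, but a standard check):

[hadoop@localhost hadoop-1.2.1]$ jps

The listing should include NameNode, SecondaryNameNode, DataNode, JobTracker and TaskTracker (plus Jps itself); if one is missing, check its log under ${HADOOP_HOME}/logs.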

3. Hadoop cluster setup

Three servers were used to test the Hadoop cluster deployment: 192.168.153.129 (referred to below as 129), 192.168.153.130 (130), and 192.168.153.131 (131). The architecture plan is as follows:

1) 129 acts as NameNode, SecondaryNameNode, and JobTracker;
2) 130 and 131 act as DataNode and TaskTracker.

1. Create the hadoop user

On each of the three Linux servers, create a user named hadoop:

[root@localhost ~]# useradd hadoop

Set its password:

[root@localhost ~]# passwd hadoop

Grant the hadoop user sudo privileges (so it can use commands such as vim/vi on system files):

[root@localhost ~]# vim /etc/sudoers

root    ALL=(ALL)       ALL
hadoop  ALL=(ALL)       ALL

When saving here you may need to force the write with :wq!.

2. Configure passwordless login

Log in to the name node (129) as the hadoop user and run:

[hadoop@localhost ~]$ ssh-keygen -t rsa

Press Enter through all the prompts; when it finishes, the file .ssh/id_rsa.pub has been generated. Then run the following commands:

[hadoop@localhost ~]$ cd .ssh/

[hadoop@localhost .ssh]$ cp id_rsa.pub authorized_keys
[hadoop@localhost .ssh]$ ssh localhost

Last login: Mon Nov 24 17:09:56 2014 from localhost
[hadoop@localhost ~]$

If you are logged in directly without being asked for a password, the requirement is met; otherwise check the permissions on authorized_keys and make sure they are 644 (-rw-r--r--).
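For reference, the permissions can be checked and corrected with ordinary commands (nothing Hadoop-specific):

[hadoop@localhost ~]$ ls -l ~/.ssh/authorized_keys
[hadoop@localhost ~]$ chmod 644 ~/.ssh/authorized_keys
[hadoop@localhost ~]$ chmod 700 ~/.ssh

Once local key login works, push the key to the data nodes: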

[hadoop@localhost ~]$ ssh-copy-id -i hadoop@192.168.153.130
[hadoop@localhost ~]$ ssh 192.168.153.130

If you can log in to 192.168.153.130 without a password, the ssh configuration for that node is done. Likewise run:

[hadoop@localhost ~]$ ssh-copy-id -i hadoop@192.168.153.131
[hadoop@localhost ~]$ ssh 192.168.153.131

If you can log in to 192.168.153.131 without a password, the ssh configuration is complete.

(Passwordless login can also be set up manually: log in to the data node servers (130, 131) as the hadoop user, create the .ssh directory, and give it restrictive permissions (chmod 700 .ssh). Then copy authorized_keys from the name node 129 into .ssh on the data nodes (130, 131), keeping the permissions and directory layout identical to those on the name node. Finally, ssh from the name node 129 to the data nodes (130, 131); if you can log in without a password, the ssh configuration is complete.)
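A minimal sketch of that manual copy from the name node, assuming the hadoop user and an existing ~/.ssh directory on 130 and 131:

[hadoop@localhost ~]$ scp ~/.ssh/authorized_keys hadoop@192.168.153.130:~/.ssh/
[hadoop@localhost ~]$ scp ~/.ssh/authorized_keys hadoop@192.168.153.131:~/.ssh/
[hadoop@localhost ~]$ ssh 192.168.153.130    # should now log in without a password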

3. Hadoop installation and cluster deployment

1) Download hadoop-1.2.1.tar.gz.

2) Run tar zxvf hadoop-1.2.1.tar.gz to extract Hadoop to a directory of your choice (I extracted it under /usr/local/).

3) Edit conf/hadoop-env.sh under the hadoop-1.2.1 directory and set JAVA_HOME to your JDK directory (mine is JAVA_HOME=/usr/local/jdk1.7.0_60).

4) Configure masters and slaves

Edit /usr/local/hadoop-1.2.1/conf/slaves and /usr/local/hadoop-1.2.1/conf/masters: add the data-node hostnames to slaves and the name-node hostname to masters. You may add several, one per line. Note that the hostnames must be mapped in /etc/hosts on every server (an example mapping is shown after the next two commands). On 129 run:

[hadoop@localhost hadoop-1.2.1]$ vi conf/slaves
192.168.153.130
192.168.153.131

On 130 and 131 run:

[hadoop@localhost hadoop-1.2.1]$ vi conf/masters
192.168.153.129
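For the /etc/hosts mapping mentioned above, a minimal sketch looks like this (the hostnames master, slave1 and slave2 are placeholders; use the hostnames your machines actually report):

192.168.153.129   master
192.168.153.130   slave1
192.168.153.131   slave2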

5) Master (129) configuration

129 is the master node, and its configuration files are as follows. (As noted earlier, starting with Hadoop 0.20 the single hadoop-site.xml was split into core-site.xml, hdfs-site.xml, and mapred-site.xml, because the growing code base was split into three independently developed branches.) Here are the three configuration files on 129:

[hadoop@localhost hadoop-1.2.1]$ cat conf/core-site.xml

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://192.168.153.129:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/hadoopData/root/tmp/hadoop</value>
  </property>
</configuration>

[hadoop@localhost hadoop-1.2.1]$ cat conf/hdfs-site.xml

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>

[hadoop@localhost hadoop-1.2.1]$ cat conf/mapred-site.xml

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>192.168.153.129:9001</value>
  </property>
</configuration>

6) Slave (130, 131) configuration

The configuration files on the slaves (130, 131) are as follows (hdfs-site.xml does not need to be configured): conf/core-site.xml and conf/mapred-site.xml are identical to those on 129 (a copy sketch follows).
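A simple way to push those files from 129 to the slaves is scp, assuming Hadoop is installed at the same path on all three machines and the hadoop user can write to it:

[hadoop@localhost hadoop-1.2.1]$ scp conf/core-site.xml conf/mapred-site.xml conf/hadoop-env.sh hadoop@192.168.153.130:/usr/local/hadoop-1.2.1/conf/
[hadoop@localhost hadoop-1.2.1]$ scp conf/core-site.xml conf/mapred-site.xml conf/hadoop-env.sh hadoop@192.168.153.131:/usr/local/hadoop-1.2.1/conf/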

At this point the Hadoop cluster deployment is complete.

4. Initialize and start the Hadoop cluster

4.1 Initialize the file system

Initialize the namenode to prepare HDFS for its first run.

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop namenode -format

Note: the word "format" makes some people think of formatting a disk and worry that it will really format their file system. There is no need to worry: this format applies to HDFS, and namenode -format merely initializes a few directories and files.

4.2 Start Hadoop

Configure the user environment variables on the master node, then start the Hadoop cluster on the master node 192.168.153.129 by running start-all.sh in the bin directory:

[hadoop@localhost hadoop-1.2.1]$ bin/start-all.sh

To stop Hadoop:

[hadoop@localhost hadoop-1.2.1]$ bin/stop-all.sh

4.3 Test

Create a test1 directory on HDFS and upload a file into it:

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -mkdir test1

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -put ./README.txt test1
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -ls
Found 1 items

drwxr-xr-x   - hadoop supergroup          0 2011-07-21 19:58 /user/hadoop/test1

Run the wordcount map-reduce example program:

[hadoop@localhost hadoop-1.2.1]$ hadoop jar hadoop-examples-1.2.1.jar wordcount /user/hadoop/test1/README.txt output1

View the output files, which live on HDFS:

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -ls output1

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -cat output1/part-r-00000

4.4 Web interfaces and commands

http://192.168.153.129:50070/dfshealth.jsp
http://192.168.153.129:50030/jobtracker.jsp

http://192.168.153.130:50075/browseDirectory.jsp?namenodeInfoPort=50070&dir=/
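If no browser is handy, you can check from the shell that the web interfaces respond (plain curl, not part of the original walkthrough):

[hadoop@localhost ~]$ curl -I http://192.168.153.129:50070/dfshealth.jsp
[hadoop@localhost ~]$ curl -I http://192.168.153.129:50030/jobtracker.jsp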

5. Managing Hadoop remotely from Eclipse on local Windows 7

5.1 Copy the hadoop-eclipse-plugin-1.2.1.jar plugin into the local eclipse\plugins directory.

5.2 Extract hadoop-1.2.1-bin.tar.gz to a local directory (I extracted it to D:\myworkTools\hadoop-1.2.1).

5.3 Restart Eclipse and open the Map/Reduce perspective via the Open Perspective menu, as shown below:

Select the elephant icon and edit the Hadoop configuration (the Map/Reduce location):

The value of hadoop.tmp.dir must match the hadoop.tmp.dir value in conf/core-site.xml on the Hadoop server. Then open the Resource perspective via Window --> Open Perspective --> Resource, and you will see the DFS tree:

From here you can manage the HDFS distributed file system normally: upload, delete, and so on.

To prepare for the test below, first create a directory user/root/input2 and upload two txt files into it (a command-line sketch follows):

input1.txt containing: Hello Hadoop Goodbye Hadoop
input2.txt containing: Hello World Bye World

With HDFS prepared, the test can begin.
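If you prefer the shell to the Eclipse DFS view, the same preparation can be done on the server; a sketch using the directory name given above:

[hadoop@localhost hadoop-1.2.1]$ echo "Hello Hadoop Goodbye Hadoop" > input1.txt
[hadoop@localhost hadoop-1.2.1]$ echo "Hello World Bye World" > input2.txt
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -mkdir /user/root/input2
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -put input1.txt input2.txt /user/root/input2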

Create a simple MapReduce project

Create a new Map/Reduce project through the wizard. During this process, click the configuration button and set the Hadoop installation path.

Create a test class WordCountTest. (The string literals in the log and error messages were lost in the original text; the ones shown below are reconstructed placeholders.)

package com.hadoop.learn.test;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;

/**
 * Test program.
 *
 * @author yongboy
 * @date 2012-04-16
 */
public class WordCountTest {

    private static final Logger log = Logger.getLogger(WordCountTest.class);

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Log the incoming key/value for inspection (placeholder messages).
            log.info("map key   : " + key);
            log.info("map value : " + value);
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String wordStr = itr.nextToken();
                word.set(wordStr);
                log.info("map word  : " + wordStr);
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            log.info("reduce key   : " + key);
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            log.info("reduce result: " + sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "word count test");
        job.setJarByClass(WordCountTest.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
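Besides running it inside Eclipse, the same class can be exported as a jar and submitted on the cluster. A sketch, where wordcount-test.jar is a hypothetical name for the exported jar and the paths match the run arguments used below:

[hadoop@localhost hadoop-1.2.1]$ bin/hadoop jar wordcount-test.jar com.hadoop.learn.test.WordCountTest /user/hadoop/test1 output3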

Right-click the class and choose "Run Configurations". In the dialog, open the "Arguments" tab and enter the parameters under "Program arguments" in advance:

hdfs://192.168.153.129:9000/user/hadoop/test1 hdfs://192.168.153.129:9000/user/hadoop/output3

Under "VM arguments" enter -DHADOOP_USER_NAME=hadoop.

(If this VM argument is not set you may get a "Permission denied" error. If your hadoop account is hadooptest, or xxx, set it to -DHADOOP_USER_NAME=hadooptest or -DHADOOP_USER_NAME=xxx accordingly. There are also other solutions circulating online.

The commonly cited online solution:

Modify the hdfs-site.xml configuration file to turn off permission checking:

<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>
)

Note: these parameters are for local debugging only, not for a real environment.

Then click "Apply" and "Close". Now you can right-click and choose "Run on Hadoop" to run it. At this point, however, an exception like the following appears:

12/04/24 15:32:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/04/24 15:32:44 ERROR security.UserGroupInformation: PriviledgedActionException as:Administrator cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator-519341271\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator-519341271\.staging to 0700
    at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:682)
    at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:655)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
    at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:856)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530)
    at com.hadoop.learn.test.WordCountTest.main(WordCountTest.java:85)

This is a file-permission problem on Windows; the same program runs fine on Linux, where the problem does not occur. The fix is to edit checkReturnValue in src/core/org/apache/hadoop/fs/FileUtil.java of the Hadoop source tree (the original walkthrough patched hadoop-1.0.2; the same file exists in 1.2.1) and comment out the check. This is somewhat crude, but on Windows the check can simply be skipped.

Only checkReturnValue needs to change; the rest of FileUtil.java stays exactly as in the stock Hadoop source. The patched method prints a diagnostic line instead of throwing (the exact message printed in the original is not recoverable, so the one below is a placeholder):

  private static void checkReturnValue(boolean rv, File p,
                                       FsPermission permission
                                       ) throws IOException {
    // On Windows the underlying permission call frequently reports failure
    // even though the job can proceed, so just log instead of failing.
    System.out.println("checkReturnValue: rv=" + rv + ", path=" + p
        + ", permission=" + permission);
    // if (!rv) {
    //   throw new IOException("Failed to set permissions of path: " + p +
    //       " to " + String.format("%04o", permission.toShort()));
    // }
  }

Recompile and repackage hadoop-core-1.0.2.jar, then replace the hadoop-core-1.0.2.jar in the hadoop-1.0.2 root directory (for 1.2.1, rebuild and replace hadoop-core-1.2.1.jar in the same way).
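A sketch of the rebuild, assuming the Ant build shipped with the Hadoop 1.x source tree is run on the Linux box and the resulting jar is then copied over the one used by Eclipse on Windows (the target name and the build/ output path may differ slightly between versions):

[hadoop@localhost hadoop-1.0.2]$ ant jar                      # rebuild the core jar under build/
[hadoop@localhost hadoop-1.0.2]$ ls build/hadoop-core-*.jar   # locate the rebuilt jar, then copy it over the original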

After replacing it, refresh the project, make sure the jar dependencies are set correctly, and run WordCountTest again. Once it succeeds, refresh the HDFS tree in Eclipse and you will see that the output directory (output3 in the run arguments above) has been generated:

Click the part-r-00000 file to see the sorted word counts:

Bye      1
Goodbye  1
Hadoop   2
Hello    2
World    2

You can also debug the program normally: set a breakpoint and run it via right-click -> Debug As -> Java Application. (Before each run, the output directory must be deleted manually.)

In addition, the plugin automatically generates a jar file and other files, including some concrete Hadoop configuration, under workspace\.metadata\.plugins\org.apache.hadoop.eclipse in the corresponding Eclipse workspace. Further details are left for you to explore.
