Hadoop standalone deployment, cluster deployment, and remote configuration and management from a local Eclipse on Windows 7
Preparation:
Download URL for the Windows Hadoop build:
http://mirror.bit.edu.cn/apache/hadoop/common/hadoop-1.2.1/hadoop-1.2.1-bin.tar.gz
Eclipse Hadoop plugin to download: hadoop-eclipse-plugin-1.2.1.jar
Download URL for the Linux Hadoop build:
http://mirrors.cnnic.cn/apache/hadoop/common/hadoop-1.2.1/hadoop-1.2.1.tar.gz
Create a user named hadoop on the Linux server:
[root@localhost ~]# useradd hadoop
Set its password:
[root@localhost ~]# passwd hadoop
Give the hadoop user sudo rights (so it can use vim, vi, and other commands that need root): edit /etc/sudoers and add a hadoop line next to the root entry:
[root@localhost ~]# vim /etc/sudoers
root   ALL=(ALL) ALL
hadoop ALL=(ALL) ALL
Because /etc/sudoers is read-only, you may need :wq! to force the save.
Unless stated otherwise, all of the steps below are performed as the hadoop user.
1. Hadoop standalone deployment
1. Download hadoop-1.2.1.tar.gz.
2. Run tar zxvf hadoop-1.2.1.tar.gz to extract Hadoop into a directory of your choice (mine is /usr/local/).
3. Edit conf/hadoop-env.sh under the hadoop-1.2.1 directory and set JAVA_HOME to your JDK directory (mine is JAVA_HOME=/usr/local/jdk1.7.0_60).
4. At this point the standalone deployment is essentially complete.
5. Using standalone mode
By default, Hadoop is configured to run in non-distributed mode as a single Java process, which is very helpful for debugging.
The example below copies the unpacked conf directory to use as input, then finds and displays every match of a given regular expression. The output is written to the specified output directory.
[hadoop@localhost hadoop-1.2.1]$ mkdir input
[hadoop@localhost hadoop-1.2.1]$ cp conf/*.xml input
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop jar hadoop-examples-1.2.1.jar grep input output 'dfs[a-z.]+'
[hadoop@localhost hadoop-1.2.1]$ cat output/*
(Don't worry if the syntax is unclear; it is explained further below.) The result is:
1 dfsadmin
2. Hadoop pseudo-distributed deployment
1. Download hadoop-1.2.1.tar.gz.
2. Run tar zxvf hadoop-1.2.1.tar.gz to extract Hadoop into a directory of your choice (mine is /usr/local/).
3. Edit conf/hadoop-env.sh under the hadoop-1.2.1 directory and set JAVA_HOME to your JDK directory (mine is JAVA_HOME=/usr/local/jdk1.7.0_60).
4. Edit the configuration files (a minimal sketch follows the three editing commands below).
Note: earlier releases used a single hadoop-site.xml, but since Hadoop 0.20 that file has been split into three configuration files: core-site.xml, hdfs-site.xml, and mapred-site.xml. The underlying reason is that the ever-growing code base was split into three major, independently developed components, so the configuration was split accordingly.
[hadoop@localhost hadoop-1.2.1]$ vim conf/core-site.xml:
[hadoop@localhost hadoop-1.2.1]$ vim conf/hdfs-site.xml:
[hadoop@localhost hadoop-1.2.1]$ vim conf/mapred-site.xml:
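A minimal pseudo-distributed configuration for Hadoop 1.2.1 typically looks like the sketch below; the port numbers 9000/9001 and the hadoop.tmp.dir path are common defaults used here as assumptions, not values taken from the article:
cat > conf/core-site.xml <<'EOF'
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>   <!-- assumed namenode URI -->
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/local/hadoop-1.2.1/tmp</value>   <!-- assumed scratch directory -->
  </property>
</configuration>
EOF
cat > conf/hdfs-site.xml <<'EOF'
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>   <!-- single node, so one replica -->
  </property>
</configuration>
EOF
cat > conf/mapred-site.xml <<'EOF'
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>   <!-- assumed JobTracker address -->
  </property>
</configuration>
EOF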
5. Passwordless SSH setup
Check whether you can ssh to localhost without entering a password:
[hadoop@localhost hadoop-1.2.1]$ ssh localhost
If you cannot log in to localhost over ssh without a password, run the following commands:
[hadoop@localhost hadoop-1.2.1]$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
[hadoop@localhost hadoop-1.2.1]$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
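If passwordless login to localhost still fails after this, the usual cause is over-permissive modes on ~/.ssh; a commonly used fix (an addition here, not a step from the original article) is:
[hadoop@localhost hadoop-1.2.1]$ chmod 700 ~/.ssh
[hadoop@localhost hadoop-1.2.1]$ chmod 600 ~/.ssh/authorized_keys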
6. Running
First, use the hadoop command to format the Hadoop Distributed File System (HDFS), that is, ask the namenode to format DFS. This is done once during installation, but it is useful to know how to regenerate a clean file system.
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop namenode -format
Next, start the Hadoop daemons:
[hadoop@localhost hadoop-1.2.1]$ bin/start-all.sh
Notes:
1) The Hadoop daemon logs are written to the ${HADOOP_LOG_DIR} directory (default: ${HADOOP_HOME}/logs).
2) What if sshd does not listen on the default port 22? Fortunately this is configurable; edit conf/hadoop-env.sh, for example (the port value is a placeholder):
export HADOOP_SSH_OPTS="-p <your-ssh-port>"
Browse the web interfaces of the NameNode and the JobTracker; by default they are at:
NameNode - http://localhost:50070/
JobTracker - http://localhost:50030/
Copy the input files into the distributed file system:
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -put conf input
Run one of the example programs shipped with the distribution:
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop jar hadoop-examples-1.2.1.jar grep input output 'dfs[a-z.]+'
View the output files:
Either copy the output files from the distributed file system to the local file system and inspect them there:
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -get output output
[hadoop@localhost hadoop-1.2.1]$ cat output/*
or view them directly on the distributed file system:
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -cat output/*
When all operations are finished, stop the daemons:
[hadoop@localhost hadoop-1.2.1]$ bin/stop-all.sh
In this pseudo-distributed configuration Hadoop starts five daemons: namenode, secondarynamenode, datanode, jobtracker, and tasktracker. As each daemon starts, a line of output indicates where its log is stored; every daemon runs in the background. Figure 1 (not reproduced here) illustrates the architecture of the pseudo-distributed configuration after startup.
Figure 1. Pseudo-distributed Hadoop configuration
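A quick way to confirm that all five daemons are actually running (an extra check, not part of the original article) is the JDK's jps tool:
[hadoop@localhost hadoop-1.2.1]$ jps
# Expected output lists something like (PIDs will differ):
# 12001 NameNode
# 12088 DataNode
# 12130 SecondaryNameNode
# 12210 JobTracker
# 12299 TaskTracker
# 12350 Jps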
3. Hadoop cluster setup
Three servers were used to test the cluster deployment:
192.168.153.129 (referred to below as 129)
192.168.153.130 (130)
192.168.153.131 (131)
The planned layout is:
1) 129 acts as NameNode, SecondaryNameNode, and JobTracker;
2) 130 and 131 act as DataNodes and TaskTrackers.
1. Create the hadoop user
Create a user named hadoop on each of the three Linux servers:
[root@localhost ~]# useradd hadoop
Set its password:
[root@localhost ~]# passwd hadoop
Give the hadoop user sudo rights:
[root@localhost ~]# vim /etc/sudoers
root   ALL=(ALL) ALL
hadoop ALL=(ALL) ALL
Saving this file may require :wq! to force the write.
2. Configure passwordless login
Log in to the name node (129) as the hadoop user and run:
[hadoop@localhost ~]$ ssh-keygen -t rsa
Press Enter through every prompt; when it finishes, the file .ssh/id_rsa.pub has been generated. Then run the following commands:
[hadoop@localhost ~]$ cd .ssh/
[hadoop@localhost .ssh]$ cp id_rsa.pub authorized_keys
[hadoop@localhost .ssh]$ ssh localhost
Last login: Mon Nov 24 17:09:56 2014 from localhost
[hadoop@localhost ~]$
If you are logged in without being asked for a password, the requirement is met; otherwise check that the permissions on authorized_keys are 644 (-rw-r--r--). Then run:
[hadoop@localhost ~]$ ssh-copy-id -i hadoop@192.168.153.130
[hadoop@localhost ~]$ ssh 192.168.153.130
If you can log in to 192.168.153.130 without a password, the SSH setup for that node is done. Likewise run:
[hadoop@localhost ~]$ ssh-copy-id -i hadoop@192.168.153.131
[hadoop@localhost ~]$ ssh 192.168.153.131
If you can log in to 192.168.153.131 without a password, the SSH configuration is complete.
(Passwordless login can also be set up manually: log in to the data nodes (130, 131) as hadoop, create the .ssh directory and give it the proper permissions (chmod 700 ~/.ssh), then copy authorized_keys from name node 129 into ~/.ssh on each data node, keeping the permissions and directory layout identical to the name node. Finally, ssh from 129 to the data nodes; if no password is required, the SSH configuration is complete.)
3. Install Hadoop and deploy the cluster
1) Download hadoop-1.2.1.tar.gz.
2) Run tar zxvf hadoop-1.2.1.tar.gz to extract Hadoop into a directory of your choice (mine is /usr/local/).
3) Edit conf/hadoop-env.sh under the hadoop-1.2.1 directory and set JAVA_HOME to your JDK directory (mine is JAVA_HOME=/usr/local/jdk1.7.0_60).
4) Edit the masters and slaves files
Edit /usr/local/hadoop-1.2.1/conf/slaves and /usr/local/hadoop-1.2.1/conf/masters: add the data node hostnames to slaves and the name node hostname to masters, one entry per line (multiple entries are allowed). Note that every hostname must be mapped in /etc/hosts on each server (see the sketch after the commands below). On 129 run:
[hadoop@localhost hadoop-1.2.1]$ vi conf/slaves
192.168.153.130
192.168.153.131
On 130 and 131 run:
[hadoop@localhost hadoop-1.2.1]$ vi conf/masters
192.168.153.129
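If the cluster is addressed by hostname rather than by raw IP, every server's /etc/hosts needs matching entries. A sketch (the hostnames master, slave1, and slave2 are invented for illustration; the article itself uses bare IP addresses):
[root@localhost ~]# vim /etc/hosts
192.168.153.129  master
192.168.153.130  slave1
192.168.153.131  slave2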
5) Master (129) configuration
129 is the master node, and its configuration files are shown below. (As noted earlier, since Hadoop 0.20 the single hadoop-site.xml has been split into core-site.xml, hdfs-site.xml, and mapred-site.xml, mirroring the split of the growing code base into three independently developed components.) The three configuration files on 129 (a minimal sketch follows the commands):
[hadoop@localhost hadoop-1.2.1]$ cat conf/core-site.xml
[hadoop@localhost hadoop-1.2.1]$ cat conf/hdfs-site.xml
[hadoop@localhost hadoop-1.2.1]$ cat conf/mapred-site.xml
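For a Hadoop 1.2.1 cluster with 129 as master, a minimal configuration typically looks like the following sketch; the hadoop.tmp.dir path, port 9001, and the replication factor of 2 are assumptions (only the hdfs://192.168.153.129:9000 address is corroborated by the Eclipse section later in the article):
cat > conf/core-site.xml <<'EOF'
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://192.168.153.129:9000</value>   <!-- namenode on 129 -->
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/local/hadoop-1.2.1/tmp</value>   <!-- assumed scratch directory -->
  </property>
</configuration>
EOF
cat > conf/hdfs-site.xml <<'EOF'
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>   <!-- assumed: two datanodes (130, 131) -->
  </property>
</configuration>
EOF
cat > conf/mapred-site.xml <<'EOF'
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>192.168.153.129:9001</value>   <!-- assumed: jobtracker on 129 -->
  </property>
</configuration>
EOF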
6) Slave (130, 131) configuration
On the slaves (130 and 131), conf/core-site.xml and conf/mapred-site.xml are identical to those on 129; hdfs-site.xml does not need to be configured.
At this point the Hadoop cluster deployment is complete.
4. Initializing and starting the Hadoop cluster
4.1 Initialize the file system
Format the namenode to prepare HDFS for its first run.
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop namenode -format
Note: the word "format" makes some people think of disk formatting and worry that their local file system is about to be wiped. There is no need to worry: the format applies only to HDFS, and namenode -format merely initializes a few directories and files.
4.2 Start Hadoop
Configure the user environment variables on the master node, then start the cluster on the master (192.168.153.129) by running start-all.sh from the bin directory:
[hadoop@localhost hadoop-1.2.1]$ bin/start-all.sh
To stop Hadoop:
[hadoop@localhost hadoop-1.2.1]$ bin/stop-all.sh
4.3 Testing
Create a test1 folder on HDFS and upload a file into it:
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -mkdir test1
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -put ./README.txt test1
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -ls
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2011-07-21 19:58 /user/hadoop/test1
Run the wordcount MapReduce example program:
[hadoop@localhost hadoop-1.2.1]$ hadoop jar hadoop-examples-1.2.1.jar wordcount /user/hadoop/test1/README.txt output1
View the output files, which live on HDFS:
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -ls output1
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -cat output1/part-r-00000
4.4 Web interfaces and administrative commands
NameNode (HDFS health): http://192.168.153.129:50070/dfshealth.jsp
JobTracker: http://192.168.153.129:50030/jobtracker.jsp
DataNode file browser: http://192.168.153.130:50075/browseDirectory.jsp?namenodeInfoPort=50070&dir=/
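Two commonly used Hadoop 1.x command-line checks that complement these pages (added here for illustration):
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop dfsadmin -report    # capacity and live/dead datanodes
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fsck /              # HDFS block and replication health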
5. Managing Hadoop remotely from a local Eclipse on Windows 7
5.1 Copy the hadoop-eclipse-plugin-1.2.1.jar plugin into the local eclipse\plugins directory.
5.2 Extract hadoop-1.2.1-bin.tar.gz into a local directory (mine is D:\myworkTools\hadoop-1.2.1).
5.3 Restart Eclipse and open the Map/Reduce perspective via the Open Perspective menu.
Select the elephant icon and edit the Hadoop location settings: the value of hadoop.tmp.dir must match hadoop.tmp.dir in conf/core-site.xml on the Hadoop server. Then use Window --> Open Perspective --> Resource to open the Resource perspective, where the DFS tree is shown.
From here the HDFS distributed file system can be managed normally: uploads, deletions, and so on.
To prepare for the test below, first create a directory user/root/input2 and upload two txt files into it:
input1.txt, containing: Hello Hadoop Goodbye Hadoop
input2.txt, containing: Hello World Bye World
With HDFS prepared, the test can begin.
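The two files can be uploaded through the plugin's DFS view as described, or from the command line on the cluster. A sketch of the command-line equivalent (the absolute /user/root/input2 path is an assumption based on the directory named above):
[hadoop@localhost hadoop-1.2.1]$ echo "Hello Hadoop Goodbye Hadoop" > input1.txt
[hadoop@localhost hadoop-1.2.1]$ echo "Hello World Bye World" > input2.txt
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -mkdir /user/root/input2
[hadoop@localhost hadoop-1.2.1]$ bin/hadoop fs -put input1.txt input2.txt /user/root/input2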
Create a simple MapReduce project
Create a new Map/Reduce project via the wizard; during the wizard, configure the Hadoop installation path.
Create a new test class, WordCountTest:

package com.hadoop.learn.test;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;

/**
 * Runs the word-count test program.
 *
 * @author yongboy
 * @date 2012-04-16
 */
public class WordCountTest {
    private static final Logger log = Logger.getLogger(WordCountTest.class);

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            log.info("map key : " + key);
            log.info("map value : " + value);
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String wordStr = itr.nextToken();
                word.set(wordStr);
                log.info("word : " + wordStr);
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "word count test");
        job.setJarByClass(WordCountTest.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Right-click the class and choose "Run Configurations"; in the dialog that opens, go to the "Arguments" tab and enter the program arguments in the "Program arguments" field:
hdfs://192.168.153.129:9000/user/hadoop/test1 hdfs://192.168.153.129:9000/user/hadoop/output3
In the "VM arguments" field enter -DHADOOP_USER_NAME=hadoop.
(If this VM argument is not set you may get a Permission denied error. If your Hadoop account is hadooptest, or xxx, set it to -DHADOOP_USER_NAME=hadooptest or -DHADOOP_USER_NAME=xxx accordingly. There are other solutions online as well.
A solution commonly suggested online: edit the HDFS configuration file (hdfs-site.xml; the text calls it hdfs-core.xml) and disable permission checking, as sketched below.
Note: these settings are meant for local debugging, not for a real production environment.)
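A hedged sketch of the permission-checking switch mentioned above, assuming Hadoop 1.x property names and the replication factor from the earlier cluster sketch (debugging only):
cat > conf/hdfs-site.xml <<'EOF'
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>              <!-- assumed, as in the earlier sketch -->
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>          <!-- disable HDFS permission checking -->
  </property>
</configuration>
EOF
bin/stop-all.sh && bin/start-all.sh    # restart for the change to take effect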
Then click "Apply" and "Close". You can now right-click the class and choose "Run on Hadoop" to run it. At this point, however, an exception similar to the following appears:
12/04/24 15:32:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/04/24 15:32:44 ERROR security.UserGroupInformation: PriviledgedActionException as:Administrator cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator-519341271\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator-519341271\.staging to 0700
    at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:682)
    at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:655)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
    at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:856)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530)
    at com.hadoop.learn.test.WordCountTest.main(WordCountTest.java:85)
This is a Windows file-permission problem; it does not occur when the job is run on Linux. The fix is to edit checkReturnValue in /hadoop-1.0.2/src/core/org/apache/hadoop/fs/FileUtil.java and comment the check out (somewhat crude, but on Windows the check can simply be skipped):
The article reproduces the entire patched FileUtil.java (Apache-licensed source from the Hadoop 1.0.2 tree, package org.apache.hadoop.fs). The only difference from the stock file is in checkReturnValue, whose check is commented out:

  private static void checkReturnValue(boolean rv, File p,
                                       FsPermission permission
                                       ) throws IOException {
    System.out.println("Skipping permission check for " + p + " (" + permission + ")");
    // if (!rv) {
    //   throw new IOException("Failed to set permissions of path: " + p +
    //                         " to " +
    //                         String.format("%04o", permission.toShort()));
    // }
  }

The remainder of the file is identical to the stock Apache source and is not reproduced here.
Recompile and repackage hadoop-core-1.0.2.jar, and replace the hadoop-core-1.0.2.jar in the root of the hadoop-1.0.2 directory with the rebuilt jar.
After the replacement, refresh the project, make sure the jar dependencies are correct, and run WordCountTest again. Once it succeeds, refresh the HDFS tree in Eclipse and you will see that the output2 directory has been created.
Click the part-r-00000 file to see the sorted result:
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
The program can be debugged in the same way: set breakpoints and use right-click --> Debug As --> Java Application. (Before each run the output directory must be deleted manually.)
In addition, the plugin automatically generates a jar file and other files, including some concrete Hadoop configuration, under the workspace's .metadata\.plugins\org.apache.hadoop.eclipse directory. Further details are left for you to explore.