java怎么连接hdfshdfs与linux文件系统统,需要哪些包?

16:30 提问
java连接hadoop hdfs文件系统报错
报错信息:
java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "localhost.localdomain/127.0.0.1"; destination host is: "172.16.6.57":9000;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:763)
at org.apache.hadoop.ipc.Client.call(Client.java:1229)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
at $Proxy9.create(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
at $Proxy9.create(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:193)
at org.apache.hadoop.hdfs.DFSOutputStream.(DFSOutputStream.java:1324)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1343)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1255)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1212)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:276)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:265)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:82)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:781)
at com.zk.hdfs.FileCopyToHdfs.uploadToHdfs(FileCopyToHdfs.java:44)
at com.zk.hdfs.FileCopyToHdfs.main(FileCopyToHdfs.java:21)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
at com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:73)
at com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:213)
at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:746)
at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:282)
at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:760)
at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:288)
at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:752)
at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:985)
at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:938)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:836)
代码是在网上找的:
package com.zk.
import java.io.BufferedInputS
import java.io.FileInputS
import java.io.FileNotFoundE
import java.io.IOE
import java.io.InputS
import java.io.OutputS
import java.net.URI;
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.FileS
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IOU
import org.apache.hadoop.util.P
public class FileCopyToHdfs {
public static void main(String[] args) throws Exception {
uploadToHdfs();
//deleteFromHdfs();
//getDirectoryFromHdfs();
appendToHdfs();
readFromHdfs();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("SUCCESS");
/**上传文件到HDFS上去*/
public static void uploadToHdfs() throws FileNotFoundException,IOException {
String localSrc = "e:/test.txt";
String dst = "hdfs://172.16.6.57:9000/user/abc/zk/test1.txt";
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out = fs.create(new Path(dst), new Progressable() {
public void progress() {
System.out.print(".");
IOUtils.copyBytes(in, out, 4096, true);
总是报连接问题,网上搜不到资料,大牛帮下忙啊
按赞数排序
----------------------biu~biu~biu~~~在下问答机器人小D,这是我依靠自己的聪明才智给出的答案,如果不正确,你来咬我啊!
链接地址 跟你的电脑IP一样吗?
local host is: "localhost.localdomain/127.0.0.1"; destination host is: "172.16.6.57":9000;
这访问ip问题吧
其他相似问题hadoop2.5.2学习及实践笔记(六)—— Hadoop文件系统及其java接口 - 委琐牢头 - 博客园
文件系统概述
org.apache.hadoop.fs.FileSystem是hadoop的抽象文件系统,为不同的数据访问提供了统一的接口,并提供了大量具体文件系统的实现,满足hadoop上各种数据访问需求,如以下几个具体实现(原表格见《hadoop权威指南》):
(org.apache.hadoop)
fs.LocalFileSystem
支持有客户端校验和本地文件系统。带有校验和的本地系统文件在fs.RawLocalFileSystem中实现。
hdfs.DistributionFileSystem
Hadoop的分布式文件系统。
hdfs.HftpFileSystem
支持通过HTTP方式以只读的方式访问HDFS,distcp经常用在不同的HDFS集群间复制数据。
hdfs.HsftpFileSystem
支持通过HTTPS方式以只读的方式访问HDFS。
fs.HarFileSystem
构建在Hadoop文件系统之上,对文件进行归档。Hadoop归档文件主要用来减少NameNode的内存使用。
fs.kfs.KosmosFileSystem
Cloudstore(其前身是Kosmos文件系统)文件系统是类似于HDFS和Google的GFS文件系统,使用C++编写。
fs.ftp.FtpFileSystem
由FTP服务器支持的文件系统。
S3(本地)
fs.s3native.NativeS3FileSystem
基于Amazon S3的文件系统。
S3(基于块)
fs.s3.NativeS3FileSystem
基于Amazon S3的文件系统,以块格式存储解决了S3的5GB文件大小的限制。
在环境搭建时,我们配置fs.defaultFS属性值为hdfs://localhost:9000,即已指定文件系统为HDFS系统。
通过源码,可以查看FileSystem类的层次结构如下
文件系统的方法分为两类:一部分处理文件和目录;一部分读写文件数据。hadoop抽象文件系统的文件操作与java、linux的对应关系(原表格见《Hadoop技术内幕 深入解析HADOOP COMMON和HDFS架构设计与实现原理》):
Hadoop的FileSystem
URL.openSteam
FileSystem.open
FileSystem.create
FileSystem.append
URL.openStream
打开一个文件
FSDataInputStream.read
InputSteam.read
读取文件中的数据
FSDataOutputStream.write
OutputSteam.write
向文件写入数据
FSDataInputStream.close
FSDataOutputStream.close
InputSteam.close
OutputSteam.close
关闭一个文件
FSDataInputStream.seek
RandomAccessFile.seek
改变文件读写位置
FileSystem.getFileStatus
FileSystem.get*
获取文件/目录的属性
FileSystem.set*
改变文件的属性
FileSystem.createNewFile
File.createNewFile
创建一个文件
FileSystem.delete
File.delete
从文件系统中删除一个文件
FileSystem.rename
File.renameTo
更改文件/目录名
FileSystem.mkdirs
File.mkdir
在给定目录下创建一个子目录
FileSystem.delete
File.delete
从一个目录中删除一个空的子目录
FileSystem.listStatus
读取一个目录下的项目
FileSystem.getWorkingDirectory
getcwd/getwd
返回当前工作目录
FileSystem.setWorkingDirectory
更改当前工作目录
一. 获取文件系统实例
通过FileSystem的get()或newInstance()方法获取文件系统的实例。
get()和newInstance()方法分别有3个重载方法:
//返回默认文件系统,core-site.xml中指定的,如果没有指定,则默认本地文件系统
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem newInstance(Configuration conf) throws IOException
//通过给定URI方案和权限来确定要使用的文件系统,若URI中未指定方案,返回默认文件系统
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem newInstance(URI uri, Configuration conf) throws IOException
//作为给定用户来访问文件系统,对安全来说很重要
public static FileSystem get(final URI uri, final Configuration conf, final String user)
throws IOException, InterruptedException
public static FileSystem newInstance(final URI uri, final Configuration conf, final String user) throws IOException, InterruptedException
另外可以通过getLocal()或newInstanceLocal()获取本地文件系统:
public static LocalFileSystem getLocal(Configuration conf) throws IOException
public static LocalFileSystem newInstanceLocal(Configuration conf) throws IOException
二. 读取数据
1. 从hadoop url读取数据
读取文件最简单的方法是使用java.net.URL对象打开数据流,从中读取数据,但让java程序能识别hadoop的hdfs url需要通过FsUrlStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHandlerFactory方法。
HDFS中有一个/input/input1.txt文件,文件内容&hello hadoop!&
&java测试类代码:
public class ReadFromHadoopURL {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
public static void main(String[] args) throws Exception{
String uri = "hdfs://localhost:9000/input/input1.txt";
InputStream in =
in = new URL(uri).openStream();
IOUtils.copyBytes(in, System.out, 4096, false);
IOUtils.closeStream(in);
&运行结果:
这种文件读取的方法具有一定的限制性。因为Java.net.URL的setURLStreamHandlerFactory方法每个java虚拟机最多调用一次,如果程序中有不受自己控制的第三方组件调用了这个方法,将无法使用这种方法从hadoop中读取数据。
附setURLStreamHandlerFactory源码:
public static void setURLStreamHandlerFactory(URLStreamHandlerFactory fac) {
synchronized (streamHandlerLock) {
if (factory != null) {
throw new Error("factory already defined");
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkSetFactory();
handlers.clear();
2.通过FileSystem API读取数据&
hadoop文件系统中通过org.apache.hadoop.fs.Path对象来代表文件。
获取到FileSystem实例后通过open()方法获取文件的输入流
//缓冲区默认大小4KB,bufferSize指定缓冲区大小
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOE
java测试类代码:
public class ReadFromFileSystemAPI {
public static void main(String[] args) throws Exception{
String uri = "hdfs://localhost:9000/input/input1.txt";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
//第二种获取文件系统的方法
//FileSystem fs = FileSystem.newInstance(URI.create(uri), conf);
InputStream in =
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
IOUtils.closeStream(in);
运行结果:
输入流FSDataInputStream对象介绍
FileSystem对象中的open()方法返回的是org.apache.hadoop.fs.FSDataInputStream对象,这个对象继承了java.io.DataInputStream,并支持随机访问,从流的任意位置读取数据。
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess{
//implementation
Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量的方法。注:seek()方法开销相对高,需要慎用。
public interface Seekable {
//定位到从文件起始位置开始指定的偏移量的位置,若偏移量超出文件位置会报异常
void seek(long pos) throws IOE
//返回当前位置相对于文件起始位置的偏移量
long getPos() throws IOE
//查找数据的其他副本,若找到一个新副本则返回true,否则返回false
boolean seekToNewSource(long targetPos) throws IOE
PositionedReadable接口从一个指定偏移量处读取文件的一部分。
public interface PositionedReadable {
//从文件指定position处读取至多length字节的数据,并存入缓冲区buffer的指定偏移量offset处
//返回值是督导的字节数,可能比length的长度小
public int read(long position, byte[] buffer, int offset, int length) throws IOE
//从文件指定position处读取指定length的字节,并存入缓冲区buffer指定偏移量offset处
//若读到文件末尾仍不足length字节,则抛出异常
public void readFully(long position, byte[] buffer, int offset, int length) throws IOE
//从文件指定position处读取缓冲区buffer大小的字节,并存入buffer
//若读到文件末尾仍不足length字节,则抛出异常
public void readFully(long position, byte[] buffer) throws IOE
测试代码:
public class TestFSDataInputStream {
private FileSystem fs =
private FSDataInputStream in =
private String uri = "hdfs://localhost:9000/input/input1.txt";
private Logger log = Logger.getLogger(TestFSDataInputStream.class);
PropertyConfigurator.configure("conf/log4j.properties");
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = FileSystem.get(URI.create(uri), conf);
public void test() throws Exception{
in = fs.open(new Path(uri));
("文件内容:");
IOUtils.copyBytes(in, System.out, 4096, false);
in.seek(6);
Long pos = in.getPos();
("当前偏移量:"+pos);
("读取内容:");
IOUtils.copyBytes(in, System.out, 4096, false);
byte[] bytes = new byte[10];
int num = in.read(7, bytes, 0, 10);
("从偏移量7读取10个字节到bytes,共读取"+num+"字节");
("读取内容:"+(new String(bytes)));
//以下代码会抛出EOFException
in.readFully(6, bytes);
in.readFully(6, bytes, 0, 10);
IOUtils.closeStream(in);
&运行结果:
三. 写入数据
1.新建文件
给准备建的文件指定一个Path对象,然后通过FileSystem的create()方法返回一个用于写入数据的输出流。
Create()方法有多个重载版本,允许指定是否需要强制覆盖现有文件、文件备份数量、写入文件时缓冲区大小、文件块大小及文件权限。还可指定Progressable回调接口,这样可以把数据写入datanode的进度通知给应用。
Create()方法能为需要写入且当前不存在的文件创建父目录,若不希望这样,则应先调用exists()方法检查父目录是否存在。
create()方法的所有重载方法:
//创建一个输出流,默认覆盖现有文件
public FSDataOutputStream create(Path f) throws IOException
//创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
public FSDataOutputStream create(Path f, boolean overwrite) throws IOException
//创建一个输出流,默认覆盖现有文件,progress用来报告进度
public FSDataOutputStream create(Path f, Progressable progress)
throws IOException
//创建一个输出流,默认覆盖现有文件,replication指定文件备份数
public FSDataOutputStream create(Path f, short replication) throws IOException
//创建一个输出流,默认覆盖现有文件,replication指定文件备份数,progress用来报告进度
public FSDataOutputStream create(Path f, short replication, Progressable progress)
throws IOException
//创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
//bufferSize指定写入时缓冲区大小
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException
//创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
// bufferSize指定写入时缓冲区大小,replication指定文件备份数,blockSize指定文件块大小
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize ) throws IOException
//创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
// bufferSize指定写入时缓冲区大小,replication指定文件备份数,blockSize指定文件块大小
// progress用来报告进度
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,short replication, long blockSize,
Progressable progress ) throws IOException
//创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
// bufferSize指定写入时缓冲区大小,replication指定文件备份数,blockSize指定文件块大小
// progress用来报告进度,permission指定文件权限
public abstract FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)
throws IOE
//创建一个输出流,permission指定文件权限, bufferSize指定写入时缓冲区大小
// replication指定文件备份数,progress用来报告进度
// flags指定创建标志,标志如下:
CREATE - 如果文件不存在则创建文件,否则抛出异常
APPEND - 如果文件存在则向文件追加内容,否则抛出异常
OVERWRITE - 文件存在时,覆盖现有文件,否则抛出异常
CREATE|APPEND - 文件不存在时创建文件,文件已存在时向文件追加内容
CREATE|OVERWRITE - 文件不存在时创建文件,否则覆盖已有文件
SYNC_BLOCK - 强制关闭文件块,如果需要同步操作,每次写入后还需调用Syncable.hsync()方法
public FSDataOutputStream create(Path f, FsPermission permission, EnumSet&CreateFlag& flags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException
//创建一个输出流,permission指定文件权限, bufferSize指定写入时缓冲区大小
// replication指定文件备份数,progress用来报告进度,blockSize指定文件块大小
// checksumOpt指定校验和选项,若为空,则使用配置文件中的值
// flags指定创建标志,标志如下:
CREATE - 如果文件不存在则创建文件,否则抛出异常
APPEND - 如果文件存在则向文件追加内容,否则抛出异常
OVERWRITE - 文件存在时,覆盖现有文件,否则抛出异常
CREATE|APPEND - 文件不存在时创建文件,文件已存在时向文件追加内容
CREATE|OVERWRITE - 文件不存在时创建文件,否则覆盖已有文件
SYNC_BLOCK - 强制关闭文件块,如果需要同步操作,每次写入后还需调用Syncable.hsync()方法
public FSDataOutputStream create(Path f, FsPermission permission, EnumSet&CreateFlag& flags, int bufferSize, short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt)
throws IOException
写入前HDFS中目录结构:
测试代码:
public class WriteByCreate {
PropertyConfigurator.configure("conf/log4j.properties");
public void createTest() throws Exception {
String localSrc = "/home/hadoop/merge.txt";
String dst = "hdfs://localhost:9000/input/merge.txt";
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out =
out = fs.create(new Path(dst),
new Progressable() {
public void progress() {
System.out.print(".");
("write start!");
IOUtils.copyBytes(in, out, 4096, true);
System.out.println();
("write end!");
IOUtils.closeStream(in);
IOUtils.closeStream(out);
运行结果:
2.向已存在文件末尾追加数据
&FileSystem的append()方法允许在一个已存在文件的最后偏移量处追加数据。追加操作是可选的,并不是所有hadoop文件系统都实现了该操作。
Append()的重载方法&
//向指定文件中追加数据,默认缓冲区大小4096,文件不存在时抛出异常
public FSDataOutputStream append(Path f) throws IOException
//向指定文件中追加数据,bufferSize指定缓冲区大小,文件不存在时抛出异常
public FSDataOutputStream append(Path f, int bufferSize) throws IOException
//向指定文件中追加数据,bufferSize指定缓冲区大小,文件不存在时抛出异常,progress报告进度
public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOE
public class WriteByAppend{
PropertyConfigurator.configure("conf/log4j.properties");
public void appendTest() throws Exception {
String localSrc = "/home/hadoop/merge.txt";
String dst = "hdfs://localhost:9000/input/merge.txt";
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out =
out = fs.append(new Path(dst),4096,
new Progressable() {
public void progress() {
System.out.print(".");
("write start!");
IOUtils.copyBytes(in, out, 4096, true);
System.out.println();
("write end!");
IOUtils.closeStream(in);
IOUtils.closeStream(out);
输出流FSDataOutputStream对象
FileSystem的create()方法及append()方法返回的是FSDataOutputStream对象,它也有一个查询文件当前位置的方法getPos()。与FSDataInputStream不同,FSDataOutputStream不允许在文件中定位,因为HDFS只允许对一个已打开的文件顺序写入,或在现有文件末尾追加数据,不支持在除文件末尾外的其他位置进行写入,因此写入时定位没有意义。
四. 创建目录
FileSystem提供了创建目录的方法。可以一次性创建所有必要但还没有的父目录。
&public boolean mkdirs(Path f) throws IOException
&public abstract boolean mkdirs(Path f, FsPermission permission ) throws IOE
通常不需要显示创建一个目录,因为调用create()方法写入文件时会自动创建父目录。
五. 查询文件系统
文件元数据FileStatus
FileStatus类封装了文件系统中文件和目录的元数据,FileStatus源码中可以看到如下属性
public class FileStatus implements Writable, Comparable {
private P//文件或目录的path
//文件字节数
p//是否是目录
private short block_//文件块备份数
pr//文件块大小
private long modification_//修改时间
private long access_//访问时间
private FsP//权限
private S//所属用户
private S//所属用户组
private P //软连接
FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象
测试代码:
public class ShowFileStatus {
private MiniDFSC // use an in-process HDFS cluster for testing
private FileS
public void setUp() throws IOException {
Configuration conf = new Configuration();
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data", "/tmp");
cluster = new MiniDFSCluster(conf, 1, true, null);
fs = cluster.getFileSystem();
OutputStream out = fs.create(new Path("/dir/file"));
out.write("content".getBytes("UTF-8"));
out.close();
public void tearDown() throws IOException {
if (fs != null) {
fs.close();
if (cluster != null) {
cluster.shutdown();
@Test(expected = FileNotFoundException.class)
public void throwsFileNotFoundForNonExistentFile() throws IOException {
fs.getFileStatus(new Path("no-such-file"));
public void fileStatusForFile() throws IOException {
Path file = new Path("/dir/file");
("文件filestatus:");
FileStatus stat = fs.getFileStatus(file);
("path:"+stat.getPath().toUri().getPath());
("isdir:"+String.valueOf(stat.isDir()));
("length:"+String.valueOf(stat.getLen()));
("modification:"+String.valueOf(stat.getModificationTime()));
("replication:"+String.valueOf(stat.getReplication()));
("blicksize:"+String.valueOf(stat.getBlockSize()));
("owner:"+stat.getOwner());
("group:"+stat.getGroup());
("permission:"+stat.getPermission().toString());
public void fileStatusForDirectory() throws IOException {
Path dir = new Path("/dir");
("目录filestatus:");
FileStatus stat = fs.getFileStatus(dir);
("path:"+stat.getPath().toUri().getPath());
("isdir:"+String.valueOf(stat.isDir()));
("length:"+String.valueOf(stat.getLen()));
("modification:"+String.valueOf(stat.getModificationTime()));
("replication:"+String.valueOf(stat.getReplication()));
("blicksize:"+String.valueOf(stat.getBlockSize()));
("owner:"+stat.getOwner());
("group:"+stat.getGroup());
("permission:"+stat.getPermission().toString());
&运行结果:
列出目录中内容,可以使用FileSystem的listStatus()方法。方法接收一个或一组路径,如果路径是文件,以数组方法返回长度为1的FileStatus对象,如果路径是目录,返回0个或多个FileStatus对象表示目录中包含的文件或目录;如果是一组路径,依次轮流对每个路径调用listStatus方法,将结果累积到一个数组
//列出给定路径下的文件或目录的status
public abstract FileStatus[] listStatus(Path f)
throws FileNotFoundException, IOE
//列出给定路径下符合用户提供的filter限制的文件或目录的status
public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException
//列出给定的一组路径下文件或目录的status
public FileStatus[] listStatus(Path[] files) throws FileNotFoundException, IOException
//列出给定的一组路径下符合用户提供的filter限制的文件或目录的status
public FileStatus[] listStatus(Path[] files, PathFilter filter)
throws FileNotFoundException, IOException
目录结构:
测试代码:
public class ListFileStatus {
private FileSystem fs =
private String uri = "hdfs://localhost:9000/input/input1.txt";
private Path[] paths = new Path[]{new Path("/input.zip"),new Path("/input/"),new Path("/output/")};
private Logger log = Logger.getLogger(TestFSDataInputStream.class);
PropertyConfigurator.configure("conf/log4j.properties");
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = FileSystem.get(URI.create(uri), conf);
public void listStatusTest() throws Exception {
("--------------------------------");
("列出文件 ["+paths[0]+"] 的status:");
FileStatus[] status = fs.listStatus(paths[0]);
printFileStatus(status);
("--------------------------------");
("--------------------------------");
("列出目录 ["+paths[1]+"] 的status:");
status = fs.listStatus(paths[1]);
printFileStatus(status);
("--------------------------------");
("--------------------------------");
("列出一组path "+Arrays.toString(paths)+" 的status:");
status = fs.listStatus(paths);
printFileStatus(status);
("--------------------------------");
protected void printFileStatus(FileStatus[] status){
for (FileStatus s : status) {
(s.getPath()+" status:");
("isdir:"+String.valueOf(s.isDir()));
("length:"+String.valueOf(s.getLen()));
("modification:"+String.valueOf(s.getModificationTime()));
("replication:"+String.valueOf(s.getReplication()));
("blicksize:"+String.valueOf(s.getBlockSize()));
("owner:"+s.getOwner());
("group:"+s.getGroup());
("permission:"+s.getPermission().toString());
&测试结果:
另外,需要在一次操作中处理一批文件时,hadoop提供了通配符来匹配多个文件。
匹配0或多个字符
匹配单衣字符
匹配{a,b}集合里的一个字符
匹配非{a,b}集合里的一个字符
匹配一个{a,b}范围内的字符,包括ab,a的字典顺序要小于等于b
非字符范围
匹配一个不在{a,b}范围内的字符,包括ab,a的字典顺序要小于等于b
匹配包含a或b中一个的
匹配原字符c
hadoop的FileSystem为通配提供了2个globStatus()方法,方法返回所有文件路径与给定的通配符相匹配的文件的FileStatus,filter可进一步对匹配进行限制:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
├── 2007/
│ └── 12/
│ &&&&&├── 30/
│&&&& &└── 31/
└── 2008/
&&&&& └── 01/
&&&&&&&&&& ├── 01/
&&&&&&&&&& └── 02/
通配符示例:
/*/*/{31,01}
/*/*/3{0,1}
/*/{12/31,01/01}
目录结构:
测试代码:
public class ListFileStatus {
private FileSystem fs =
private String uri = "hdfs://localhost:9000/input/input1.txt";
private Path[] globPaths = new Path[]{new Path("/*"),new Path("/*/*"),new Path("/*/12/*"),new Path("/200?")
,new Path("/200[78]"),new Path("/200[7-8]"),new Path("/200[^]")
,new Path("/*/*/{31,01}"),new Path("/*/*/3{0,1}"),new Path("/*/{12/31,01/01}")};
private Logger log = Logger.getLogger(TestFSDataInputStream.class);
PropertyConfigurator.configure("conf/log4j.properties");
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = FileSystem.get(URI.create(uri), conf);
public void globStatusTest() throws Exception {
for(Path p:globPaths){
("glob ["+p+"]: ");
FileStatus[] status = fs.globStatus(p);
printFilePath(status);
protected void printFilePath(FileStatus[] status){
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
&运行结果:
通配符并不总能精确的描述的描述想要访问的文件集,如使用通配符排除一个特定的文件就不太可能。FileSystem的listStatus()方法和globStatus()方法提供可选的PathFilter对象,以编程方式控制通配符。过滤器只能作用于文件名,不能针对文件属性进行过滤
PathFilter接口:
public interface PathFilter {
boolean accept(Path path);
public class ListFileStatus {
private FileSystem fs =
private String uri = "hdfs://localhost:9000/input/input1.txt";
private Logger log = Logger.getLogger(TestFSDataInputStream.class);
PropertyConfigurator.configure("conf/log4j.properties");
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = FileSystem.get(URI.create(uri), conf);
public void pathFilterTest() throws Exception {
("glob [/2007/*/*]: ");
FileStatus[] status = fs.globStatus(new Path("/2007/*/*"));
printFilePath(status);
("glob [/2007/*/*] except [/]: ");
status = fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/$"));
printFilePath(status);
protected void printFilePath(FileStatus[] status){
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
class RegexExcludePathFilter implements PathFilter {
private final S
public RegexExcludePathFilter(String regex) {
this.regex =
public boolean accept(Path path) {
return !path.toString().matches(regex);
&运行结果:
六. 删除数据
FileSystem的delete()方法可以永久删除文件或目录。
public boolean delete(Path f) throws IOException
//recursive为true时,非空目录及其内容才会被删除,否则抛出异常
public abstract boolean delete(Path f, boolean recursive) throws IOException
//标记当文件系统关闭时将删除的文件。当JVM关闭时,被标记的文件将被删除
public boolean deleteOnExit(Path f) throws IOException

我要回帖

更多关于 hadoop hdfs文件系统 的文章

 

随机推荐