·天新网首页·加入收藏·设为首页·网站导航
数码笔记本手机摄像机相机MP3MP4GPS
硬件台式机网络服务器主板CPU硬盘显卡
办公投影打印传真
家电电视影院空调
游戏网游单机动漫
汽车新车购车试驾
下载驱动源码
学院开发设计
考试公务员高考考研
业界互联网通信探索
您现在的位置:天新网 > 软件开发
hadoop深入研究(二)——java访问hdfs
http://www.21tx.com 2013年07月16日 CSDN 逆火天麟

1 2 下一页

所有源码在github上,https://github.com/lastsweetop/styhadoop

读数据使用hadoop url读取

比较简单的读取hdfs数据的方法就是通过Java.net.URL打开一个流,不过在这之前先要预先调用它的setURLStreamHandlerFactory方法设置为FsUrlStreamHandlerFactory(由此工厂取解析hdfs协议),这个方法只能调用一次,所以要写在静态块中。然后调用IOUtils类的copyBytes将hdfs数据流拷贝到标准输出流System.out中,copyBytes前两个参数好理解,一个输入,一个输出,第三个是缓存大小,第四个指定拷贝完毕后是否关闭流。我们这里要设置为false,标准输出流不关闭,我们要手动关闭输入流。

package com.sweetop.styhadoop;  
      
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;  
import org.apache.hadoop.io.IOUtils;  
      
import java.io.InputStream;  
import java.net.URL;  
      
/** 
 * Created with IntelliJ IDEA. 
 * User: lastsweetop 
 * Date: 13-5-31 
 * Time: 上午10:16 
 * To change this template use File | Settings | File Templates. 
 */
public class URLCat {  
      
    static {  
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());  
    }  
      
    public static void main(String[] args) throws Exception {  
        InputStream in = null;  
        try {  
            in = new URL(args[0]).openStream();  
            IOUtils.copyBytes(in, System.out, 4096, false);  
        } finally {  
            IOUtils.closeStream(in);  
        }  
    }  
}

使用FileSystem API读取数据

首先是实例化FileSystem对象,通过FileSystem类的get方法,这里要传入一个java.net.URL和一个配置Configuration。

然后FileSystem可以通过一个Path对象打开一个流,之后的操作和上面的例子一样

package com.sweetop.styhadoop;  
      
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IOUtils;  
      
import java.io.InputStream;  
import java.net.URI;  
      
/** 
 * Created with IntelliJ IDEA. 
 * User: lastsweetop 
 * Date: 13-5-31 
 * Time: 上午11:24 
 * To change this template use File | Settings | File Templates. 
 */
public class FileSystemCat {  
    public static void main(String[] args) throws Exception {  
        String uri=args[0];  
        Configuration conf=new Configuration();  
        FileSystem fs=FileSystem.get(URI.create(uri),conf);  
        InputStream in=null;  
        try {  
            in=fs.open(new Path(uri));  
            IOUtils.copyBytes(in, System.out, 4096, false);  
        }   finally {  
            IOUtils.closeStream(in);  
        }  
    }  
}

FSDataInputStream

通过FileSystem打开流返回的对象是个FSDataInputStream对象,该类实现了Seekable接口,

public interface Seekable {  
    void seek(long l) throws java.io.IOException;  
    long getPos() throws java.io.IOException;  
    boolean seekToNewSource(long l) throws java.io.IOException;  
}

seek方法可跳到文件中的任意位置,我们这里跳到文件的初始位置再重新读一次

public class FileSystemDoubleCat {  
    public static void main(String[] args) throws Exception {  
        String uri = args[0];  
        Configuration conf = new Configuration();  
        FileSystem fs = FileSystem.get(URI.create(uri), conf);  
        FSDataInputStream in=null;  
        try {  
            in = fs.open(new Path(uri));  
            IOUtils.copyBytes(in, System.out, 4096, false);  
            in.seek(0);  
            IOUtils.copyBytes(in, System.out, 4096, false);  
        }   finally {  
            IOUtils.closeStream(in);  
        }  
    }  
}

FSDataInputStream还实现了PositionedReadable接口,

public interface PositionedReadable {  
    int read(long l, byte[] bytes, int i, int i1) throws java.io.IOException;  
    void readFully(long l, byte[] bytes, int i, int i1) throws java.io.IOException;  
    void readFully(long l, byte[] bytes) throws java.io.IOException;  
}

可以在任意位置(第一个参数),偏移量(第三个参数),长度(第四个参数),到数组中(第二个参数)
这里就不实现了,大家可以试下写数据

FileSystem类有很多种创建文件的方法,最简单的一种是

public FSDataOutputStreamcreate(Path f) throws IOException它还有很多重载方法,可以指定是否强制覆盖已存在的文件,文件的重复因子,写缓存的大小,文件的块大小,文件的权限等。

还可以指定一个回调接口:

public interface Progressable {  
    void progress();  
}

和普通文件系统一样,也支持apend操作,写日志时最常用

public FSDataOutputStreamappend(Path f) throws IOException

但并非所有hadoop文件系统都支持append,hdfs支持,s3就不支持。

以下是个拷贝本地文件到hdfs的例子

package com.sweetop.styhadoop;  
      
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IOUtils;  
import org.apache.hadoop.util.Progressable;  
      
import java.io.BufferedInputStream;  
import java.io.FileInputStream;  
import java.io.InputStream;  
import java.io.OutputStream;  
import java.net.URI;  
      
/** 
 * Created with IntelliJ IDEA. 
 * User: lastsweetop 
 * Date: 13-6-2 
 * Time: 下午4:54 
 * To change this template use File | Settings | File Templates. 
 */
public class FileCopyWithProgress {  
    public static void main(String[] args) throws Exception {  
        String localSrc = args[0];  
        String dst = args[1];  
      
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));  
      
        Configuration conf = new Configuration();  
        FileSystem fs = FileSystem.get(URI.create(dst), conf);  
        OutputStream out = fs.create(new Path(dst), new Progressable() {  
            @Override
            public void progress() {  
                System.out.print(".");  
            }  
        });  
      
        IOUtils.copyBytes(in, out, 4096, true);

目录

上一篇: 使用 Meteor 快速开发 Web 应用程序
下一篇: PHP 5.3.27 发布,最后一个常规版本

1 2 下一页

关于我们 | 联系我们 | 加入我们 | 广告服务 | 投诉意见 | 网站导航
Copyright © 2000-2011 21tx.com, All Rights Reserved.
晨新科技 版权所有 Created by TXSite.net