import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * JUnit walkthrough of basic HDFS client operations (mkdir, create, copy, delete, list,
 * rename, read, write) against the cluster at {@link #HDFS_URI}.
 *
 * <p>Every test opens its own {@link FileSystem} connection in {@link #setUp()} and releases it
 * in {@link #tearDown()}. The tests print their outcome instead of asserting it, and several of
 * them depend on files created by other tests or already present on the cluster / local disk.
 */
public class hdfs {
    /** URI of the NameNode every test connects to. */
    private static final String HDFS_URI = "hdfs://bigdata01:9000";

    /** HDFS user the connection is opened as; passed explicitly to {@code FileSystem.get}. */
    private static final String HDFS_USER = "AzF";

    private FileSystem fileSystem = null;
    private Configuration configuration = null;

    /**
     * Creates a fresh {@link Configuration} and connects to HDFS as {@link #HDFS_USER}.
     *
     * @throws URISyntaxException if {@link #HDFS_URI} is not a valid URI
     * @throws IOException if the file system cannot be obtained
     * @throws InterruptedException if the connection attempt is interrupted
     */
    @Before
    public void setUp() throws URISyntaxException, IOException, InterruptedException {
        configuration = new Configuration();
        // To override the replication factor for files written by these tests, call
        // configuration.set("dfs.replication", "2") here, before the connection is opened.

        // The user name is supplied explicitly, so this call does not rely on the
        // HADOOP_USER_NAME environment variable.
        fileSystem = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
        System.out.println("----------------------已经连接上集群-------------------------");
        System.out.println("\n");
    }

    /** Prints the configuration and file system objects created by {@link #setUp()}. */
    @Test
    public void test() throws IOException {
        System.out.println(configuration);
        System.out.println(fileSystem);
    }

    /**
     * Creates the nested directory {@code /input/aa/bb/cc}; a single call creates every
     * missing level of the path.
     *
     * @throws IOException if the call fails or reports that the directory was not created
     */
    @Test
    public void mkdirs() throws IOException {
        Path directory = new Path("/input/aa/bb/cc");
        if (!fileSystem.mkdirs(directory)) {
            // Do not report success when the file system says the directory was not created.
            throw new IOException("mkdirs returned false for " + directory);
        }
        System.out.println("成功创建HDFS 文件目录");
    }

    /**
     * Creates the empty file {@code /input/cc.txt} and prints whether it was created.
     *
     * @throws IOException if the file system call fails
     */
    @Test
    public void creatNewFile() throws IOException {
        Path target = new Path("/input/cc.txt");
        boolean created = fileSystem.createNewFile(target);
        // Report the path rather than the boolean result (the old message read "创建文件true成功").
        if (created) {
            System.out.println("创建文件" + target + "成功");
        } else {
            System.out.println("创建文件" + target + "失败");
        }
    }

    /**
     * Copies the local file {@code I:/test.txt} into the HDFS directory {@code /AzF/}.
     *
     * @throws IOException if the copy fails
     */
    @Test
    public void copyFile() throws IOException {
        fileSystem.copyFromLocalFile(new Path("I:/test.txt"), new Path("/AzF/"));
        System.out.println("复制成功");
    }

    /**
     * Downloads {@code /input/test.txt} from HDFS to the local drive {@code I:/}. Despite its
     * name this method copies from HDFS to the local file system, not the other way round.
     *
     * @throws IOException if the copy fails
     */
    @Test
    public void uploadFile() throws IOException {
        // First argument is delSrc: true would remove the HDFS source after copying (a "move"),
        // false leaves it in place (a plain copy).
        fileSystem.copyToLocalFile(false, new Path("/input/test.txt"), new Path("I:/"));
        System.out.println("下载成功");
    }

    /**
     * Deletes the HDFS file {@code /input/test.txt} and prints whether the delete succeeded.
     *
     * @throws IOException if the file system call fails
     */
    @Test
    public void del() throws IOException {
        // Second argument is the recursive flag; the target here is a single file path, so a
        // non-recursive delete is requested.
        boolean deleted = fileSystem.delete(new Path("/input/test.txt"), false);
        if (deleted) {
            System.out.println("删除成功");
        } else {
            System.out.println("删除失败");
        }
    }

    /**
     * Lists the direct children of {@code /input} and prints, for each one, whether it is a
     * directory or a file.
     *
     * @throws IOException if the listing fails
     */
    @Test
    public void isOrNotFile() throws IOException {
        FileStatus[] children = fileSystem.listStatus(new Path("/input"));
        for (FileStatus child : children) {
            if (child.isDirectory()) {
                System.out.println(child.getPath().getName() + "是文件夹");
            } else {
                System.out.println(child.getPath().getName() + "是文件");
            }
        }
    }

    /**
     * Prints whether the path {@code /input} exists on HDFS.
     *
     * @throws IOException if the file system call fails
     */
    @Test
    public void fileisexists() throws IOException {
        boolean exists = fileSystem.exists(new Path("/input"));
        if (exists) {
            System.out.println("文件存在");
        } else {
            System.out.println("文件不存在");
        }
    }

    /**
     * Prints the metadata (name, permission, length, times, owner, group, block size,
     * replication) and the block replica hosts of every file found under
     * {@code /AzF/output/a.txt}; the listing is requested recursively.
     *
     * @throws IOException if the listing fails
     */
    @Test
    public void detailedFiles() throws IOException {
        System.out.println("查看文件详细信息");
        RemoteIterator<LocatedFileStatus> files =
                fileSystem.listFiles(new Path("/AzF/output/a.txt"), true);
        while (files.hasNext()) {
            LocatedFileStatus status = files.next();
            System.out.println("文件名:" + status.getPath().getName());
            System.out.println("权限:" + status.getPermission());
            System.out.println("长度:" + status.getLen());
            System.out.println("获取访问时间:" + status.getAccessTime());
            System.out.println("所属组名:" + status.getGroup());
            System.out.println("所有者:" + status.getOwner());
            System.out.println("块大小:" + status.getBlockSize());
            System.out.println("副本数:" + status.getReplication());
            System.out.println("获取修改时间:" + status.getModificationTime());
            System.out.println("是否是目录(文件夹):" + status.isDirectory());
            System.out.println("是否是文件:" + status.isFile());
            for (BlockLocation blockLocation : status.getBlockLocations()) {
                for (String host : blockLocation.getHosts()) {
                    // Each host listed for a block holds one replica of that block.
                    System.out.println("副本主机:" + host);
                }
            }
        }
    }

    /**
     * Renames {@code /input/cc.txt} to {@code /input/dd.txt} and prints whether it succeeded.
     *
     * @throws IOException if the file system call fails
     */
    @Test
    public void reName() throws IOException {
        Path source = new Path("/input/cc.txt");
        Path destination = new Path("/input/dd.txt");
        boolean renamed = fileSystem.rename(source, destination);
        if (renamed) {
            System.out.println("文件重命名成功");
        } else {
            System.out.println("文件重命名失败");
        }
    }

    /**
     * Writes the string {@code "136454564"} to {@code /input/dd.txt}.
     *
     * <p>The file is opened with {@code FileSystem.create}, not {@code append}, so this writes a
     * new file rather than adding to the end of an existing one. {@code writeUTF} stores a
     * two-byte length prefix ahead of the text, so the file is not plain text.
     *
     * @throws IOException if the file cannot be created or written
     */
    @Test
    public void writ() throws IOException {
        // try-with-resources closes (and thereby flushes) the stream even if the write fails.
        try (FSDataOutputStream out = fileSystem.create(new Path("/input/dd.txt"))) {
            out.writeUTF("136454564");
            out.flush();
        }
        System.out.println("内容添加完成");
    }

    /**
     * Streams the whole content of {@code /input/dd.txt} to standard output.
     *
     * @throws IOException if the file cannot be opened or read
     */
    @Test
    public void readerFilrAll() throws IOException {
        try (FSDataInputStream in = fileSystem.open(new Path("/input/dd.txt"))) {
            // close=false keeps System.out open; the HDFS stream is closed by the try block.
            IOUtils.copyBytes(in, System.out, 40, false);
        }
    }

    /**
     * Copies at most the first 256 characters of {@code /aim.xml} to the local file
     * {@code F:/down/ss.xml}, reading in chunks of up to 16 characters.
     *
     * <p>Both sides use UTF-8 (assumes the HDFS file is UTF-8 encoded; adjust the charset if
     * it is not). Only the characters actually read are written, and copying stops early when
     * the source has fewer than 256 characters.
     *
     * @throws IOException if either file cannot be opened, read, or written
     */
    @Test
    public void getStringByLine() throws IOException {
        final int totalChars = 256;
        final int chunkSize = 16;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                     fileSystem.open(new Path("/aim.xml")), StandardCharsets.UTF_8));
             OutputStreamWriter writer = new OutputStreamWriter(
                     new FileOutputStream(new File("F:/down/ss.xml")), StandardCharsets.UTF_8)) {
            char[] chunk = new char[chunkSize];
            int remaining = totalChars;
            while (remaining > 0) {
                int read = reader.read(chunk, 0, Math.min(chunk.length, remaining));
                if (read == -1) {
                    break; // source exhausted before 256 characters
                }
                // Write only what was read; the rest of the buffer holds stale data.
                writer.write(chunk, 0, read);
                remaining -= read;
            }
        }
        System.out.println("定位读取文件完成");
    }

    /**
     * Copies {@code /input/dd.txt} from byte offset 256 to its end into the local file
     * {@code I:/input/dd.txt}.
     *
     * <p>NOTE(review): if the HDFS file is shorter than 256 bytes the {@code seek} is expected
     * to fail with an {@link IOException} - confirm against the Hadoop version in use.
     *
     * @throws IOException if the file cannot be opened, positioned, read, or written
     */
    @Test
    public void getStringByIOUtils() throws IOException {
        try (FSDataInputStream in = fileSystem.open(new Path("/input/dd.txt"))) {
            in.seek(256); // start copying at byte offset 256
            try (FileOutputStream out = new FileOutputStream(new File("I:/input/dd.txt"))) {
                // close=false: both streams are closed by the enclosing try blocks.
                IOUtils.copyBytes(in, out, configuration, false);
            }
        }
    }

    /**
     * Closes the HDFS connection opened by {@link #setUp()}.
     *
     * @throws IOException if closing the file system fails
     */
    @After
    public void tearDown() throws IOException {
        // setUp may have failed before assigning fileSystem; avoid masking that failure
        // with a NullPointerException here.
        if (fileSystem != null) {
            fileSystem.close();
        }
        System.out.println("\n*******************关闭资源**********************");
    }
}