现在的位置: 首页 > 大数据 > hadoop > 正文
Hadoop 合并hadoop中的多个文件到一个文件
2014年06月23日 hadoop ⁄ 共 3491字 Hadoop 合并hadoop中的多个文件到一个文件已关闭评论 ⁄ 被围观 12,705 views+

需要分析来自许多Web服务器的Apache日志文件时。 虽然我们可以吧每个日志文件都复制到HDFS中, 但通常而言HADOOP处理单个大文件会比处理多个小文件更有效率。此外从分析的目的来看,我们吧日志数据视为一个大文件。 日志数据被分散在多个文件是由于WEB服务器采用分布式构架带来的副作用。一种解决办法是先将所有的文件合并,然后复制到HDFS。 可是文件合并需要占用本地计算机打来能干的磁盘空间爱你,如果我们能够在向HDFS复制的过程中合并他们 事情就简单的多。

应此,我们需要一个PutMerge类型的操作。Hadoop command tool supply 一个getmerge命令, 用于把一组HDFS文件复制到本地计算机以前进行合并。但我们想要的相反。故无法在Hadoop的文件爱你管理工具中获得。我们用HDFS API来变成实现它。

image

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

public class PutMerge {

public static void main(String[] args) throws IOException {

Configuration conf = new Configuration();

FileSystem hdfs  = FileSystem.get(conf);

FileSystem local = FileSystem.getLocal(conf);

Path inputDir = new Path(args[0]); // 设定输入目录与输出文件

Path hdfsFile = new Path(args[1]);

try {

FileStatus[] inputFiles = local.listStatus(inputDir); //得到本地文件列表

FSDataOutputStream out = hdfs.create(hdfsFile);   //生成HDFS输出流

for (int i=0; i

System.out.println(inputFiles[i].getPath().getName());

FSDataInputStream in = local.open(inputFiles[i].getPath()); //打开本地输入流

byte buffer[] = new byte[256];

int bytesRead = 0;

while( (bytesRead = in.read(buffer)) > 0) {

out.write(buffer, 0, bytesRead);

}

in.close();

}

out.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

PS

测试:

直接在Eclipse上跑会出出错:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:8020/user/training, expected: file:///

at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:410)

at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:56)

at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:335)

at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:493)

at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:378)

at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:365)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:584)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:565)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:472)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:464)

at my.utility.PutMerge.main(PutMerge.java:25)

查看源代码

static final URI NAME = URI.create("file:///");

是在比对的时候 总是会认为本地的scheme是 file:/// 而我给的是 hdfs:///就不匹配了

方法 1:

export 出class变成 jar 直接在 服务器上跑就可以了:

hadoop jar merge.jar my.utility.PutMerge /home/training/putmerge   /user/training/mergedata

方法 2 (eclipse:)

package my.utility;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

public class PutMerge {

public static void main(String[] args) throws IOException {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://localhost:8020/"); //设置fs配置

FileSystem hdfs  = FileSystem.get(conf);

FileSystem local = FileSystem.getLocal(conf);

Path inputDir = new Path(args[0]);//( "file://" + args[0]);

Path hdfsFile = new Path("hdfs://localhost:8020/" + args[1]);

try {

FileStatus[] inputFiles = local.listStatus(inputDir);

FSDataOutputStream out = hdfs.create(hdfsFile);

for (int i=0; i<inputFiles.length; i++) {

System.out.println(inputFiles[i].getPath().getName());

FSDataInputStream in = local.open(inputFiles[i].getPath());

byte buffer[] = new byte[256];

int bytesRead = 0;

while( (bytesRead = in.read(buffer)) > 0) {

out.write(buffer, 0, bytesRead);

}

in.close();

}

out.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

抱歉!评论已关闭.

×